• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mavlink / MAVSDK / 18330687927

08 Oct 2025 01:12AM UTC coverage: 47.859% (+0.2%) from 47.68%
18330687927

Pull #2682

github

web-flow
Merge 71946e649 into cafb83cb6
Pull Request #2682: core: add raw connection

133 of 150 new or added lines in 6 files covered. (88.67%)

12 existing lines in 5 files now uncovered.

17223 of 35987 relevant lines covered (47.86%)

447.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.1
/src/mavsdk/core/mavsdk_impl.cpp
1
#include "mavsdk_impl.h"
2

3
#include <algorithm>
4
#include <mutex>
5
#include <tcp_server_connection.h>
6

7
#include "connection.h"
8
#include "log.h"
9
#include "tcp_client_connection.h"
10
#include "tcp_server_connection.h"
11
#include "udp_connection.h"
12
#include "raw_connection.h"
13
#include "system.h"
14
#include "system_impl.h"
15
#include "serial_connection.h"
16
#include "version.h"
17
#include "server_component_impl.h"
18
#include "overloaded.h"
19
#include "mavlink_channels.h"
20
#include "callback_list.tpp"
21
#include "hostname_to_ip.h"
22
#include "embedded_mavlink_xml.h"
23
#include <mav/MessageSet.h>
24

25
namespace mavsdk {
26

27
template class CallbackList<>;
28

29
MavsdkImpl::MavsdkImpl(const Mavsdk::Configuration& configuration) :
125✔
30
    timeout_handler(time),
125✔
31
    call_every_handler(time)
250✔
32
{
33
    LogInfo() << "MAVSDK version: " << mavsdk_version;
125✔
34

35
    if (const char* env_p = std::getenv("MAVSDK_CALLBACK_DEBUGGING")) {
125✔
36
        if (std::string(env_p) == "1") {
×
37
            LogDebug() << "Callback debugging is on.";
×
38
            _callback_debugging = true;
×
39
        }
40
    }
41

42
    if (const char* env_p = std::getenv("MAVSDK_MESSAGE_DEBUGGING")) {
125✔
43
        if (std::string(env_p) == "1") {
×
44
            LogDebug() << "Message debugging is on.";
×
45
            _message_logging_on = true;
×
46
        }
47
    }
48

49
    if (const char* env_p = std::getenv("MAVSDK_SYSTEM_DEBUGGING")) {
125✔
50
        if (std::string(env_p) == "1") {
×
51
            LogDebug() << "System debugging is on.";
×
52
            _system_debugging = true;
×
53
        }
54
    }
55

56
    set_configuration(configuration);
125✔
57

58
    // Initialize MessageSet with embedded XML content in dependency order
59
    // This happens at startup before any connections are created, so no synchronization needed
60
    _message_set = std::make_unique<mav::MessageSet>();
125✔
61
    _message_set->addFromXMLString(mav_embedded::MINIMAL_XML);
125✔
62
    _message_set->addFromXMLString(mav_embedded::STANDARD_XML);
125✔
63
    _message_set->addFromXMLString(mav_embedded::COMMON_XML);
125✔
64
    _message_set->addFromXMLString(mav_embedded::ARDUPILOTMEGA_XML);
125✔
65

66
    // Initialize BufferParser for thread-safe parsing
67
    _buffer_parser = std::make_unique<mav::BufferParser>(*_message_set);
125✔
68

69
    // Start the user callback thread first, so it is ready for anything generated by
70
    // the work thread.
71

72
    _process_user_callbacks_thread =
250✔
73
        new std::thread(&MavsdkImpl::process_user_callbacks_thread, this);
125✔
74

75
    _work_thread = new std::thread(&MavsdkImpl::work_thread, this);
125✔
76
}
125✔
77

78
MavsdkImpl::~MavsdkImpl()
125✔
79
{
80
    {
81
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
125✔
82
        call_every_handler.remove(_heartbeat_send_cookie);
125✔
83
    }
125✔
84

85
    _should_exit = true;
125✔
86

87
    // Stop work first because we don't want to trigger anything that would
88
    // potentially want to call into user code.
89

90
    if (_work_thread != nullptr) {
125✔
91
        _work_thread->join();
125✔
92
        delete _work_thread;
125✔
93
        _work_thread = nullptr;
125✔
94
    }
95

96
    if (_process_user_callbacks_thread != nullptr) {
125✔
97
        _user_callback_queue.stop();
125✔
98
        _process_user_callbacks_thread->join();
125✔
99
        delete _process_user_callbacks_thread;
125✔
100
        _process_user_callbacks_thread = nullptr;
125✔
101
    }
102

103
    std::lock_guard lock(_mutex);
125✔
104

105
    _systems.clear();
125✔
106
    _connections.clear();
125✔
107
}
125✔
108

109
std::string MavsdkImpl::version()
1✔
110
{
111
    static unsigned version_counter = 0;
112

113
    ++version_counter;
1✔
114

115
    switch (version_counter) {
1✔
116
        case 10:
×
117
            return "You were wondering about the name of this library?";
×
118
        case 11:
×
119
            return "Let's look at the history:";
×
120
        case 12:
×
121
            return "DroneLink";
×
122
        case 13:
×
123
            return "DroneCore";
×
124
        case 14:
×
125
            return "DronecodeSDK";
×
126
        case 15:
×
127
            return "MAVSDK";
×
128
        case 16:
×
129
            return "And that's it...";
×
130
        case 17:
×
131
            return "At least for now ¯\\_(ツ)_/¯.";
×
132
        default:
1✔
133
            return mavsdk_version;
1✔
134
    }
135
}
136

137
std::vector<std::shared_ptr<System>> MavsdkImpl::systems() const
163✔
138
{
139
    std::vector<std::shared_ptr<System>> systems_result{};
163✔
140

141
    std::lock_guard lock(_mutex);
163✔
142
    for (auto& system : _systems) {
259✔
143
        // We ignore the 0 entry because it's just a null system.
144
        // It's only created because the older, deprecated API needs a
145
        // reference.
146
        if (system.first == 0) {
96✔
147
            continue;
×
148
        }
149
        systems_result.push_back(system.second);
96✔
150
    }
151

152
    return systems_result;
163✔
153
}
163✔
154

155
std::optional<std::shared_ptr<System>> MavsdkImpl::first_autopilot(double timeout_s)
51✔
156
{
157
    {
158
        std::lock_guard lock(_mutex);
51✔
159
        for (auto system : _systems) {
52✔
160
            if (system.second->is_connected() && system.second->has_autopilot()) {
4✔
161
                return system.second;
3✔
162
            }
163
        }
4✔
164
    }
51✔
165

166
    if (timeout_s == 0.0) {
48✔
167
        // Don't wait at all.
168
        return {};
×
169
    }
170

171
    auto prom = std::make_shared<std::promise<std::shared_ptr<System>>>();
48✔
172
    auto fut = prom->get_future();
48✔
173

174
    auto flag = std::make_shared<std::once_flag>();
48✔
175
    auto handle = subscribe_on_new_system([this, prom, flag]() {
48✔
176
        // Check all systems, not just the first one
177
        auto all_systems = systems();
49✔
178
        for (auto& system : all_systems) {
51✔
179
            if (system->is_connected() && system->has_autopilot()) {
50✔
180
                std::call_once(*flag, [prom, system]() { prom->set_value(system); });
96✔
181
                break;
48✔
182
            }
183
        }
184
    });
49✔
185

186
    if (timeout_s > 0.0) {
48✔
187
        if (fut.wait_for(std::chrono::milliseconds(int64_t(timeout_s * 1e3))) ==
48✔
188
            std::future_status::ready) {
189
            unsubscribe_on_new_system(handle);
48✔
190
            return fut.get();
48✔
191

192
        } else {
193
            unsubscribe_on_new_system(handle);
×
194
            return std::nullopt;
×
195
        }
196
    } else {
197
        fut.wait();
×
198
        unsubscribe_on_new_system(handle);
×
199
        return std::optional(fut.get());
×
200
    }
201
}
48✔
202

203
std::shared_ptr<ServerComponent> MavsdkImpl::server_component(unsigned instance)
51✔
204
{
205
    std::lock_guard lock(_mutex);
51✔
206

207
    auto component_type = _configuration.get_component_type();
51✔
208
    switch (component_type) {
51✔
209
        case ComponentType::Autopilot:
51✔
210
        case ComponentType::GroundStation:
211
        case ComponentType::CompanionComputer:
212
        case ComponentType::Camera:
213
        case ComponentType::Gimbal:
214
        case ComponentType::RemoteId:
215
        case ComponentType::Custom:
216
            return server_component_by_type(component_type, instance);
51✔
217
        default:
×
218
            LogErr() << "Unknown component type";
×
219
            return {};
×
220
    }
221
}
51✔
222

223
std::shared_ptr<ServerComponent>
224
MavsdkImpl::server_component_by_type(ComponentType server_component_type, unsigned instance)
51✔
225
{
226
    switch (server_component_type) {
51✔
227
        case ComponentType::Autopilot:
37✔
228
            if (instance == 0) {
37✔
229
                return server_component_by_id(MAV_COMP_ID_AUTOPILOT1);
37✔
230
            } else {
231
                LogErr() << "Only autopilot instance 0 is valid";
×
232
                return {};
×
233
            }
234

235
        case ComponentType::GroundStation:
×
236
            if (instance == 0) {
×
237
                return server_component_by_id(MAV_COMP_ID_MISSIONPLANNER);
×
238
            } else {
239
                LogErr() << "Only one ground station supported at this time";
×
240
                return {};
×
241
            }
242

243
        case ComponentType::CompanionComputer:
1✔
244
            if (instance == 0) {
1✔
245
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER);
1✔
246
            } else if (instance == 1) {
×
247
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER2);
×
248
            } else if (instance == 2) {
×
249
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER3);
×
250
            } else if (instance == 3) {
×
251
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER4);
×
252
            } else {
253
                LogErr() << "Only companion computer 0..3 are supported";
×
254
                return {};
×
255
            }
256

257
        case ComponentType::Camera:
13✔
258
            if (instance == 0) {
13✔
259
                return server_component_by_id(MAV_COMP_ID_CAMERA);
13✔
260
            } else if (instance == 1) {
×
261
                return server_component_by_id(MAV_COMP_ID_CAMERA2);
×
262
            } else if (instance == 2) {
×
263
                return server_component_by_id(MAV_COMP_ID_CAMERA3);
×
264
            } else if (instance == 3) {
×
265
                return server_component_by_id(MAV_COMP_ID_CAMERA4);
×
266
            } else if (instance == 4) {
×
267
                return server_component_by_id(MAV_COMP_ID_CAMERA5);
×
268
            } else if (instance == 5) {
×
269
                return server_component_by_id(MAV_COMP_ID_CAMERA6);
×
270
            } else {
271
                LogErr() << "Only camera 0..5 are supported";
×
272
                return {};
×
273
            }
274

275
        default:
×
276
            LogErr() << "Unknown server component type";
×
277
            return {};
×
278
    }
279
}
280

281
std::shared_ptr<ServerComponent> MavsdkImpl::server_component_by_id(uint8_t component_id)
51✔
282
{
283
    if (component_id == 0) {
51✔
284
        LogErr() << "Server component with component ID 0 not allowed";
×
285
        return nullptr;
×
286
    }
287

288
    std::lock_guard lock(_server_components_mutex);
51✔
289

290
    return server_component_by_id_with_lock(component_id);
51✔
291
}
51✔
292

293
std::shared_ptr<ServerComponent> MavsdkImpl::server_component_by_id_with_lock(uint8_t component_id)
239✔
294
{
295
    for (auto& it : _server_components) {
240✔
296
        if (it.first == component_id) {
114✔
297
            if (it.second != nullptr) {
113✔
298
                return it.second;
113✔
299
            } else {
300
                it.second = std::make_shared<ServerComponent>(*this, component_id);
×
301
            }
302
        }
303
    }
304

305
    _server_components.emplace_back(std::pair<uint8_t, std::shared_ptr<ServerComponent>>(
126✔
306
        component_id, std::make_shared<ServerComponent>(*this, component_id)));
252✔
307

308
    return _server_components.back().second;
126✔
309
}
310

311
void MavsdkImpl::forward_message(mavlink_message_t& message, Connection* connection)
42✔
312
{
313
    // Forward_message Function implementing Mavlink routing rules.
314
    // See https://mavlink.io/en/guide/routing.html
315

316
    bool forward_heartbeats_enabled = true;
42✔
317
    const uint8_t target_system_id = get_target_system_id(message);
42✔
318
    const uint8_t target_component_id = get_target_component_id(message);
42✔
319

320
    // If it's a message only for us, we keep it, otherwise, we forward it.
321
    const bool targeted_only_at_us =
322
        (target_system_id == get_own_system_id() && target_component_id == get_own_component_id());
42✔
323

324
    // We don't forward heartbeats unless it's specifically enabled.
325
    const bool heartbeat_check_ok =
42✔
326
        (message.msgid != MAVLINK_MSG_ID_HEARTBEAT || forward_heartbeats_enabled);
42✔
327

328
    if (!targeted_only_at_us && heartbeat_check_ok) {
42✔
329
        unsigned successful_emissions = 0;
37✔
330
        for (auto& entry : _connections) {
111✔
331
            // Check whether the connection is not the one from which we received the message.
332
            // And also check if the connection was set to forward messages.
333
            if (entry.connection.get() == connection ||
111✔
334
                !entry.connection->should_forward_messages()) {
37✔
335
                continue;
37✔
336
            }
337
            auto result = (*entry.connection).send_message(message);
37✔
338
            if (result.first) {
37✔
339
                successful_emissions++;
36✔
340
            } else {
341
                _connections_errors_subscriptions.queue(
2✔
342
                    Mavsdk::ConnectionError{result.second, entry.handle},
1✔
343
                    [this](const auto& func) { call_user_callback(func); });
×
344
            }
345
        }
37✔
346
        if (successful_emissions == 0) {
37✔
347
            LogErr() << "Message forwarding failed";
1✔
348
        }
349
    }
350
}
42✔
351

352
void MavsdkImpl::receive_message(mavlink_message_t& message, Connection* connection)
2,398✔
353
{
354
    {
355
        std::lock_guard lock(_received_messages_mutex);
2,398✔
356
        _received_messages.emplace(ReceivedMessage{std::move(message), connection});
2,401✔
357
    }
2,403✔
358
    _received_messages_cv.notify_one();
2,402✔
359
}
2,403✔
360

361
void MavsdkImpl::receive_libmav_message(
2,397✔
362
    const Mavsdk::MavlinkMessage& message, Connection* connection)
363
{
364
    {
365
        std::lock_guard lock(_received_libmav_messages_mutex);
2,397✔
366
        _received_libmav_messages.emplace(ReceivedLibmavMessage{message, connection});
2,399✔
367
    }
2,400✔
368
    _received_libmav_messages_cv.notify_one();
2,396✔
369
}
2,400✔
370

371
void MavsdkImpl::process_messages()
25,771✔
372
{
373
    std::lock_guard lock(_received_messages_mutex);
25,771✔
374
    while (!_received_messages.empty()) {
28,092✔
375
        auto message_copied = _received_messages.front();
2,388✔
376
        process_message(message_copied.message, message_copied.connection_ptr);
2,386✔
377
        _received_messages.pop();
2,387✔
378
    }
379
}
25,492✔
380

381
void MavsdkImpl::process_libmav_messages()
25,689✔
382
{
383
    std::lock_guard lock(_received_libmav_messages_mutex);
25,689✔
384
    while (!_received_libmav_messages.empty()) {
27,915✔
385
        auto message_copied = _received_libmav_messages.front();
2,383✔
386
        process_libmav_message(message_copied.message, message_copied.connection_ptr);
2,379✔
387
        _received_libmav_messages.pop();
2,383✔
388
    }
2,382✔
389
}
25,557✔
390

391
void MavsdkImpl::process_message(mavlink_message_t& message, Connection* connection)
2,387✔
392
{
393
    // Assumes _received_messages_mutex
394

395
    if (_message_logging_on) {
2,387✔
396
        LogDebug() << "Processing message " << message.msgid << " from "
×
397
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid);
×
398
    }
399

400
    if (_should_exit) {
2,387✔
401
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
402
        return;
×
403
    }
404

405
    {
406
        std::lock_guard lock(_mutex);
2,388✔
407

408
        /** @note: Forward message FIRST (before intercept) if option is enabled and multiple
409
         * interfaces are connected. This ensures that forwarded messages are not affected by
410
         * intercept modifications. Performs message forwarding checks for every messages if message
411
         * forwarding is enabled on at least one connection, and in case of a single forwarding
412
         * connection, we check that it is not the one which received the current message.
413
         *
414
         * Conditions:
415
         * 1. At least 2 connections.
416
         * 2. At least 1 forwarding connection.
417
         * 3. At least 2 forwarding connections or current connection is not forwarding.
418
         */
419

420
        if (_connections.size() > 1 && mavsdk::Connection::forwarding_connections_count() > 0 &&
2,430✔
421
            (mavsdk::Connection::forwarding_connections_count() > 1 ||
42✔
422
             !connection->should_forward_messages())) {
×
423
            if (_message_logging_on) {
42✔
424
                LogDebug() << "Forwarding message " << message.msgid << " from "
×
425
                           << static_cast<int>(message.sysid) << "/"
×
426
                           << static_cast<int>(message.compid);
×
427
            }
428
            forward_message(message, connection);
42✔
429
        }
430

431
        if (_should_exit) {
2,386✔
432
            // If we're meant to clean up, let's not try to acquire any more locks but bail.
433
            return;
×
434
        }
435

436
        // This is a low level interface where incoming messages can be tampered
437
        // with or even dropped FOR LOCAL PROCESSING ONLY (after forwarding).
438
        {
439
            bool keep = true;
2,385✔
440
            {
441
                std::lock_guard<std::mutex> intercept_lock(_intercept_callbacks_mutex);
2,385✔
442
                if (_intercept_incoming_messages_callback != nullptr) {
2,388✔
443
                    keep = _intercept_incoming_messages_callback(message);
242✔
444
                }
445
            }
2,386✔
446

447
            if (!keep) {
2,387✔
448
                LogDebug() << "Dropped incoming message: " << int(message.msgid);
32✔
449
                return;
32✔
450
            }
451
        }
452

453
        // Don't ever create a system with sysid 0.
454
        if (message.sysid == 0) {
2,355✔
455
            if (_message_logging_on) {
×
456
                LogDebug() << "Ignoring message with sysid == 0";
×
457
            }
458
            return;
×
459
        }
460

461
        // Filter out messages by QGroundControl, however, only do that if MAVSDK
462
        // is also implementing a ground station and not if it is used in another
463
        // configuration, e.g. on a companion.
464
        //
465
        // This is a workaround because PX4 started forwarding messages between
466
        // mavlink instances which leads to existing implementations (including
467
        // examples and integration tests) to connect to QGroundControl by accident
468
        // instead of PX4 because the check `has_autopilot()` is not used.
469

470
        if (_configuration.get_component_type() == ComponentType::GroundStation &&
2,355✔
471
            message.sysid == 255 && message.compid == MAV_COMP_ID_MISSIONPLANNER) {
2,354✔
472
            if (_message_logging_on) {
×
473
                LogDebug() << "Ignoring messages from QGC as we are also a ground station";
×
474
            }
475
            return;
×
476
        }
477

478
        bool found_system = false;
2,354✔
479
        for (auto& system : _systems) {
2,387✔
480
            if (system.first == message.sysid) {
2,264✔
481
                system.second->system_impl()->add_new_component(message.compid);
2,231✔
482
                found_system = true;
2,229✔
483
                break;
2,229✔
484
            }
485
        }
486

487
        if (!found_system) {
2,352✔
488
            if (_system_debugging) {
124✔
489
                LogWarn() << "Create new system/component " << (int)message.sysid << "/"
×
490
                          << (int)message.compid;
×
491
                LogWarn() << "From message " << (int)message.msgid << " with len "
×
492
                          << (int)message.len;
×
493
                std::string bytes = "";
×
494
                for (unsigned i = 0; i < 12 + message.len; ++i) {
×
495
                    bytes += std::to_string(reinterpret_cast<uint8_t*>(&message)[i]) + ' ';
×
496
                }
497
                LogWarn() << "Bytes: " << bytes;
×
498
            }
×
499
            make_system_with_component(message.sysid, message.compid);
124✔
500

501
            // We now better talk back.
502
            start_sending_heartbeats();
124✔
503
        }
504

505
        if (_should_exit) {
2,352✔
506
            // Don't try to call at() if systems have already been destroyed
507
            // in destructor.
508
            return;
×
509
        }
510
    }
2,383✔
511

512
    mavlink_message_handler.process_message(message);
2,355✔
513

514
    for (auto& system : _systems) {
2,391✔
515
        if (system.first == message.sysid) {
2,390✔
516
            system.second->system_impl()->process_mavlink_message(message);
2,355✔
517
            break;
2,353✔
518
        }
519
    }
520
}
521

522
void MavsdkImpl::process_libmav_message(
2,383✔
523
    const Mavsdk::MavlinkMessage& message, Connection* /* connection */)
524
{
525
    // Assumes _received_libmav_messages_mutex
526

527
    if (_message_logging_on) {
2,383✔
528
        LogDebug() << "MavsdkImpl::process_libmav_message: " << message.message_name << " from "
×
529
                   << static_cast<int>(message.system_id) << "/"
×
530
                   << static_cast<int>(message.component_id);
×
531
    }
532

533
    // JSON message interception for incoming messages
534
    if (!call_json_interception_callbacks(message, _incoming_json_message_subscriptions)) {
2,383✔
535
        // Message was dropped by interception callback
536
        if (_message_logging_on) {
×
537
            LogDebug() << "Incoming JSON message " << message.message_name
×
538
                       << " dropped by interception";
×
539
        }
540
        return;
×
541
    }
542

543
    if (_message_logging_on) {
2,382✔
544
        LogDebug() << "Processing libmav message " << message.message_name << " from "
×
545
                   << static_cast<int>(message.system_id) << "/"
×
546
                   << static_cast<int>(message.component_id);
×
547
    }
548

549
    if (_should_exit) {
2,382✔
550
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
551
        return;
×
552
    }
553

554
    {
555
        std::lock_guard lock(_mutex);
2,380✔
556

557
        // Don't ever create a system with sysid 0.
558
        if (message.system_id == 0) {
2,380✔
559
            if (_message_logging_on) {
×
560
                LogDebug() << "Ignoring libmav message with sysid == 0";
×
561
            }
562
            return;
×
563
        }
564

565
        // Filter out QGroundControl messages similar to regular mavlink processing
566
        if (_configuration.get_component_type() == ComponentType::GroundStation &&
2,380✔
567
            message.system_id == 255 && message.component_id == MAV_COMP_ID_MISSIONPLANNER) {
2,379✔
568
            if (_message_logging_on) {
×
569
                LogDebug() << "Ignoring libmav messages from QGC as we are also a ground station";
×
570
            }
571
            return;
×
572
        }
573

574
        bool found_system = false;
2,379✔
575
        for (auto& system : _systems) {
2,412✔
576
            if (system.first == message.system_id) {
2,409✔
577
                system.second->system_impl()->add_new_component(message.component_id);
2,376✔
578
                found_system = true;
2,376✔
579
                break;
2,376✔
580
            }
581
        }
582

583
        if (!found_system) {
2,384✔
584
            if (_system_debugging) {
6✔
585
                LogWarn() << "Create new system/component from libmav " << (int)message.system_id
×
586
                          << "/" << (int)message.component_id;
×
587
            }
588
            make_system_with_component(message.system_id, message.component_id);
6✔
589

590
            // We now better talk back.
591
            start_sending_heartbeats();
6✔
592
        }
593

594
        if (_should_exit) {
2,384✔
595
            // Don't try to call at() if systems have already been destroyed
596
            // in destructor.
597
            return;
×
598
        }
599
    }
2,379✔
600

601
    // Distribute libmav message to systems for libmav-specific handling
602
    bool found_system = false;
2,382✔
603
    for (auto& system : _systems) {
4,827✔
604
        if (system.first == message.system_id) {
2,445✔
605
            if (_message_logging_on) {
2,380✔
606
                LogDebug() << "Distributing libmav message " << message.message_name
×
607
                           << " to SystemImpl for system " << system.first;
×
608
            }
609
            system.second->system_impl()->process_libmav_message(message);
2,380✔
610
            found_system = true;
2,380✔
611
            // Don't break - distribute to all matching system instances
612
        }
613
    }
614

615
    if (!found_system) {
2,380✔
616
        LogWarn() << "No system found for libmav message " << message.message_name
×
617
                  << " from system " << message.system_id;
×
618
    }
619
}
620

621
bool MavsdkImpl::send_message(mavlink_message_t& message)
2,504✔
622
{
623
    // Create a copy of the message to avoid reference issues
624
    mavlink_message_t message_copy = message;
2,504✔
625

626
    {
627
        std::lock_guard lock(_messages_to_send_mutex);
2,504✔
628
        _messages_to_send.push(std::move(message_copy));
2,503✔
629
    }
2,507✔
630

631
    // For heartbeat messages, we want to process them immediately to speed up system discovery
632
    if (message.msgid == MAVLINK_MSG_ID_HEARTBEAT) {
2,506✔
633
        // Trigger message processing in the work thread
634
        // This is a hint to process messages sooner, but doesn't block
635
        std::this_thread::yield();
374✔
636
    }
637

638
    return true;
2,507✔
639
}
640

641
void MavsdkImpl::deliver_messages()
28,243✔
642
{
643
    // Process messages one at a time to avoid holding the mutex while delivering
644
    while (true) {
645
        mavlink_message_t message;
646
        {
647
            std::lock_guard lock(_messages_to_send_mutex);
28,243✔
648
            if (_messages_to_send.empty()) {
28,045✔
649
                break;
25,565✔
650
            }
651
            message = _messages_to_send.front();
2,506✔
652
            _messages_to_send.pop();
2,506✔
653
        }
28,069✔
654
        deliver_message(message);
2,507✔
655
    }
2,507✔
656
}
25,469✔
657

658
void MavsdkImpl::deliver_message(mavlink_message_t& message)
2,507✔
659
{
660
    if (_message_logging_on) {
2,507✔
661
        LogDebug() << "Sending message " << message.msgid << " from "
×
662
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid)
×
663
                   << " to " << static_cast<int>(get_target_system_id(message)) << "/"
×
664
                   << static_cast<int>(get_target_component_id(message));
×
665
    }
666

667
    // This is a low level interface where outgoing messages can be tampered
668
    // with or even dropped.
669
    bool keep = true;
2,507✔
670
    {
671
        std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
2,507✔
672
        if (_intercept_outgoing_messages_callback != nullptr) {
2,505✔
673
            keep = _intercept_outgoing_messages_callback(message);
226✔
674
        }
675
    }
2,505✔
676

677
    if (!keep) {
2,506✔
678
        // We fake that everything was sent as instructed because
679
        // a potential loss would happen later, and we would not be informed
680
        // about it.
681
        LogDebug() << "Dropped outgoing message: " << int(message.msgid);
87✔
682
        return;
150✔
683
    }
684

685
    // JSON message interception for outgoing messages
686
    // Convert mavlink_message_t to Mavsdk::MavlinkMessage for JSON interception
687
    uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
688
    uint16_t len = mavlink_msg_to_send_buffer(buffer, &message);
2,419✔
689

690
    size_t bytes_consumed = 0;
2,420✔
691
    auto libmav_msg_opt = parse_message_safe(buffer, len, bytes_consumed);
2,420✔
692

693
    if (libmav_msg_opt) {
2,418✔
694
        // Create Mavsdk::MavlinkMessage directly for JSON interception
695
        Mavsdk::MavlinkMessage json_message;
2,419✔
696
        json_message.message_name = libmav_msg_opt.value().name();
2,417✔
697
        json_message.system_id = message.sysid;
2,416✔
698
        json_message.component_id = message.compid;
2,416✔
699

700
        // Extract target_system and target_component if present
701
        uint8_t target_system_id = 0;
2,416✔
702
        uint8_t target_component_id = 0;
2,416✔
703
        if (libmav_msg_opt.value().get("target_system", target_system_id) ==
2,416✔
704
            mav::MessageResult::Success) {
705
            json_message.target_system_id = target_system_id;
1,820✔
706
        } else {
707
            json_message.target_system_id = 0;
595✔
708
        }
709
        if (libmav_msg_opt.value().get("target_component", target_component_id) ==
2,415✔
710
            mav::MessageResult::Success) {
711
            json_message.target_component_id = target_component_id;
1,820✔
712
        } else {
713
            json_message.target_component_id = 0;
598✔
714
        }
715

716
        // Generate JSON using LibmavReceiver's public method
717
        auto connections = get_connections();
2,418✔
718
        if (!connections.empty() && connections[0]->get_libmav_receiver()) {
2,418✔
719
            json_message.fields_json =
720
                connections[0]->get_libmav_receiver()->libmav_message_to_json(
7,063✔
721
                    libmav_msg_opt.value());
4,710✔
722
        } else {
723
            // Fallback: create minimal JSON if no receiver available
724
            json_message.fields_json =
725
                "{\"message_id\":" + std::to_string(libmav_msg_opt.value().id()) +
126✔
726
                ",\"message_name\":\"" + libmav_msg_opt.value().name() + "\"}";
126✔
727
        }
728

729
        if (!call_json_interception_callbacks(json_message, _outgoing_json_message_subscriptions)) {
2,419✔
730
            // Message was dropped by JSON interception callback
731
            if (_message_logging_on) {
2✔
732
                LogDebug() << "Outgoing JSON message " << json_message.message_name
×
733
                           << " dropped by interception";
×
734
            }
735
            return;
×
736
        }
737
    }
2,415✔
738

739
    std::lock_guard lock(_mutex);
2,414✔
740

741
    if (_connections.empty()) {
2,418✔
742
        // We obviously can't send any messages without a connection added, so
743
        // we silently ignore this.
744
        return;
63✔
745
    }
746

747
    uint8_t successful_emissions = 0;
2,348✔
748
    for (auto& _connection : _connections) {
4,722✔
749
        const uint8_t target_system_id = get_target_system_id(message);
2,373✔
750

751
        if (target_system_id != 0 && !(*_connection.connection).has_system_id(target_system_id)) {
2,368✔
752
            continue;
5✔
753
        }
754
        const auto result = (*_connection.connection).send_message(message);
2,358✔
755
        if (result.first) {
2,369✔
756
            successful_emissions++;
2,367✔
757
        } else {
758
            _connections_errors_subscriptions.queue(
4✔
759
                Mavsdk::ConnectionError{result.second, _connection.handle},
2✔
760
                [this](const auto& func) { call_user_callback(func); });
×
761
        }
762
    }
2,369✔
763

764
    if (successful_emissions == 0) {
2,354✔
765
        LogErr() << "Sending message failed";
×
766
    }
767
}
2,419✔
768

769
std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_any_connection(
127✔
770
    const std::string& connection_url, ForwardingOption forwarding_option)
771
{
772
    CliArg cli_arg;
127✔
773
    if (!cli_arg.parse(connection_url)) {
127✔
774
        return {ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle{}};
×
775
    }
776

777
    return std::visit(
127✔
778
        overloaded{
254✔
779
            [](std::monostate) {
×
780
                // Should not happen anyway.
781
                return std::pair<ConnectionResult, Mavsdk::ConnectionHandle>{
×
782
                    ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle()};
×
783
            },
784
            [this, forwarding_option](const CliArg::Udp& udp) {
124✔
785
                return add_udp_connection(udp, forwarding_option);
124✔
786
            },
787
            [this, forwarding_option](const CliArg::Tcp& tcp) {
2✔
788
                return add_tcp_connection(tcp, forwarding_option);
2✔
789
            },
790
            [this, forwarding_option](const CliArg::Serial& serial) {
×
791
                return add_serial_connection(
×
792
                    serial.path, serial.baudrate, serial.flow_control_enabled, forwarding_option);
×
793
            },
794
            [this, forwarding_option](const CliArg::Raw&) {
1✔
795
                return add_raw_connection(forwarding_option);
1✔
796
            }},
797
        cli_arg.protocol);
127✔
798
}
127✔
799

800
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
801
MavsdkImpl::add_udp_connection(const CliArg::Udp& udp, ForwardingOption forwarding_option)
124✔
802
{
803
    auto new_conn = std::make_unique<UdpConnection>(
804
        [this](mavlink_message_t& message, Connection* connection) {
2,379✔
805
            receive_message(message, connection);
2,379✔
806
        },
2,382✔
807
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
2,381✔
808
            receive_libmav_message(message, connection);
2,381✔
809
        },
2,379✔
810
        *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
811
        udp.mode == CliArg::Udp::Mode::In ? udp.host : "0.0.0.0",
248✔
812
        udp.mode == CliArg::Udp::Mode::In ? udp.port : 0,
124✔
813
        forwarding_option);
124✔
814

815
    if (!new_conn) {
124✔
816
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
817
    }
818

819
    ConnectionResult ret = new_conn->start();
124✔
820

821
    if (ret != ConnectionResult::Success) {
124✔
822
        return {ret, Mavsdk::ConnectionHandle{}};
×
823
    }
824

825
    if (udp.mode == CliArg::Udp::Mode::Out) {
124✔
826
        // We need to add the IP rather than a hostname, otherwise we end up with two remotes:
827
        // one for the IP, and one for a hostname.
828
        auto remote_ip = resolve_hostname_to_ip(udp.host);
62✔
829

830
        if (!remote_ip) {
62✔
831
            return {ConnectionResult::DestinationIpUnknown, Mavsdk::ConnectionHandle{}};
×
832
        }
833

834
        new_conn->add_remote_to_keep(remote_ip.value(), udp.port);
62✔
835
        std::lock_guard lock(_mutex);
62✔
836

837
        // With a UDP remote, we need to initiate the connection by sending heartbeats.
838
        auto new_configuration = get_configuration();
62✔
839
        new_configuration.set_always_send_heartbeats(true);
62✔
840
        set_configuration(new_configuration);
62✔
841
    }
62✔
842

843
    auto handle = add_connection(std::move(new_conn));
124✔
844

845
    return {ConnectionResult::Success, handle};
124✔
846
}
124✔
847

848
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
849
MavsdkImpl::add_tcp_connection(const CliArg::Tcp& tcp, ForwardingOption forwarding_option)
2✔
850
{
851
    if (tcp.mode == CliArg::Tcp::Mode::Out) {
2✔
852
        auto new_conn = std::make_unique<TcpClientConnection>(
853
            [this](mavlink_message_t& message, Connection* connection) {
7✔
854
                receive_message(message, connection);
7✔
855
            },
7✔
856
            [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
7✔
857
                receive_libmav_message(message, connection);
7✔
858
            },
7✔
859
            *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
860
            tcp.host,
1✔
861
            tcp.port,
1✔
862
            forwarding_option);
1✔
863
        if (!new_conn) {
1✔
864
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
865
        }
866
        ConnectionResult ret = new_conn->start();
1✔
867
        if (ret == ConnectionResult::Success) {
1✔
868
            return {ret, add_connection(std::move(new_conn))};
1✔
869
        } else {
870
            return {ret, Mavsdk::ConnectionHandle{}};
×
871
        }
872
    } else {
1✔
873
        auto new_conn = std::make_unique<TcpServerConnection>(
874
            [this](mavlink_message_t& message, Connection* connection) {
13✔
875
                receive_message(message, connection);
13✔
876
            },
13✔
877
            [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
11✔
878
                receive_libmav_message(message, connection);
11✔
879
            },
11✔
880
            *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
881
            tcp.host,
1✔
882
            tcp.port,
1✔
883
            forwarding_option);
1✔
884
        if (!new_conn) {
1✔
885
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
886
        }
887
        ConnectionResult ret = new_conn->start();
1✔
888
        if (ret == ConnectionResult::Success) {
1✔
889
            return {ret, add_connection(std::move(new_conn))};
1✔
890
        } else {
891
            return {ret, Mavsdk::ConnectionHandle{}};
×
892
        }
893
    }
1✔
894
}
895

896
std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_serial_connection(
×
897
    const std::string& dev_path,
898
    int baudrate,
899
    bool flow_control,
900
    ForwardingOption forwarding_option)
901
{
902
    auto new_conn = std::make_unique<SerialConnection>(
903
        [this](mavlink_message_t& message, Connection* connection) {
×
904
            receive_message(message, connection);
×
905
        },
×
906
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
×
907
            receive_libmav_message(message, connection);
×
908
        },
×
909
        *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
910
        dev_path,
911
        baudrate,
912
        flow_control,
913
        forwarding_option);
×
914
    if (!new_conn) {
×
915
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
916
    }
917
    ConnectionResult ret = new_conn->start();
×
918
    if (ret == ConnectionResult::Success) {
×
919
        auto handle = add_connection(std::move(new_conn));
×
920

921
        auto new_configuration = get_configuration();
×
922

923
        // PX4 starting with v1.13 does not send heartbeats by default, so we need
924
        // to initiate the MAVLink connection by sending heartbeats.
925
        // Therefore, we override the default here and enable sending heartbeats.
926
        new_configuration.set_always_send_heartbeats(true);
×
927
        set_configuration(new_configuration);
×
928

929
        return {ret, handle};
×
930

931
    } else {
932
        return {ret, Mavsdk::ConnectionHandle{}};
×
933
    }
934
}
×
935

936
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
937
MavsdkImpl::add_raw_connection(ForwardingOption forwarding_option)
1✔
938
{
939
    // Check if a raw connection already exists
940
    if (find_raw_connection() != nullptr) {
1✔
NEW
941
        LogErr() << "Raw connection already exists. Only one raw connection is allowed.";
×
NEW
942
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
943
    }
944

945
    auto new_conn = std::make_unique<RawConnection>(
946
        [this](mavlink_message_t& message, Connection* connection) {
1✔
947
            receive_message(message, connection);
1✔
948
        },
1✔
949
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
1✔
950
            receive_libmav_message(message, connection);
1✔
951
        },
1✔
952
        *this,
953
        forwarding_option);
1✔
954

955
    if (!new_conn) {
1✔
NEW
956
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
957
    }
958

959
    ConnectionResult ret = new_conn->start();
1✔
960
    if (ret != ConnectionResult::Success) {
1✔
NEW
961
        return {ret, Mavsdk::ConnectionHandle{}};
×
962
    }
963

964
    auto handle = add_connection(std::move(new_conn));
1✔
965

966
    // Enable heartbeats for raw connection
967
    auto new_configuration = get_configuration();
1✔
968
    new_configuration.set_always_send_heartbeats(true);
1✔
969
    set_configuration(new_configuration);
1✔
970

971
    return {ConnectionResult::Success, handle};
1✔
972
}
1✔
973

974
Mavsdk::ConnectionHandle MavsdkImpl::add_connection(std::unique_ptr<Connection>&& new_connection)
127✔
975
{
976
    std::lock_guard lock(_mutex);
127✔
977
    auto handle = _connections_handle_factory.create();
127✔
978
    _connections.emplace_back(ConnectionEntry{std::move(new_connection), handle});
127✔
979

980
    return handle;
254✔
981
}
127✔
982

983
void MavsdkImpl::remove_connection(Mavsdk::ConnectionHandle handle)
×
984
{
985
    std::lock_guard lock(_mutex);
×
986

987
    _connections.erase(std::remove_if(_connections.begin(), _connections.end(), [&](auto&& entry) {
×
988
        return (entry.handle == handle);
×
989
    }));
990
}
×
991

992
Mavsdk::Configuration MavsdkImpl::get_configuration() const
63✔
993
{
994
    std::lock_guard configuration_lock(_mutex);
63✔
995
    return _configuration;
126✔
996
}
63✔
997

998
void MavsdkImpl::set_configuration(Mavsdk::Configuration new_configuration)
188✔
999
{
1000
    std::lock_guard server_components_lock(_server_components_mutex);
188✔
1001
    // We just point the default to the newly created component. This means
1002
    // that the previous default component will be deleted if it is not
1003
    // used/referenced anywhere.
1004
    _default_server_component =
1005
        server_component_by_id_with_lock(new_configuration.get_component_id());
188✔
1006

1007
    if (new_configuration.get_always_send_heartbeats() &&
314✔
1008
        !_configuration.get_always_send_heartbeats()) {
126✔
1009
        start_sending_heartbeats();
64✔
1010
    } else if (
124✔
1011
        !new_configuration.get_always_send_heartbeats() &&
186✔
1012
        _configuration.get_always_send_heartbeats() && !is_any_system_connected()) {
186✔
1013
        stop_sending_heartbeats();
×
1014
    }
1015

1016
    _configuration = new_configuration;
188✔
1017
    // We cache these values as atomic to avoid having to lock any mutex for them.
1018
    _our_system_id = new_configuration.get_system_id();
188✔
1019
    _our_component_id = new_configuration.get_component_id();
188✔
1020
}
188✔
1021

1022
uint8_t MavsdkImpl::get_own_system_id() const
5,705✔
1023
{
1024
    return _our_system_id;
5,705✔
1025
}
1026

1027
uint8_t MavsdkImpl::get_own_component_id() const
1,454✔
1028
{
1029
    return _our_component_id;
1,454✔
1030
}
1031

1032
uint8_t MavsdkImpl::channel() const
×
1033
{
1034
    // TODO
1035
    return 0;
×
1036
}
1037

1038
Autopilot MavsdkImpl::autopilot() const
×
1039
{
1040
    // TODO
1041
    return Autopilot::Px4;
×
1042
}
1043

1044
// FIXME: this should be per component
1045
uint8_t MavsdkImpl::get_mav_type() const
368✔
1046
{
1047
    return _configuration.get_mav_type();
368✔
1048
}
1049

1050
void MavsdkImpl::make_system_with_component(uint8_t system_id, uint8_t comp_id)
130✔
1051
{
1052
    // Needs _systems_lock
1053

1054
    if (_should_exit) {
130✔
1055
        // When the system got destroyed in the destructor, we have to give up.
1056
        return;
×
1057
    }
1058

1059
    if (static_cast<int>(system_id) == 0 && static_cast<int>(comp_id) == 0) {
130✔
1060
        LogDebug() << "Initializing connection to remote system...";
×
1061
    } else {
1062
        LogDebug() << "New system ID: " << static_cast<int>(system_id)
260✔
1063
                   << " Comp ID: " << static_cast<int>(comp_id);
130✔
1064
    }
1065

1066
    // Make a system with its first component
1067
    auto new_system = std::make_shared<System>(*this);
130✔
1068
    new_system->init(system_id, comp_id);
130✔
1069

1070
    _systems.emplace_back(system_id, new_system);
130✔
1071
}
130✔
1072

1073
void MavsdkImpl::notify_on_discover()
130✔
1074
{
1075
    // Queue the callbacks without holding the mutex to avoid deadlocks
1076
    _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
189✔
1077
}
129✔
1078

1079
void MavsdkImpl::notify_on_timeout()
×
1080
{
1081
    // Queue the callbacks without holding the mutex to avoid deadlocks
1082
    _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
×
1083
}
×
1084

1085
Mavsdk::NewSystemHandle
1086
MavsdkImpl::subscribe_on_new_system(const Mavsdk::NewSystemCallback& callback)
58✔
1087
{
1088
    std::lock_guard lock(_mutex);
58✔
1089

1090
    const auto handle = _new_system_callbacks.subscribe(callback);
58✔
1091

1092
    if (is_any_system_connected()) {
58✔
1093
        _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
×
1094
    }
1095

1096
    return handle;
116✔
1097
}
58✔
1098

1099
void MavsdkImpl::unsubscribe_on_new_system(Mavsdk::NewSystemHandle handle)
57✔
1100
{
1101
    _new_system_callbacks.unsubscribe(handle);
57✔
1102
}
57✔
1103

1104
bool MavsdkImpl::is_any_system_connected() const
58✔
1105
{
1106
    std::vector<std::shared_ptr<System>> connected_systems = systems();
58✔
1107
    return std::any_of(connected_systems.cbegin(), connected_systems.cend(), [](auto& system) {
58✔
1108
        return system->is_connected();
×
1109
    });
58✔
1110
}
58✔
1111

1112
void MavsdkImpl::work_thread()
125✔
1113
{
1114
    while (!_should_exit) {
25,763✔
1115
        // Process incoming messages
1116
        process_messages();
25,500✔
1117

1118
        // Process incoming libmav messages
1119
        process_libmav_messages();
25,575✔
1120

1121
        // Run timers
1122
        timeout_handler.run_once();
25,455✔
1123
        call_every_handler.run_once();
25,585✔
1124

1125
        // Do component work
1126
        {
1127
            std::lock_guard lock(_server_components_mutex);
25,686✔
1128
            for (auto& it : _server_components) {
51,303✔
1129
                if (it.second != nullptr) {
25,765✔
1130
                    it.second->_impl->do_work();
25,590✔
1131
                }
1132
            }
1133
        }
25,421✔
1134

1135
        // Deliver outgoing messages
1136
        deliver_messages();
25,466✔
1137

1138
        // If no messages to send, check if there are messages to receive
1139
        std::unique_lock lock_received(_received_messages_mutex);
25,449✔
1140
        if (_received_messages.empty()) {
25,563✔
1141
            // No messages to process, wait for a signal or timeout
1142
            _received_messages_cv.wait_for(lock_received, std::chrono::milliseconds(10), [this]() {
25,318✔
1143
                return !_received_messages.empty() || _should_exit;
51,047✔
1144
            });
1145
        }
1146
    }
25,742✔
1147
}
97✔
1148

1149
void MavsdkImpl::call_user_callback_located(
1,204✔
1150
    const std::string& filename, const int linenumber, const std::function<void()>& func)
1151
{
1152
    // Don't enqueue callbacks if we're shutting down
1153
    if (_should_exit) {
1,204✔
1154
        return;
×
1155
    }
1156

1157
    auto callback_size = _user_callback_queue.size();
1,203✔
1158
    if (callback_size == 10) {
1,204✔
1159
        LogWarn()
1✔
1160
            << "User callback queue too slow.\n"
1161
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1162

1163
    } else if (callback_size == 99) {
1,203✔
1164
        LogErr()
×
1165
            << "User callback queue overflown\n"
1166
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1167

1168
    } else if (callback_size == 100) {
1,203✔
1169
        return;
×
1170
    }
1171

1172
    // We only need to keep track of filename and linenumber if we're actually debugging this.
1173
    UserCallback user_callback =
1174
        _callback_debugging ? UserCallback{func, filename, linenumber} : UserCallback{func};
2,408✔
1175

1176
    _user_callback_queue.push_back(std::make_shared<UserCallback>(user_callback));
1,204✔
1177
}
1,203✔
1178

1179
void MavsdkImpl::process_user_callbacks_thread()
125✔
1180
{
1181
    while (!_should_exit) {
1,454✔
1182
        UserCallback callback;
1,328✔
1183
        {
1184
            LockedQueue<UserCallback>::Guard guard(_user_callback_queue);
1,329✔
1185
            auto ptr = guard.wait_and_pop_front();
1,329✔
1186
            if (!ptr) {
1,329✔
1187
                continue;
125✔
1188
            }
1189
            // We need to get a copy instead of just a shared_ptr because the queue might
1190
            // be invalidated when the lock is released.
1191
            callback = *ptr;
1,204✔
1192
        }
1,453✔
1193

1194
        // Check if we're in the process of shutting down before executing the callback
1195
        if (_should_exit) {
1,204✔
1196
            continue;
×
1197
        }
1198

1199
        const double timeout_s = 1.0;
1,204✔
1200
        auto cookie = timeout_handler.add(
1,204✔
1201
            [&]() {
×
1202
                if (_callback_debugging) {
×
1203
                    LogWarn() << "Callback called from " << callback.filename << ":"
×
1204
                              << callback.linenumber << " took more than " << timeout_s
×
1205
                              << " second to run.";
×
1206
                    fflush(stdout);
×
1207
                    fflush(stderr);
×
1208
                    abort();
×
1209
                } else {
1210
                    LogWarn()
×
1211
                        << "Callback took more than " << timeout_s << " second to run.\n"
×
1212
                        << "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1213
                }
1214
            },
×
1215
            timeout_s);
1216
        callback.func();
1,203✔
1217
        timeout_handler.remove(cookie);
1,204✔
1218
    }
1,329✔
1219
}
126✔
1220

1221
void MavsdkImpl::start_sending_heartbeats()
324✔
1222
{
1223
    // Check if we're in the process of shutting down
1224
    if (_should_exit) {
324✔
1225
        return;
×
1226
    }
1227

1228
    // Before sending out first heartbeats we need to make sure we have a
1229
    // default server component.
1230
    default_server_component_impl();
324✔
1231

1232
    {
1233
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
323✔
1234
        call_every_handler.remove(_heartbeat_send_cookie);
324✔
1235
        _heartbeat_send_cookie =
324✔
1236
            call_every_handler.add([this]() { send_heartbeats(); }, HEARTBEAT_SEND_INTERVAL_S);
692✔
1237
    }
324✔
1238
}
1239

1240
void MavsdkImpl::stop_sending_heartbeats()
×
1241
{
1242
    if (!_configuration.get_always_send_heartbeats()) {
×
1243
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
×
1244
        call_every_handler.remove(_heartbeat_send_cookie);
×
1245
    }
×
1246
}
×
1247

1248
ServerComponentImpl& MavsdkImpl::default_server_component_impl()
1,305✔
1249
{
1250
    std::lock_guard lock(_server_components_mutex);
1,305✔
1251
    return default_server_component_with_lock();
1,304✔
1252
}
1,304✔
1253

1254
ServerComponentImpl& MavsdkImpl::default_server_component_with_lock()
1,305✔
1255
{
1256
    if (_default_server_component == nullptr) {
1,305✔
1257
        _default_server_component = server_component_by_id_with_lock(_our_component_id);
×
1258
    }
1259
    return *_default_server_component->_impl;
1,304✔
1260
}
1261

1262
void MavsdkImpl::send_heartbeats()
364✔
1263
{
1264
    std::lock_guard lock(_server_components_mutex);
364✔
1265

1266
    for (auto& it : _server_components) {
735✔
1267
        if (it.second != nullptr) {
368✔
1268
            it.second->_impl->send_heartbeat();
364✔
1269
        }
1270
    }
1271
}
368✔
1272

1273
void MavsdkImpl::intercept_incoming_messages_async(std::function<bool(mavlink_message_t&)> callback)
24✔
1274
{
1275
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
24✔
1276
    _intercept_incoming_messages_callback = callback;
24✔
1277
}
24✔
1278

1279
void MavsdkImpl::intercept_outgoing_messages_async(std::function<bool(mavlink_message_t&)> callback)
14✔
1280
{
1281
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
14✔
1282
    _intercept_outgoing_messages_callback = callback;
14✔
1283
}
14✔
1284

1285
bool MavsdkImpl::call_json_interception_callbacks(
4,798✔
1286
    const Mavsdk::MavlinkMessage& json_message,
1287
    std::vector<std::pair<Mavsdk::InterceptJsonHandle, Mavsdk::InterceptJsonCallback>>&
1288
        callback_list)
1289
{
1290
    bool keep_message = true;
4,798✔
1291

1292
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
4,798✔
1293
    for (const auto& subscription : callback_list) {
4,801✔
1294
        if (!subscription.second(json_message)) {
4✔
1295
            keep_message = false;
×
1296
        }
1297
    }
1298

1299
    return keep_message;
9,584✔
1300
}
4,788✔
1301

1302
Mavsdk::InterceptJsonHandle
1303
MavsdkImpl::subscribe_incoming_messages_json(const Mavsdk::InterceptJsonCallback& callback)
1✔
1304
{
1305
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1306
    auto handle = _json_handle_factory.create();
1✔
1307
    _incoming_json_message_subscriptions.push_back(std::make_pair(handle, callback));
1✔
1308
    return handle;
2✔
1309
}
1✔
1310

1311
void MavsdkImpl::unsubscribe_incoming_messages_json(Mavsdk::InterceptJsonHandle handle)
1✔
1312
{
1313
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1314
    auto it = std::find_if(
1✔
1315
        _incoming_json_message_subscriptions.begin(),
1316
        _incoming_json_message_subscriptions.end(),
1317
        [handle](const auto& subscription) { return subscription.first == handle; });
1✔
1318
    if (it != _incoming_json_message_subscriptions.end()) {
1✔
1319
        _incoming_json_message_subscriptions.erase(it);
1✔
1320
    }
1321
}
1✔
1322

1323
Mavsdk::InterceptJsonHandle
1324
MavsdkImpl::subscribe_outgoing_messages_json(const Mavsdk::InterceptJsonCallback& callback)
1✔
1325
{
1326
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1327
    auto handle = _json_handle_factory.create();
1✔
1328
    _outgoing_json_message_subscriptions.push_back(std::make_pair(handle, callback));
1✔
1329
    return handle;
2✔
1330
}
1✔
1331

1332
void MavsdkImpl::unsubscribe_outgoing_messages_json(Mavsdk::InterceptJsonHandle handle)
1✔
1333
{
1334
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1335
    auto it = std::find_if(
1✔
1336
        _outgoing_json_message_subscriptions.begin(),
1337
        _outgoing_json_message_subscriptions.end(),
1338
        [handle](const auto& subscription) { return subscription.first == handle; });
1✔
1339
    if (it != _outgoing_json_message_subscriptions.end()) {
1✔
1340
        _outgoing_json_message_subscriptions.erase(it);
1✔
1341
    }
1342
}
1✔
1343

1344
RawConnection* MavsdkImpl::find_raw_connection()
3✔
1345
{
1346
    std::lock_guard lock(_mutex);
3✔
1347

1348
    for (auto& entry : _connections) {
3✔
1349
        auto* raw_conn = dynamic_cast<RawConnection*>(entry.connection.get());
2✔
1350
        if (raw_conn != nullptr) {
2✔
1351
            return raw_conn;
2✔
1352
        }
1353
    }
1354
    return nullptr;
1✔
1355
}
3✔
1356

1357
bool MavsdkImpl::send_raw_bytes(const char* bytes, size_t length)
1✔
1358
{
1359
    auto* raw_conn = find_raw_connection();
1✔
1360
    if (raw_conn == nullptr) {
1✔
NEW
1361
        LogErr()
×
NEW
1362
            << "No raw connection available. Please add one using add_any_connection(\"raw://\")";
×
NEW
1363
        return false;
×
1364
    }
1365

1366
    raw_conn->receive(bytes, length);
1✔
1367
    return true;
1✔
1368
}
1369

1370
Mavsdk::RawBytesHandle MavsdkImpl::subscribe_raw_bytes(const Mavsdk::RawBytesCallback& callback)
1✔
1371
{
1372
    if (find_raw_connection() == nullptr) {
1✔
NEW
1373
        LogWarn() << "No raw connection available. Subscription will only receive bytes after you "
×
NEW
1374
                     "add a connection using add_any_connection(\"raw://\")";
×
1375
    }
1376
    return _raw_bytes_subscriptions.subscribe(callback);
1✔
1377
}
1378

1379
void MavsdkImpl::unsubscribe_raw_bytes(Mavsdk::RawBytesHandle handle)
1✔
1380
{
1381
    _raw_bytes_subscriptions.unsubscribe(handle);
1✔
1382
}
1✔
1383

1384
bool MavsdkImpl::notify_raw_bytes_sent(const char* bytes, size_t length)
1✔
1385
{
1386
    if (_raw_bytes_subscriptions.empty()) {
1✔
NEW
1387
        return false;
×
1388
    }
1389

1390
    _raw_bytes_subscriptions.queue(
1✔
1391
        bytes, length, [this](const std::function<void()>& func) { call_user_callback(func); });
1✔
1392

1393
    return true;
1✔
1394
}
1395

1396
Mavsdk::ConnectionErrorHandle
1397
MavsdkImpl::subscribe_connection_errors(Mavsdk::ConnectionErrorCallback callback)
×
1398
{
1399
    std::lock_guard lock(_mutex);
×
1400

1401
    const auto handle = _connections_errors_subscriptions.subscribe(callback);
×
1402

1403
    return handle;
×
1404
}
×
1405

1406
void MavsdkImpl::unsubscribe_connection_errors(Mavsdk::ConnectionErrorHandle handle)
×
1407
{
1408
    std::lock_guard lock(_mutex);
×
1409
    _connections_errors_subscriptions.unsubscribe(handle);
×
1410
}
×
1411

1412
uint8_t MavsdkImpl::get_target_system_id(const mavlink_message_t& message)
2,416✔
1413
{
1414
    // Checks whether connection knows target system ID by extracting target system if set.
1415
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);
2,416✔
1416

1417
    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_SYSTEM)) {
2,412✔
1418
        return 0;
574✔
1419
    }
1420

1421
    // Don't look at the target system offset if it is outside the payload length.
1422
    // This can happen if the fields are trimmed.
1423
    if (meta->target_system_ofs >= message.len) {
1,838✔
1424
        return 0;
11✔
1425
    }
1426

1427
    return (_MAV_PAYLOAD(&message))[meta->target_system_ofs];
1,827✔
1428
}
1429

1430
uint8_t MavsdkImpl::get_target_component_id(const mavlink_message_t& message)
42✔
1431
{
1432
    // Checks whether connection knows target system ID by extracting target system if set.
1433
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);
42✔
1434

1435
    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_COMPONENT)) {
42✔
1436
        return 0;
29✔
1437
    }
1438

1439
    // Don't look at the target component offset if it is outside the payload length.
1440
    // This can happen if the fields are trimmed.
1441
    if (meta->target_component_ofs >= message.len) {
13✔
1442
        return 0;
1✔
1443
    }
1444

1445
    return (_MAV_PAYLOAD(&message))[meta->target_component_ofs];
12✔
1446
}
1447

1448
Sender& MavsdkImpl::sender()
×
1449
{
1450
    std::lock_guard lock(_server_components_mutex);
×
1451
    return default_server_component_with_lock().sender();
×
1452
}
×
1453

1454
std::vector<Connection*> MavsdkImpl::get_connections() const
2,418✔
1455
{
1456
    std::lock_guard lock(_mutex);
2,418✔
1457
    std::vector<Connection*> connections;
2,415✔
1458
    for (const auto& connection_entry : _connections) {
4,790✔
1459
        connections.push_back(connection_entry.connection.get());
2,374✔
1460
    }
1461
    return connections;
2,419✔
1462
}
2,419✔
1463

1464
mav::MessageSet& MavsdkImpl::get_message_set() const
23✔
1465
{
1466
    // Note: This returns a reference to MessageSet without locking.
1467
    // Thread safety for MessageSet operations must be ensured by:
1468
    // 1. Using load_custom_xml_to_message_set() for write operations (XML loading)
1469
    // 2. libmav MessageSet should be internally thread-safe for read operations
1470
    // 3. If race conditions persist, consider implementing a thread-safe MessageSet wrapper
1471
    return *_message_set;
23✔
1472
}
1473

1474
bool MavsdkImpl::load_custom_xml_to_message_set(const std::string& xml_content)
6✔
1475
{
1476
    std::lock_guard<std::mutex> lock(_message_set_mutex);
6✔
1477
    auto result = _message_set->addFromXMLString(xml_content, false /* recursive_open_includes */);
6✔
1478
    return result == ::mav::MessageSetResult::Success;
12✔
1479
}
6✔
1480

1481
// Thread-safe MessageSet read operations
1482
std::optional<std::string> MavsdkImpl::message_id_to_name_safe(uint32_t id) const
×
1483
{
1484
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1485
    auto message_def = _message_set->getMessageDefinition(static_cast<int>(id));
×
1486
    if (message_def) {
×
1487
        return message_def.get().name();
×
1488
    }
1489
    return std::nullopt;
×
1490
}
×
1491

1492
std::optional<int> MavsdkImpl::message_name_to_id_safe(const std::string& name) const
×
1493
{
1494
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1495
    return _message_set->idForMessage(name);
×
1496
}
×
1497

1498
std::optional<mav::Message> MavsdkImpl::create_message_safe(const std::string& message_name) const
×
1499
{
1500
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1501
    return _message_set->create(message_name);
×
1502
}
×
1503

1504
// Thread-safe parsing for LibmavReceiver
1505
std::optional<mav::Message> MavsdkImpl::parse_message_safe(
4,815✔
1506
    const uint8_t* buffer, size_t buffer_len, size_t& bytes_consumed) const
1507
{
1508
    std::lock_guard<std::mutex> lock(_message_set_mutex);
4,815✔
1509
    return _buffer_parser->parseMessage(buffer, buffer_len, bytes_consumed);
4,817✔
1510
}
4,821✔
1511

1512
mav::OptionalReference<const mav::MessageDefinition>
1513
MavsdkImpl::get_message_definition_safe(int message_id) const
4,749✔
1514
{
1515
    std::lock_guard<std::mutex> lock(_message_set_mutex);
4,749✔
1516
    return _message_set->getMessageDefinition(message_id);
4,754✔
1517
}
4,754✔
1518

1519
} // namespace mavsdk
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc