• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mavlink / MAVSDK / 21724056693

05 Feb 2026 06:42PM UTC coverage: 49.043% (+0.08%) from 48.959%
21724056693

push

github

web-flow
core: fix MAVLink message sequence (#2762)

* core: fix MAVLink message sequence

The MAVLink headers use an inline function to get the mavlink channel,
so we end up having separate sequence numbers in different translation
units. By defining this function ourselves and linking it later, we make
sure to use the same sequence everywhere.

This should fix seq numbering that's invalid (per component).

We also add a system test to actually test this.

* system_tests: fixup thread sanitizer issues

* system_tests: use TCP for seq to avoid drops

51 of 56 new or added lines in 6 files covered. (91.07%)

6 existing lines in 5 files now uncovered.

18367 of 37451 relevant lines covered (49.04%)

672.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.1
/src/mavsdk/core/mavsdk_impl.cpp
1
#include "mavsdk_impl.h"
2

3
#include <algorithm>
4
#include <mutex>
5
#include <tcp_server_connection.h>
6

7
#include "connection.h"
8
#include "log.h"
9
#include "tcp_client_connection.h"
10
#include "tcp_server_connection.h"
11
#include "udp_connection.h"
12
#include "raw_connection.h"
13
#include "system.h"
14
#include "system_impl.h"
15
#include "serial_connection.h"
16
#include "version.h"
17
#include "server_component_impl.h"
18
#include "overloaded.h"
19
#include "mavlink_channels.h"
20
#include "callback_list.tpp"
21
#include "hostname_to_ip.h"
22
#include "embedded_mavlink_xml.h"
23
#include <mav/MessageSet.h>
24

25
namespace mavsdk {
26

27
template class CallbackList<>;
28

29
MavsdkImpl::MavsdkImpl(const Mavsdk::Configuration& configuration) :
148✔
30
    timeout_handler(time),
148✔
31
    call_every_handler(time)
296✔
32
{
33
    LogInfo() << "MAVSDK version: " << mavsdk_version;
148✔
34

35
    if (const char* env_p = std::getenv("MAVSDK_CALLBACK_DEBUGGING")) {
148✔
36
        if (std::string(env_p) == "1") {
×
37
            LogDebug() << "Callback debugging is on.";
×
38
            _callback_debugging = true;
×
39
            _callback_tracker = std::make_unique<CallbackTracker>();
×
40
        }
41
    }
42

43
    if (const char* env_p = std::getenv("MAVSDK_MESSAGE_DEBUGGING")) {
148✔
44
        if (std::string(env_p) == "1") {
×
45
            LogDebug() << "Message debugging is on.";
×
46
            _message_logging_on = true;
×
47
        }
48
    }
49

50
    if (const char* env_p = std::getenv("MAVSDK_SYSTEM_DEBUGGING")) {
148✔
51
        if (std::string(env_p) == "1") {
×
52
            LogDebug() << "System debugging is on.";
×
53
            _system_debugging = true;
×
54
        }
55
    }
56

57
    set_configuration(configuration);
148✔
58

59
    // Initialize MessageSet with embedded XML content in dependency order
60
    // This happens at startup before any connections are created, so no synchronization needed
61
    _message_set = std::make_unique<mav::MessageSet>();
148✔
62
    _message_set->addFromXMLString(mav_embedded::MINIMAL_XML);
148✔
63
    _message_set->addFromXMLString(mav_embedded::STANDARD_XML);
148✔
64
    _message_set->addFromXMLString(mav_embedded::COMMON_XML);
148✔
65
    _message_set->addFromXMLString(mav_embedded::ARDUPILOTMEGA_XML);
148✔
66

67
    // Initialize BufferParser for thread-safe parsing
68
    _buffer_parser = std::make_unique<mav::BufferParser>(*_message_set);
148✔
69

70
    // Start the user callback thread first, so it is ready for anything generated by
71
    // the work thread.
72

73
    _process_user_callbacks_thread =
296✔
74
        new std::thread(&MavsdkImpl::process_user_callbacks_thread, this);
148✔
75

76
    _work_thread = new std::thread(&MavsdkImpl::work_thread, this);
148✔
77
}
148✔
78

79
MavsdkImpl::~MavsdkImpl()
148✔
80
{
81
    {
82
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
148✔
83
        call_every_handler.remove(_heartbeat_send_cookie);
148✔
84
    }
148✔
85

86
    _should_exit = true;
148✔
87

88
    // Stop work first because we don't want to trigger anything that would
89
    // potentially want to call into user code.
90

91
    if (_work_thread != nullptr) {
148✔
92
        _work_thread->join();
148✔
93
        delete _work_thread;
148✔
94
        _work_thread = nullptr;
148✔
95
    }
96

97
    if (_process_user_callbacks_thread != nullptr) {
148✔
98
        _user_callback_queue.stop();
148✔
99
        _process_user_callbacks_thread->join();
148✔
100
        delete _process_user_callbacks_thread;
148✔
101
        _process_user_callbacks_thread = nullptr;
148✔
102
    }
103

104
    std::lock_guard lock(_mutex);
148✔
105

106
    _systems.clear();
148✔
107
    _connections.clear();
148✔
108
}
148✔
109

110
std::string MavsdkImpl::version()
1✔
111
{
112
    static unsigned version_counter = 0;
113

114
    ++version_counter;
1✔
115

116
    switch (version_counter) {
1✔
117
        case 10:
×
118
            return "You were wondering about the name of this library?";
×
119
        case 11:
×
120
            return "Let's look at the history:";
×
121
        case 12:
×
122
            return "DroneLink";
×
123
        case 13:
×
124
            return "DroneCore";
×
125
        case 14:
×
126
            return "DronecodeSDK";
×
127
        case 15:
×
128
            return "MAVSDK";
×
129
        case 16:
×
130
            return "And that's it...";
×
131
        case 17:
×
132
            return "At least for now ¯\\_(ツ)_/¯.";
×
133
        default:
1✔
134
            return mavsdk_version;
1✔
135
    }
136
}
137

138
std::vector<std::shared_ptr<System>> MavsdkImpl::systems() const
186✔
139
{
140
    std::vector<std::shared_ptr<System>> systems_result{};
186✔
141

142
    std::lock_guard lock(_mutex);
186✔
143
    for (auto& system : _systems) {
294✔
144
        // We ignore the 0 entry because it's just a null system.
145
        // It's only created because the older, deprecated API needs a
146
        // reference.
147
        if (system.first == 0) {
108✔
148
            continue;
×
149
        }
150
        systems_result.push_back(system.second);
108✔
151
    }
152

153
    return systems_result;
186✔
154
}
186✔
155

156
std::optional<std::shared_ptr<System>> MavsdkImpl::first_autopilot(double timeout_s)
62✔
157
{
158
    {
159
        std::lock_guard lock(_mutex);
62✔
160
        for (auto system : _systems) {
63✔
161
            if (system.second->is_connected() && system.second->has_autopilot()) {
4✔
162
                return system.second;
3✔
163
            }
164
        }
4✔
165
    }
62✔
166

167
    if (timeout_s == 0.0) {
59✔
168
        // Don't wait at all.
169
        return {};
×
170
    }
171

172
    auto prom = std::make_shared<std::promise<std::shared_ptr<System>>>();
59✔
173
    auto fut = prom->get_future();
59✔
174

175
    auto flag = std::make_shared<std::once_flag>();
59✔
176
    auto handle = subscribe_on_new_system([this, prom, flag]() {
59✔
177
        // Check all systems, not just the first one
178
        auto all_systems = systems();
60✔
179
        for (auto& system : all_systems) {
62✔
180
            if (system->is_connected() && system->has_autopilot()) {
61✔
181
                std::call_once(*flag, [prom, system]() { prom->set_value(system); });
118✔
182
                break;
59✔
183
            }
184
        }
185
    });
60✔
186

187
    if (timeout_s > 0.0) {
59✔
188
        if (fut.wait_for(std::chrono::milliseconds(int64_t(timeout_s * 1e3))) ==
59✔
189
            std::future_status::ready) {
190
            unsubscribe_on_new_system(handle);
59✔
191
            return fut.get();
59✔
192

193
        } else {
194
            unsubscribe_on_new_system(handle);
×
195
            return std::nullopt;
×
196
        }
197
    } else {
198
        fut.wait();
×
199
        unsubscribe_on_new_system(handle);
×
200
        return std::optional(fut.get());
×
201
    }
202
}
59✔
203

204
std::shared_ptr<ServerComponent> MavsdkImpl::server_component(unsigned instance)
58✔
205
{
206
    std::lock_guard lock(_mutex);
58✔
207

208
    auto component_type = _configuration.get_component_type();
58✔
209
    switch (component_type) {
58✔
210
        case ComponentType::Autopilot:
58✔
211
        case ComponentType::GroundStation:
212
        case ComponentType::CompanionComputer:
213
        case ComponentType::Camera:
214
        case ComponentType::Gimbal:
215
        case ComponentType::RemoteId:
216
        case ComponentType::Custom:
217
            return server_component_by_type(component_type, instance);
58✔
218
        default:
×
219
            LogErr() << "Unknown component type";
×
220
            return {};
×
221
    }
222
}
58✔
223

224
std::shared_ptr<ServerComponent>
225
MavsdkImpl::server_component_by_type(ComponentType server_component_type, unsigned instance)
58✔
226
{
227
    const auto mav_type = mav_type_for_component_type(server_component_type);
58✔
228

229
    switch (server_component_type) {
58✔
230
        case ComponentType::Autopilot:
44✔
231
            if (instance == 0) {
44✔
232
                return server_component_by_id(MAV_COMP_ID_AUTOPILOT1, mav_type);
44✔
233
            } else {
234
                LogErr() << "Only autopilot instance 0 is valid";
×
235
                return {};
×
236
            }
237

238
        case ComponentType::GroundStation:
×
239
            if (instance == 0) {
×
240
                return server_component_by_id(MAV_COMP_ID_MISSIONPLANNER, mav_type);
×
241
            } else {
242
                LogErr() << "Only one ground station supported at this time";
×
243
                return {};
×
244
            }
245

246
        case ComponentType::CompanionComputer:
1✔
247
            if (instance == 0) {
1✔
248
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER, mav_type);
1✔
249
            } else if (instance == 1) {
×
250
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER2, mav_type);
×
251
            } else if (instance == 2) {
×
252
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER3, mav_type);
×
253
            } else if (instance == 3) {
×
254
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER4, mav_type);
×
255
            } else {
256
                LogErr() << "Only companion computer 0..3 are supported";
×
257
                return {};
×
258
            }
259

260
        case ComponentType::Camera:
13✔
261
            if (instance == 0) {
13✔
262
                return server_component_by_id(MAV_COMP_ID_CAMERA, mav_type);
13✔
263
            } else if (instance == 1) {
×
264
                return server_component_by_id(MAV_COMP_ID_CAMERA2, mav_type);
×
265
            } else if (instance == 2) {
×
266
                return server_component_by_id(MAV_COMP_ID_CAMERA3, mav_type);
×
267
            } else if (instance == 3) {
×
268
                return server_component_by_id(MAV_COMP_ID_CAMERA4, mav_type);
×
269
            } else if (instance == 4) {
×
270
                return server_component_by_id(MAV_COMP_ID_CAMERA5, mav_type);
×
271
            } else if (instance == 5) {
×
272
                return server_component_by_id(MAV_COMP_ID_CAMERA6, mav_type);
×
273
            } else {
274
                LogErr() << "Only camera 0..5 are supported";
×
275
                return {};
×
276
            }
277

278
        case ComponentType::Gimbal:
×
279
            if (instance == 0) {
×
280
                return server_component_by_id(MAV_COMP_ID_GIMBAL, mav_type);
×
281
            } else {
282
                LogErr() << "Only gimbal instance 0 is valid";
×
283
                return {};
×
284
            }
285

286
        case ComponentType::RemoteId:
×
287
            if (instance == 0) {
×
288
                return server_component_by_id(MAV_COMP_ID_ODID_TXRX_1, mav_type);
×
289
            } else {
290
                LogErr() << "Only remote ID instance 0 is valid";
×
291
                return {};
×
292
            }
293

294
        case ComponentType::Custom:
×
295
            LogErr() << "Custom component type requires explicit component ID";
×
296
            return {};
×
297

298
        default:
×
299
            LogErr() << "Unknown server component type";
×
300
            return {};
×
301
    }
302
}
303

304
std::shared_ptr<ServerComponent>
305
MavsdkImpl::server_component_by_id(uint8_t component_id, uint8_t mav_type)
58✔
306
{
307
    if (component_id == 0) {
58✔
308
        LogErr() << "Server component with component ID 0 not allowed";
×
309
        return nullptr;
×
310
    }
311

312
    std::lock_guard lock(_server_components_mutex);
58✔
313

314
    return server_component_by_id_with_lock(component_id, mav_type);
58✔
315
}
58✔
316

317
std::shared_ptr<ServerComponent>
318
MavsdkImpl::server_component_by_id_with_lock(uint8_t component_id, uint8_t mav_type)
279✔
319
{
320
    for (auto& it : _server_components) {
280✔
321
        if (it.first == component_id) {
131✔
322
            if (it.second != nullptr) {
130✔
323
                return it.second;
130✔
324
            } else {
325
                it.second = std::make_shared<ServerComponent>(*this, component_id, mav_type);
×
326
            }
327
        }
328
    }
329

330
    _server_components.emplace_back(std::pair<uint8_t, std::shared_ptr<ServerComponent>>(
149✔
331
        component_id, std::make_shared<ServerComponent>(*this, component_id, mav_type)));
298✔
332

333
    return _server_components.back().second;
149✔
334
}
335

336
void MavsdkImpl::forward_message(mavlink_message_t& message, Connection* connection)
50✔
337
{
338
    // Forward_message Function implementing Mavlink routing rules.
339
    // See https://mavlink.io/en/guide/routing.html
340

341
    bool forward_heartbeats_enabled = true;
50✔
342
    const uint8_t target_system_id = get_target_system_id(message);
50✔
343
    const uint8_t target_component_id = get_target_component_id(message);
50✔
344

345
    // If it's a message only for us, we keep it, otherwise, we forward it.
346
    const bool targeted_only_at_us =
347
        (target_system_id == get_own_system_id() && target_component_id == get_own_component_id());
50✔
348

349
    // We don't forward heartbeats unless it's specifically enabled.
350
    const bool heartbeat_check_ok =
50✔
351
        (message.msgid != MAVLINK_MSG_ID_HEARTBEAT || forward_heartbeats_enabled);
50✔
352

353
    if (!targeted_only_at_us && heartbeat_check_ok) {
50✔
354
        unsigned successful_emissions = 0;
43✔
355
        for (auto& entry : _connections) {
126✔
356
            // Check whether the connection is not the one from which we received the message.
357
            // And also check if the connection was set to forward messages.
358
            if (entry.connection.get() == connection ||
123✔
359
                !entry.connection->should_forward_messages()) {
40✔
360
                continue;
43✔
361
            }
362
            auto result = (*entry.connection).send_message(message);
40✔
363
            if (result.first) {
40✔
364
                successful_emissions++;
39✔
365
            } else {
366
                _connections_errors_subscriptions.queue(
2✔
367
                    Mavsdk::ConnectionError{result.second, entry.handle},
1✔
368
                    [this](const auto& func) { call_user_callback(func); });
×
369
            }
370
        }
40✔
371
        if (successful_emissions == 0) {
43✔
372
            if (_system_debugging) {
4✔
373
                LogErr() << "Message forwarding failed";
×
374
            }
375
        }
376
    }
377
}
50✔
378

379
void MavsdkImpl::receive_message(
9,344✔
380
    MavlinkReceiver::ParseResult result, mavlink_message_t& message, Connection* connection)
381
{
382
    if (result == MavlinkReceiver::ParseResult::MessageParsed) {
9,344✔
383
        // Valid message: queue for full processing (which includes forwarding)
384
        {
385
            std::lock_guard lock(_received_messages_mutex);
9,342✔
386
            _received_messages.emplace(ReceivedMessage{std::move(message), connection});
9,344✔
387
        }
9,343✔
388
        _received_messages_cv.notify_one();
9,345✔
389

390
    } else if (result == MavlinkReceiver::ParseResult::BadCrc) {
2✔
391
        // Unknown message: forward only, don't process locally
392
        forward_message(message, connection);
4✔
393
    }
394
}
9,349✔
395

396
void MavsdkImpl::receive_libmav_message(
9,342✔
397
    const Mavsdk::MavlinkMessage& message, Connection* connection)
398
{
399
    {
400
        std::lock_guard lock(_received_libmav_messages_mutex);
9,342✔
401
        _received_libmav_messages.emplace(ReceivedLibmavMessage{message, connection});
9,345✔
402
    }
9,350✔
403
    _received_libmav_messages_cv.notify_one();
9,348✔
404
}
9,349✔
405

406
void MavsdkImpl::process_messages()
41,721✔
407
{
408
    std::lock_guard lock(_received_messages_mutex);
41,721✔
409
    while (!_received_messages.empty()) {
50,632✔
410
        auto message_copied = _received_messages.front();
9,326✔
411
        process_message(message_copied.message, message_copied.connection_ptr);
9,325✔
412
        _received_messages.pop();
9,327✔
413
    }
414
}
40,671✔
415

416
void MavsdkImpl::process_libmav_messages()
40,877✔
417
{
418
    std::lock_guard lock(_received_libmav_messages_mutex);
40,877✔
419
    while (!_received_libmav_messages.empty()) {
50,045✔
420
        auto message_copied = _received_libmav_messages.front();
9,329✔
421
        process_libmav_message(message_copied.message, message_copied.connection_ptr);
9,326✔
422
        _received_libmav_messages.pop();
9,323✔
423
    }
9,326✔
424
}
41,036✔
425

426
void MavsdkImpl::process_message(mavlink_message_t& message, Connection* connection)
9,322✔
427
{
428
    // Assumes _received_messages_mutex
429

430
    if (_message_logging_on) {
9,322✔
431
        LogDebug() << "Processing message " << message.msgid << " from "
×
432
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid);
×
433
    }
434

435
    if (_should_exit) {
9,322✔
436
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
437
        return;
×
438
    }
439

440
    {
441
        std::lock_guard lock(_mutex);
9,324✔
442

443
        // This is a low level interface where incoming messages can be tampered
444
        // with or even dropped. This happens BEFORE forwarding, so modifications
445
        // and drops affect both local processing AND forwarded messages.
446
        {
447
            bool keep = true;
9,326✔
448
            {
449
                std::lock_guard<std::mutex> intercept_lock(_intercept_callbacks_mutex);
9,326✔
450
                if (_intercept_incoming_messages_callback != nullptr) {
9,326✔
451
                    keep = _intercept_incoming_messages_callback(message);
352✔
452
                }
453
            }
9,322✔
454

455
            if (!keep) {
9,329✔
456
                LogDebug() << "Dropped incoming message: " << int(message.msgid);
39✔
457
                return;
39✔
458
            }
459
        }
460

461
        if (_should_exit) {
9,290✔
462
            // If we're meant to clean up, let's not try to acquire any more locks but bail.
463
            return;
×
464
        }
465

466
        /* Forward message (after intercept) if option is enabled and multiple interfaces
467
         * are connected.
468
         * Performs message forwarding checks for every message if message forwarding is
469
         * enabled on at least one connection, and in case of a single forwarding connection,
470
         * we check that it is not the one which received the current message.
471
         *
472
         * Conditions:
473
         * 1. At least 2 connections.
474
         * 2. At least 1 forwarding connection.
475
         * 3. At least 2 forwarding connections or current connection is not forwarding.
476
         */
477

478
        if (_connections.size() > 1 && mavsdk::Connection::forwarding_connections_count() > 0 &&
9,334✔
479
            (mavsdk::Connection::forwarding_connections_count() > 1 ||
46✔
480
             !connection->should_forward_messages())) {
481
            if (_message_logging_on) {
46✔
482
                LogDebug() << "Forwarding message " << message.msgid << " from "
×
483
                           << static_cast<int>(message.sysid) << "/"
×
484
                           << static_cast<int>(message.compid);
×
485
            }
486
            forward_message(message, connection);
46✔
487
        }
488

489
        // Don't ever create a system with sysid 0.
490
        if (message.sysid == 0) {
9,287✔
491
            if (_message_logging_on) {
×
492
                LogDebug() << "Ignoring message with sysid == 0";
×
493
            }
494
            return;
×
495
        }
496

497
        // Filter out messages by QGroundControl, however, only do that if MAVSDK
498
        // is also implementing a ground station and not if it is used in another
499
        // configuration, e.g. on a companion.
500
        //
501
        // This is a workaround because PX4 started forwarding messages between
502
        // mavlink instances which leads to existing implementations (including
503
        // examples and integration tests) to connect to QGroundControl by accident
504
        // instead of PX4 because the check `has_autopilot()` is not used.
505

506
        if (_configuration.get_component_type() == ComponentType::GroundStation &&
9,287✔
507
            message.sysid == 255 && message.compid == MAV_COMP_ID_MISSIONPLANNER) {
9,288✔
508
            if (_message_logging_on) {
×
509
                LogDebug() << "Ignoring messages from QGC as we are also a ground station";
×
510
            }
511
            return;
×
512
        }
513

514
        bool found_system = false;
9,288✔
515
        for (auto& system : _systems) {
9,324✔
516
            if (system.first == message.sysid) {
9,179✔
517
                system.second->system_impl()->add_new_component(message.compid);
9,143✔
518
                found_system = true;
9,143✔
519
                break;
9,143✔
520
            }
521
        }
522

523
        if (!found_system) {
9,289✔
524
            if (_system_debugging) {
146✔
525
                LogWarn() << "Create new system/component " << (int)message.sysid << "/"
×
526
                          << (int)message.compid;
×
527
                LogWarn() << "From message " << (int)message.msgid << " with len "
×
528
                          << (int)message.len;
×
529
                std::string bytes = "";
×
530
                for (unsigned i = 0; i < 12 + message.len; ++i) {
×
531
                    bytes += std::to_string(reinterpret_cast<uint8_t*>(&message)[i]) + ' ';
×
532
                }
533
                LogWarn() << "Bytes: " << bytes;
×
534
            }
×
535
            make_system_with_component(message.sysid, message.compid);
146✔
536

537
            // We now better talk back.
538
            start_sending_heartbeats();
146✔
539
        }
540

541
        if (_should_exit) {
9,289✔
542
            // Don't try to call at() if systems have already been destroyed
543
            // in destructor.
544
            return;
×
545
        }
546
    }
9,324✔
547

548
    mavlink_message_handler.process_message(message);
9,288✔
549

550
    for (auto& system : _systems) {
9,324✔
551
        if (system.first == message.sysid) {
9,325✔
552
            system.second->system_impl()->process_mavlink_message(message);
9,289✔
553
            break;
9,289✔
554
        }
555
    }
556
}
557

558
void MavsdkImpl::process_libmav_message(
9,329✔
559
    const Mavsdk::MavlinkMessage& message, Connection* /* connection */)
560
{
561
    // Assumes _received_libmav_messages_mutex
562

563
    if (_message_logging_on) {
9,329✔
564
        LogDebug() << "MavsdkImpl::process_libmav_message: " << message.message_name << " from "
×
565
                   << static_cast<int>(message.system_id) << "/"
×
566
                   << static_cast<int>(message.component_id);
×
567
    }
568

569
    // JSON message interception for incoming messages
570
    if (!call_json_interception_callbacks(message, _incoming_json_message_subscriptions)) {
9,329✔
571
        // Message was dropped by interception callback
572
        if (_message_logging_on) {
×
573
            LogDebug() << "Incoming JSON message " << message.message_name
×
574
                       << " dropped by interception";
×
575
        }
576
        return;
×
577
    }
578

579
    if (_message_logging_on) {
9,326✔
580
        LogDebug() << "Processing libmav message " << message.message_name << " from "
×
581
                   << static_cast<int>(message.system_id) << "/"
×
582
                   << static_cast<int>(message.component_id);
×
583
    }
584

585
    if (_should_exit) {
9,326✔
586
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
587
        return;
×
588
    }
589

590
    {
591
        std::lock_guard lock(_mutex);
9,323✔
592

593
        // Don't ever create a system with sysid 0.
594
        if (message.system_id == 0) {
9,324✔
595
            if (_message_logging_on) {
×
596
                LogDebug() << "Ignoring libmav message with sysid == 0";
×
597
            }
598
            return;
×
599
        }
600

601
        // Filter out QGroundControl messages similar to regular mavlink processing
602
        if (_configuration.get_component_type() == ComponentType::GroundStation &&
9,324✔
603
            message.system_id == 255 && message.component_id == MAV_COMP_ID_MISSIONPLANNER) {
9,326✔
604
            if (_message_logging_on) {
×
605
                LogDebug() << "Ignoring libmav messages from QGC as we are also a ground station";
×
606
            }
607
            return;
×
608
        }
609

610
        bool found_system = false;
9,326✔
611
        for (auto& system : _systems) {
9,360✔
612
            if (system.first == message.system_id) {
9,356✔
613
                system.second->system_impl()->add_new_component(message.component_id);
9,322✔
614
                found_system = true;
9,323✔
615
                break;
9,323✔
616
            }
617
        }
618

619
        if (!found_system) {
9,329✔
620
            if (_system_debugging) {
6✔
621
                LogWarn() << "Create new system/component from libmav " << (int)message.system_id
×
622
                          << "/" << (int)message.component_id;
×
623
            }
624
            make_system_with_component(message.system_id, message.component_id);
6✔
625

626
            // We now better talk back.
627
            start_sending_heartbeats();
6✔
628
        }
629

630
        if (_should_exit) {
9,329✔
631
            // Don't try to call at() if systems have already been destroyed
632
            // in destructor.
633
            return;
×
634
        }
635
    }
9,328✔
636

637
    // Distribute libmav message to systems for libmav-specific handling
638
    bool found_system = false;
9,326✔
639
    for (auto& system : _systems) {
18,716✔
640
        if (system.first == message.system_id) {
9,393✔
641
            if (_message_logging_on) {
9,325✔
642
                LogDebug() << "Distributing libmav message " << message.message_name
×
643
                           << " to SystemImpl for system " << system.first;
×
644
            }
645
            system.second->system_impl()->process_libmav_message(message);
9,325✔
646
            found_system = true;
9,322✔
647
            // Don't break - distribute to all matching system instances
648
        }
649
    }
650

651
    if (!found_system) {
9,319✔
652
        LogWarn() << "No system found for libmav message " << message.message_name
×
653
                  << " from system " << message.system_id;
×
654
    }
655
}
656

657
bool MavsdkImpl::send_message(mavlink_message_t& message)
9,483✔
658
{
659
    // Create a copy of the message to avoid reference issues
660
    mavlink_message_t message_copy = message;
9,483✔
661

662
    {
663
        std::lock_guard lock(_messages_to_send_mutex);
9,483✔
664
        _messages_to_send.push(std::move(message_copy));
9,481✔
665
    }
9,486✔
666

667
    // For heartbeat messages, we want to process them immediately to speed up system discovery
668
    if (message.msgid == MAVLINK_MSG_ID_HEARTBEAT) {
9,482✔
669
        // Trigger message processing in the work thread
670
        // This is a hint to process messages sooner, but doesn't block
671
        std::this_thread::yield();
510✔
672
    }
673

674
    return true;
9,486✔
675
}
676

677
void MavsdkImpl::deliver_messages()
50,661✔
678
{
679
    // Process messages one at a time to avoid holding the mutex while delivering
680
    while (true) {
681
        mavlink_message_t message;
682
        {
683
            std::lock_guard lock(_messages_to_send_mutex);
50,661✔
684
            if (_messages_to_send.empty()) {
50,506✔
685
                break;
41,015✔
686
            }
687
            message = _messages_to_send.front();
9,480✔
688
            _messages_to_send.pop();
9,479✔
689
        }
50,497✔
690
        deliver_message(message);
9,482✔
691
    }
9,485✔
692
}
40,683✔
693

694
void MavsdkImpl::deliver_message(mavlink_message_t& message)
9,487✔
695
{
696
    if (_message_logging_on) {
9,487✔
697
        LogDebug() << "Sending message " << message.msgid << " from "
×
698
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid)
×
699
                   << " to " << static_cast<int>(get_target_system_id(message)) << "/"
×
700
                   << static_cast<int>(get_target_component_id(message));
×
701
    }
702

703
    // This is a low level interface where outgoing messages can be tampered
704
    // with or even dropped.
705
    bool keep = true;
9,487✔
706
    {
707
        std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
9,487✔
708
        if (_intercept_outgoing_messages_callback != nullptr) {
9,484✔
709
            keep = _intercept_outgoing_messages_callback(message);
320✔
710
        }
711
    }
9,477✔
712

713
    if (!keep) {
9,485✔
714
        // We fake that everything was sent as instructed because
715
        // a potential loss would happen later, and we would not be informed
716
        // about it.
717
        LogDebug() << "Dropped outgoing message: " << int(message.msgid);
88✔
718
        return;
178✔
719
    }
720

721
    // JSON message interception for outgoing messages
722
    // Convert mavlink_message_t to Mavsdk::MavlinkMessage for JSON interception
723
    uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
724
    uint16_t len = mavlink_msg_to_send_buffer(buffer, &message);
9,397✔
725

726
    size_t bytes_consumed = 0;
9,397✔
727
    auto libmav_msg_opt = parse_message_safe(buffer, len, bytes_consumed);
9,397✔
728

729
    if (libmav_msg_opt) {
9,393✔
730
        // Create Mavsdk::MavlinkMessage directly for JSON interception
731
        Mavsdk::MavlinkMessage json_message;
9,388✔
732
        json_message.message_name = libmav_msg_opt.value().name();
9,390✔
733
        json_message.system_id = message.sysid;
9,392✔
734
        json_message.component_id = message.compid;
9,392✔
735

736
        // Extract target_system and target_component if present
737
        uint8_t target_system_id = 0;
9,392✔
738
        uint8_t target_component_id = 0;
9,392✔
739
        if (libmav_msg_opt.value().get("target_system", target_system_id) ==
9,392✔
740
            mav::MessageResult::Success) {
741
            json_message.target_system_id = target_system_id;
8,620✔
742
        } else {
743
            json_message.target_system_id = 0;
771✔
744
        }
745
        if (libmav_msg_opt.value().get("target_component", target_component_id) ==
9,391✔
746
            mav::MessageResult::Success) {
747
            json_message.target_component_id = target_component_id;
8,619✔
748
        } else {
749
            json_message.target_component_id = 0;
773✔
750
        }
751

752
        // Generate JSON using LibmavReceiver's public method
753
        auto connections = get_connections();
9,392✔
754
        if (!connections.empty() && connections[0]->get_libmav_receiver()) {
9,394✔
755
            json_message.fields_json =
756
                connections[0]->get_libmav_receiver()->libmav_message_to_json(
27,908✔
757
                    libmav_msg_opt.value());
18,613✔
758
        } else {
759
            // Fallback: create minimal JSON if no receiver available
760
            json_message.fields_json =
761
                "{\"message_id\":" + std::to_string(libmav_msg_opt.value().id()) +
180✔
762
                ",\"message_name\":\"" + libmav_msg_opt.value().name() + "\"}";
180✔
763
        }
764

765
        if (!call_json_interception_callbacks(json_message, _outgoing_json_message_subscriptions)) {
9,397✔
766
            // Message was dropped by JSON interception callback
UNCOV
767
            if (_message_logging_on) {
×
768
                LogDebug() << "Outgoing JSON message " << json_message.message_name
×
769
                           << " dropped by interception";
×
770
            }
771
            return;
×
772
        }
773
    }
9,393✔
774

775
    std::lock_guard lock(_mutex);
9,401✔
776

777
    if (_connections.empty()) {
9,391✔
778
        // We obviously can't send any messages without a connection added, so
779
        // we silently ignore this.
780
        return;
90✔
781
    }
782

783
    uint8_t successful_emissions = 0;
9,295✔
784
    for (auto& _connection : _connections) {
18,620✔
785
        const uint8_t target_system_id = get_target_system_id(message);
9,324✔
786

787
        if (target_system_id != 0 && !(*_connection.connection).has_system_id(target_system_id)) {
9,324✔
788
            continue;
5✔
789
        }
790
        const auto result = (*_connection.connection).send_message(message);
9,315✔
791
        if (result.first) {
9,322✔
792
            successful_emissions++;
9,316✔
793
        } else {
794
            _connections_errors_subscriptions.queue(
12✔
795
                Mavsdk::ConnectionError{result.second, _connection.handle},
6✔
796
                [this](const auto& func) { call_user_callback(func); });
×
797
        }
798
    }
9,322✔
799

800
    if (successful_emissions == 0) {
9,308✔
801
        LogErr() << "Sending message failed";
4✔
802
    }
803
}
9,397✔
804

805
std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_any_connection(
154✔
806
    const std::string& connection_url, ForwardingOption forwarding_option)
807
{
808
    CliArg cli_arg;
154✔
809
    if (!cli_arg.parse(connection_url)) {
154✔
810
        return {ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle{}};
×
811
    }
812

813
    return std::visit(
154✔
814
        overloaded{
308✔
815
            [](std::monostate) {
×
816
                // Should not happen anyway.
817
                return std::pair<ConnectionResult, Mavsdk::ConnectionHandle>{
×
818
                    ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle()};
×
819
            },
820
            [this, forwarding_option](const CliArg::Udp& udp) {
142✔
821
                return add_udp_connection(udp, forwarding_option);
142✔
822
            },
823
            [this, forwarding_option](const CliArg::Tcp& tcp) {
10✔
824
                return add_tcp_connection(tcp, forwarding_option);
10✔
825
            },
826
            [this, forwarding_option](const CliArg::Serial& serial) {
×
827
                return add_serial_connection(
×
828
                    serial.path, serial.baudrate, serial.flow_control_enabled, forwarding_option);
×
829
            },
830
            [this, forwarding_option](const CliArg::Raw&) {
2✔
831
                return add_raw_connection(forwarding_option);
2✔
832
            }},
833
        cli_arg.protocol);
154✔
834
}
154✔
835

836
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
837
MavsdkImpl::add_udp_connection(const CliArg::Udp& udp, ForwardingOption forwarding_option)
142✔
838
{
839
    auto new_conn = std::make_unique<UdpConnection>(
840
        [this](
9,258✔
841
            MavlinkReceiver::ParseResult result,
842
            mavlink_message_t& message,
843
            Connection* connection) { receive_message(result, message, connection); },
9,258✔
844
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
9,258✔
845
            receive_libmav_message(message, connection);
9,258✔
846
        },
9,260✔
847
        *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
848
        udp.mode == CliArg::Udp::Mode::In ? udp.host : "0.0.0.0",
284✔
849
        udp.mode == CliArg::Udp::Mode::In ? udp.port : 0,
142✔
850
        forwarding_option);
142✔
851

852
    if (!new_conn) {
142✔
853
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
854
    }
855

856
    ConnectionResult ret = new_conn->start();
142✔
857

858
    if (ret != ConnectionResult::Success) {
142✔
859
        return {ret, Mavsdk::ConnectionHandle{}};
×
860
    }
861

862
    if (udp.mode == CliArg::Udp::Mode::Out) {
142✔
863
        // We need to add the IP rather than a hostname, otherwise we end up with two remotes:
864
        // one for the IP, and one for a hostname.
865
        auto remote_ip = resolve_hostname_to_ip(udp.host);
71✔
866

867
        if (!remote_ip) {
71✔
868
            return {ConnectionResult::DestinationIpUnknown, Mavsdk::ConnectionHandle{}};
×
869
        }
870

871
        new_conn->add_remote_to_keep(remote_ip.value(), udp.port);
71✔
872
        std::lock_guard lock(_mutex);
71✔
873

874
        // With a UDP remote, we need to initiate the connection by sending heartbeats.
875
        auto new_configuration = get_configuration();
71✔
876
        new_configuration.set_always_send_heartbeats(true);
71✔
877
        set_configuration(new_configuration);
71✔
878
    }
71✔
879

880
    auto handle = add_connection(std::move(new_conn));
142✔
881

882
    return {ConnectionResult::Success, handle};
142✔
883
}
142✔
884

885
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
886
MavsdkImpl::add_tcp_connection(const CliArg::Tcp& tcp, ForwardingOption forwarding_option)
10✔
887
{
888
    if (tcp.mode == CliArg::Tcp::Mode::Out) {
10✔
889
        auto new_conn = std::make_unique<TcpClientConnection>(
890
            [this](
46✔
891
                MavlinkReceiver::ParseResult result,
892
                mavlink_message_t& message,
893
                Connection* connection) { receive_message(result, message, connection); },
46✔
894
            [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
46✔
895
                receive_libmav_message(message, connection);
46✔
896
            },
46✔
897
            *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
898
            tcp.host,
5✔
899
            tcp.port,
5✔
900
            forwarding_option);
5✔
901
        if (!new_conn) {
5✔
902
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
903
        }
904
        ConnectionResult ret = new_conn->start();
5✔
905
        if (ret == ConnectionResult::Success) {
5✔
906
            return {ret, add_connection(std::move(new_conn))};
5✔
907
        } else {
908
            return {ret, Mavsdk::ConnectionHandle{}};
×
909
        }
910
    } else {
5✔
911
        auto new_conn = std::make_unique<TcpServerConnection>(
912
            [this](
42✔
913
                MavlinkReceiver::ParseResult result,
914
                mavlink_message_t& message,
915
                Connection* connection) { receive_message(result, message, connection); },
42✔
916
            [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
42✔
917
                receive_libmav_message(message, connection);
42✔
918
            },
42✔
919
            *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
920
            tcp.host,
5✔
921
            tcp.port,
5✔
922
            forwarding_option);
5✔
923
        if (!new_conn) {
5✔
924
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
925
        }
926
        ConnectionResult ret = new_conn->start();
5✔
927
        if (ret == ConnectionResult::Success) {
5✔
928
            return {ret, add_connection(std::move(new_conn))};
5✔
929
        } else {
930
            return {ret, Mavsdk::ConnectionHandle{}};
×
931
        }
932
    }
5✔
933
}
934

935
std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_serial_connection(
×
936
    const std::string& dev_path,
937
    int baudrate,
938
    bool flow_control,
939
    ForwardingOption forwarding_option)
940
{
941
    auto new_conn = std::make_unique<SerialConnection>(
942
        [this](
×
943
            MavlinkReceiver::ParseResult result,
944
            mavlink_message_t& message,
945
            Connection* connection) { receive_message(result, message, connection); },
×
946
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
×
947
            receive_libmav_message(message, connection);
×
948
        },
×
949
        *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
950
        dev_path,
951
        baudrate,
952
        flow_control,
953
        forwarding_option);
×
954
    if (!new_conn) {
×
955
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
956
    }
957
    ConnectionResult ret = new_conn->start();
×
958
    if (ret == ConnectionResult::Success) {
×
959
        auto handle = add_connection(std::move(new_conn));
×
960

961
        auto new_configuration = get_configuration();
×
962

963
        // PX4 starting with v1.13 does not send heartbeats by default, so we need
964
        // to initiate the MAVLink connection by sending heartbeats.
965
        // Therefore, we override the default here and enable sending heartbeats.
966
        new_configuration.set_always_send_heartbeats(true);
×
967
        set_configuration(new_configuration);
×
968

969
        return {ret, handle};
×
970

971
    } else {
972
        return {ret, Mavsdk::ConnectionHandle{}};
×
973
    }
974
}
×
975

976
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
977
MavsdkImpl::add_raw_connection(ForwardingOption forwarding_option)
2✔
978
{
979
    // Check if a raw connection already exists
980
    if (find_raw_connection() != nullptr) {
2✔
981
        LogErr() << "Raw connection already exists. Only one raw connection is allowed.";
×
982
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
983
    }
984

985
    auto new_conn = std::make_unique<RawConnection>(
986
        [this](
1✔
987
            MavlinkReceiver::ParseResult result,
988
            mavlink_message_t& message,
989
            Connection* connection) { receive_message(result, message, connection); },
1✔
990
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
1✔
991
            receive_libmav_message(message, connection);
1✔
992
        },
1✔
993
        *this,
994
        forwarding_option);
2✔
995

996
    if (!new_conn) {
2✔
997
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
998
    }
999

1000
    ConnectionResult ret = new_conn->start();
2✔
1001
    if (ret != ConnectionResult::Success) {
2✔
1002
        return {ret, Mavsdk::ConnectionHandle{}};
×
1003
    }
1004

1005
    auto handle = add_connection(std::move(new_conn));
2✔
1006

1007
    // Enable heartbeats for raw connection
1008
    auto new_configuration = get_configuration();
2✔
1009
    new_configuration.set_always_send_heartbeats(true);
2✔
1010
    set_configuration(new_configuration);
2✔
1011

1012
    return {ConnectionResult::Success, handle};
2✔
1013
}
2✔
1014

1015
Mavsdk::ConnectionHandle MavsdkImpl::add_connection(std::unique_ptr<Connection>&& new_connection)
154✔
1016
{
1017
    std::lock_guard lock(_mutex);
154✔
1018
    auto handle = _connections_handle_factory.create();
154✔
1019
    _connections.emplace_back(ConnectionEntry{std::move(new_connection), handle});
154✔
1020

1021
    return handle;
308✔
1022
}
154✔
1023

1024
void MavsdkImpl::remove_connection(Mavsdk::ConnectionHandle handle)
12✔
1025
{
1026
    std::lock_guard lock(_mutex);
12✔
1027

1028
    _connections.erase(std::remove_if(_connections.begin(), _connections.end(), [&](auto&& entry) {
12✔
1029
        return (entry.handle == handle);
12✔
1030
    }));
1031
}
12✔
1032

1033
Mavsdk::Configuration MavsdkImpl::get_configuration() const
73✔
1034
{
1035
    std::lock_guard configuration_lock(_mutex);
73✔
1036
    return _configuration;
146✔
1037
}
73✔
1038

1039
void MavsdkImpl::set_configuration(Mavsdk::Configuration new_configuration)
221✔
1040
{
1041
    std::lock_guard server_components_lock(_server_components_mutex);
221✔
1042
    // We just point the default to the newly created component. This means
1043
    // that the previous default component will be deleted if it is not
1044
    // used/referenced anywhere.
1045
    _default_server_component = server_component_by_id_with_lock(
442✔
1046
        new_configuration.get_component_id(), new_configuration.get_mav_type());
442✔
1047

1048
    if (new_configuration.get_always_send_heartbeats() &&
368✔
1049
        !_configuration.get_always_send_heartbeats()) {
147✔
1050
        start_sending_heartbeats();
78✔
1051
    } else if (
143✔
1052
        !new_configuration.get_always_send_heartbeats() &&
217✔
1053
        _configuration.get_always_send_heartbeats() && !is_any_system_connected()) {
217✔
1054
        stop_sending_heartbeats();
×
1055
    }
1056

1057
    _configuration = new_configuration;
221✔
1058
    // We cache these values as atomic to avoid having to lock any mutex for them.
1059
    _our_system_id = new_configuration.get_system_id();
221✔
1060
    _our_component_id = new_configuration.get_component_id();
221✔
1061
}
221✔
1062

1063
uint8_t MavsdkImpl::get_own_system_id() const
12,855✔
1064
{
1065
    return _our_system_id;
12,855✔
1066
}
1067

1068
uint8_t MavsdkImpl::get_own_component_id() const
1,474✔
1069
{
1070
    return _our_component_id;
1,474✔
1071
}
1072

1073
uint8_t MavsdkImpl::get_mav_type() const
×
1074
{
1075
    return _configuration.get_mav_type();
×
1076
}
1077

1078
Autopilot MavsdkImpl::get_autopilot() const
×
1079
{
1080
    return _configuration.get_autopilot();
×
1081
}
1082

1083
uint8_t MavsdkImpl::get_mav_autopilot() const
273✔
1084
{
1085
    switch (_configuration.get_autopilot()) {
273✔
1086
        case Autopilot::Px4:
×
1087
            return MAV_AUTOPILOT_PX4;
×
1088
        case Autopilot::ArduPilot:
×
1089
            return MAV_AUTOPILOT_ARDUPILOTMEGA;
×
1090
        case Autopilot::Unknown:
273✔
1091
        default:
1092
            return MAV_AUTOPILOT_GENERIC;
273✔
1093
    }
1094
}
1095

1096
CompatibilityMode MavsdkImpl::get_compatibility_mode() const
39✔
1097
{
1098
    return _configuration.get_compatibility_mode();
39✔
1099
}
1100

1101
Autopilot MavsdkImpl::effective_autopilot(Autopilot detected) const
1,554✔
1102
{
1103
    switch (_configuration.get_compatibility_mode()) {
1,554✔
1104
        case CompatibilityMode::Auto:
1,554✔
1105
            return detected;
1,554✔
1106
        case CompatibilityMode::Pure:
×
1107
            return Autopilot::Unknown; // Unknown = no quirks
×
1108
        case CompatibilityMode::Px4:
×
1109
            return Autopilot::Px4;
×
1110
        case CompatibilityMode::ArduPilot:
×
1111
            return Autopilot::ArduPilot;
×
1112
        default:
×
1113
            return detected;
×
1114
    }
1115
}
1116

1117
uint8_t MavsdkImpl::mav_type_for_component_type(ComponentType component_type)
355✔
1118
{
1119
    switch (component_type) {
355✔
1120
        case ComponentType::Autopilot:
107✔
1121
            return MAV_TYPE_GENERIC;
107✔
1122
        case ComponentType::GroundStation:
222✔
1123
            return MAV_TYPE_GCS;
222✔
1124
        case ComponentType::CompanionComputer:
4✔
1125
            return MAV_TYPE_ONBOARD_CONTROLLER;
4✔
1126
        case ComponentType::Camera:
22✔
1127
            return MAV_TYPE_CAMERA;
22✔
1128
        case ComponentType::Gimbal:
×
1129
            return MAV_TYPE_GIMBAL;
×
1130
        case ComponentType::RemoteId:
×
1131
            return MAV_TYPE_ODID;
×
1132
        case ComponentType::Custom:
×
1133
            return MAV_TYPE_GENERIC;
×
1134
        default:
×
1135
            return MAV_TYPE_GENERIC;
×
1136
    }
1137
}
1138

1139
void MavsdkImpl::make_system_with_component(uint8_t system_id, uint8_t comp_id)
152✔
1140
{
1141
    // Needs _systems_lock
1142

1143
    if (_should_exit) {
152✔
1144
        // When the system got destroyed in the destructor, we have to give up.
1145
        return;
×
1146
    }
1147

1148
    if (static_cast<int>(system_id) == 0 && static_cast<int>(comp_id) == 0) {
152✔
1149
        LogDebug() << "Initializing connection to remote system...";
×
1150
    }
1151

1152
    // Make a system with its first component
1153
    auto new_system = std::make_shared<System>(*this);
152✔
1154
    new_system->init(system_id, comp_id);
152✔
1155

1156
    _systems.emplace_back(system_id, new_system);
152✔
1157
}
152✔
1158

1159
void MavsdkImpl::notify_on_discover()
157✔
1160
{
1161
    // Queue the callbacks without holding the mutex to avoid deadlocks
1162
    _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
227✔
1163
}
157✔
1164

1165
void MavsdkImpl::notify_on_timeout()
6✔
1166
{
1167
    // Queue the callbacks without holding the mutex to avoid deadlocks
1168
    _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
6✔
1169
}
6✔
1170

1171
Mavsdk::NewSystemHandle
1172
MavsdkImpl::subscribe_on_new_system(const Mavsdk::NewSystemCallback& callback)
69✔
1173
{
1174
    std::lock_guard lock(_mutex);
69✔
1175

1176
    const auto handle = _new_system_callbacks.subscribe(callback);
69✔
1177

1178
    if (is_any_system_connected()) {
69✔
1179
        _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
×
1180
    }
1181

1182
    return handle;
138✔
1183
}
69✔
1184

1185
void MavsdkImpl::unsubscribe_on_new_system(Mavsdk::NewSystemHandle handle)
68✔
1186
{
1187
    _new_system_callbacks.unsubscribe(handle);
68✔
1188
}
68✔
1189

1190
bool MavsdkImpl::is_any_system_connected() const
69✔
1191
{
1192
    std::vector<std::shared_ptr<System>> connected_systems = systems();
69✔
1193
    return std::any_of(connected_systems.cbegin(), connected_systems.cend(), [](auto& system) {
69✔
1194
        return system->is_connected();
×
1195
    });
69✔
1196
}
69✔
1197

1198
void MavsdkImpl::work_thread()
148✔
1199
{
1200
    while (!_should_exit) {
41,236✔
1201
        // Process incoming messages
1202
        process_messages();
40,222✔
1203

1204
        // Process incoming libmav messages
1205
        process_libmav_messages();
40,195✔
1206

1207
        // Run timers
1208
        timeout_handler.run_once();
40,576✔
1209
        call_every_handler.run_once();
41,164✔
1210

1211
        // Do component work
1212
        {
1213
            std::lock_guard lock(_server_components_mutex);
41,437✔
1214
            for (auto& it : _server_components) {
82,590✔
1215
                if (it.second != nullptr) {
41,272✔
1216
                    it.second->_impl->do_work();
41,216✔
1217
                }
1218
            }
1219
        }
40,805✔
1220

1221
        // Deliver outgoing messages
1222
        deliver_messages();
40,669✔
1223

1224
        // If no messages to send, check if there are messages to receive
1225
        std::unique_lock lock_received(_received_messages_mutex);
40,820✔
1226
        if (_received_messages.empty()) {
40,900✔
1227
            // No messages to process, wait for a signal or timeout
1228
            _received_messages_cv.wait_for(lock_received, std::chrono::milliseconds(10), [this]() {
40,603✔
1229
                return !_received_messages.empty() || _should_exit;
81,346✔
1230
            });
1231
        }
1232
    }
41,716✔
1233
}
450✔
1234

1235
void MavsdkImpl::call_user_callback_located(
1,262✔
1236
    const std::string& filename, const int linenumber, const std::function<void()>& func)
1237
{
1238
    // Don't enqueue callbacks if we're shutting down
1239
    if (_should_exit) {
1,262✔
1240
        return;
×
1241
    }
1242

1243
    auto callback_size = _user_callback_queue.size();
1,262✔
1244

1245
    if (_callback_tracker) {
1,262✔
1246
        _callback_tracker->record_queued(filename, linenumber);
×
1247
        _callback_tracker->maybe_print_stats(callback_size);
×
1248
    }
1249

1250
    if (callback_size >= 100) {
1,262✔
1251
        return;
×
1252

1253
    } else if (callback_size == 99) {
1,262✔
1254
        LogErr()
×
1255
            << "User callback queue overflown\n"
1256
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1257
        return;
×
1258

1259
    } else if (callback_size >= 10) {
1,262✔
1260
        LogWarn()
×
1261
            << "User callback queue slow (queue size: " << callback_size
×
1262
            << ").\n"
1263
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1264
    }
1265

1266
    // We only need to keep track of filename and linenumber if we're actually debugging this.
1267
    UserCallback user_callback =
1268
        _callback_debugging ? UserCallback{func, filename, linenumber} : UserCallback{func};
2,524✔
1269

1270
    _user_callback_queue.push_back(std::make_shared<UserCallback>(user_callback));
1,262✔
1271
}
1,262✔
1272

1273
void MavsdkImpl::process_user_callbacks_thread()
148✔
1274
{
1275
    while (!_should_exit) {
1,557✔
1276
        UserCallback callback;
1,409✔
1277
        {
1278
            LockedQueue<UserCallback>::Guard guard(_user_callback_queue);
1,409✔
1279
            auto ptr = guard.wait_and_pop_front();
1,409✔
1280
            if (!ptr) {
1,409✔
1281
                continue;
147✔
1282
            }
1283
            // We need to get a copy instead of just a shared_ptr because the queue might
1284
            // be invalidated when the lock is released.
1285
            callback = *ptr;
1,262✔
1286
        }
1,556✔
1287

1288
        // Check if we're in the process of shutting down before executing the callback
1289
        if (_should_exit) {
1,262✔
1290
            continue;
×
1291
        }
1292

1293
        const double timeout_s = 1.0;
1,262✔
1294
        auto cookie = timeout_handler.add(
1,262✔
1295
            [&]() {
×
1296
                if (_callback_debugging) {
×
1297
                    LogWarn() << "Callback called from " << callback.filename << ":"
×
1298
                              << callback.linenumber << " took more than " << timeout_s
×
1299
                              << " second to run.";
×
1300
                    fflush(stdout);
×
1301
                    fflush(stderr);
×
1302
                    abort();
×
1303
                } else {
1304
                    LogWarn()
×
1305
                        << "Callback took more than " << timeout_s << " second to run.\n"
×
1306
                        << "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1307
                }
1308
            },
×
1309
            timeout_s);
1310
        auto callback_start = std::chrono::steady_clock::now();
1,262✔
1311
        callback.func();
1,262✔
1312
        auto callback_end = std::chrono::steady_clock::now();
1,262✔
1313
        timeout_handler.remove(cookie);
1,262✔
1314

1315
        if (_callback_tracker) {
1,262✔
1316
            auto callback_duration_us =
1317
                std::chrono::duration_cast<std::chrono::microseconds>(callback_end - callback_start)
×
1318
                    .count();
×
1319
            _callback_tracker->record_executed(
×
1320
                callback.filename, callback.linenumber, callback_duration_us);
1321
        }
1322
    }
1,409✔
1323
}
148✔
1324

1325
void MavsdkImpl::start_sending_heartbeats()
387✔
1326
{
1327
    // Check if we're in the process of shutting down
1328
    if (_should_exit) {
387✔
1329
        return;
×
1330
    }
1331

1332
    // Before sending out first heartbeats we need to make sure we have a
1333
    // default server component.
1334
    default_server_component_impl();
387✔
1335

1336
    {
1337
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
387✔
1338
        call_every_handler.remove(_heartbeat_send_cookie);
387✔
1339
        _heartbeat_send_cookie =
387✔
1340
            call_every_handler.add([this]() { send_heartbeats(); }, HEARTBEAT_SEND_INTERVAL_S);
895✔
1341
    }
387✔
1342
}
1343

1344
void MavsdkImpl::stop_sending_heartbeats()
6✔
1345
{
1346
    if (!_configuration.get_always_send_heartbeats()) {
6✔
1347
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
1✔
1348
        call_every_handler.remove(_heartbeat_send_cookie);
1✔
1349
    }
1✔
1350
}
6✔
1351

1352
ServerComponentImpl& MavsdkImpl::default_server_component_impl()
1,436✔
1353
{
1354
    std::lock_guard lock(_server_components_mutex);
1,436✔
1355
    return default_server_component_with_lock();
1,434✔
1356
}
1,434✔
1357

1358
ServerComponentImpl& MavsdkImpl::default_server_component_with_lock()
1,436✔
1359
{
1360
    if (_default_server_component == nullptr) {
1,436✔
1361
        _default_server_component =
1362
            server_component_by_id_with_lock(_our_component_id, get_mav_type());
×
1363
    }
1364
    return *_default_server_component->_impl;
1,432✔
1365
}
1366

1367
void MavsdkImpl::send_heartbeats()
492✔
1368
{
1369
    std::lock_guard lock(_server_components_mutex);
492✔
1370

1371
    for (auto& it : _server_components) {
1,003✔
1372
        if (it.second != nullptr) {
506✔
1373
            it.second->_impl->send_heartbeat();
504✔
1374
        }
1375
    }
1376
}
503✔
1377

1378
void MavsdkImpl::intercept_incoming_messages_async(std::function<bool(mavlink_message_t&)> callback)
28✔
1379
{
1380
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
28✔
1381
    _intercept_incoming_messages_callback = callback;
28✔
1382
}
28✔
1383

1384
void MavsdkImpl::intercept_outgoing_messages_async(std::function<bool(mavlink_message_t&)> callback)
16✔
1385
{
1386
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
16✔
1387
    _intercept_outgoing_messages_callback = callback;
16✔
1388
}
16✔
1389

1390
bool MavsdkImpl::call_json_interception_callbacks(
18,728✔
1391
    const Mavsdk::MavlinkMessage& json_message,
1392
    std::vector<std::pair<Mavsdk::InterceptJsonHandle, Mavsdk::InterceptJsonCallback>>&
1393
        callback_list)
1394
{
1395
    bool keep_message = true;
18,728✔
1396

1397
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
18,728✔
1398
    for (const auto& subscription : callback_list) {
18,725✔
1399
        if (!subscription.second(json_message)) {
4✔
1400
            keep_message = false;
×
1401
        }
1402
    }
1403

1404
    return keep_message;
37,418✔
1405
}
18,697✔
1406

1407
Mavsdk::InterceptJsonHandle
1408
MavsdkImpl::subscribe_incoming_messages_json(const Mavsdk::InterceptJsonCallback& callback)
1✔
1409
{
1410
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1411
    auto handle = _json_handle_factory.create();
1✔
1412
    _incoming_json_message_subscriptions.push_back(std::make_pair(handle, callback));
1✔
1413
    return handle;
2✔
1414
}
1✔
1415

1416
void MavsdkImpl::unsubscribe_incoming_messages_json(Mavsdk::InterceptJsonHandle handle)
1✔
1417
{
1418
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1419
    auto it = std::find_if(
1✔
1420
        _incoming_json_message_subscriptions.begin(),
1421
        _incoming_json_message_subscriptions.end(),
1422
        [handle](const auto& subscription) { return subscription.first == handle; });
1✔
1423
    if (it != _incoming_json_message_subscriptions.end()) {
1✔
1424
        _incoming_json_message_subscriptions.erase(it);
1✔
1425
    }
1426
}
1✔
1427

1428
Mavsdk::InterceptJsonHandle
1429
MavsdkImpl::subscribe_outgoing_messages_json(const Mavsdk::InterceptJsonCallback& callback)
1✔
1430
{
1431
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1432
    auto handle = _json_handle_factory.create();
1✔
1433
    _outgoing_json_message_subscriptions.push_back(std::make_pair(handle, callback));
1✔
1434
    return handle;
2✔
1435
}
1✔
1436

1437
void MavsdkImpl::unsubscribe_outgoing_messages_json(Mavsdk::InterceptJsonHandle handle)
1✔
1438
{
1439
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1440
    auto it = std::find_if(
1✔
1441
        _outgoing_json_message_subscriptions.begin(),
1442
        _outgoing_json_message_subscriptions.end(),
1443
        [handle](const auto& subscription) { return subscription.first == handle; });
1✔
1444
    if (it != _outgoing_json_message_subscriptions.end()) {
1✔
1445
        _outgoing_json_message_subscriptions.erase(it);
1✔
1446
    }
1447
}
1✔
1448

1449
RawConnection* MavsdkImpl::find_raw_connection()
4✔
1450
{
1451
    std::lock_guard lock(_mutex);
4✔
1452

1453
    for (auto& entry : _connections) {
4✔
1454
        auto* raw_conn = dynamic_cast<RawConnection*>(entry.connection.get());
2✔
1455
        if (raw_conn != nullptr) {
2✔
1456
            return raw_conn;
2✔
1457
        }
1458
    }
1459
    return nullptr;
2✔
1460
}
4✔
1461

1462
void MavsdkImpl::pass_received_raw_bytes(const char* bytes, size_t length)
1✔
1463
{
1464
    auto* raw_conn = find_raw_connection();
1✔
1465
    if (raw_conn == nullptr) {
1✔
1466
        LogErr()
×
1467
            << "No raw connection available. Please add one using add_any_connection(\"raw://\")";
×
1468
        return;
×
1469
    }
1470

1471
    raw_conn->receive(bytes, length);
1✔
1472
}
1473

1474
Mavsdk::RawBytesHandle
1475
MavsdkImpl::subscribe_raw_bytes_to_be_sent(const Mavsdk::RawBytesCallback& callback)
1✔
1476
{
1477
    if (find_raw_connection() == nullptr) {
1✔
1478
        LogWarn() << "No raw connection available. Subscription will only receive bytes after you "
×
1479
                     "add a connection using add_any_connection(\"raw://\")";
×
1480
    }
1481
    return _raw_bytes_subscriptions.subscribe(callback);
1✔
1482
}
1483

1484
void MavsdkImpl::unsubscribe_raw_bytes_to_be_sent(Mavsdk::RawBytesHandle handle)
1✔
1485
{
1486
    _raw_bytes_subscriptions.unsubscribe(handle);
1✔
1487
}
1✔
1488

1489
bool MavsdkImpl::notify_raw_bytes_sent(const char* bytes, size_t length)
3✔
1490
{
1491
    if (_raw_bytes_subscriptions.empty()) {
3✔
1492
        return false;
2✔
1493
    }
1494

1495
    _raw_bytes_subscriptions(bytes, length);
1✔
1496

1497
    return true;
1✔
1498
}
1499

1500
Mavsdk::ConnectionErrorHandle
1501
MavsdkImpl::subscribe_connection_errors(Mavsdk::ConnectionErrorCallback callback)
×
1502
{
1503
    std::lock_guard lock(_mutex);
×
1504

1505
    const auto handle = _connections_errors_subscriptions.subscribe(callback);
×
1506

1507
    return handle;
×
1508
}
×
1509

1510
void MavsdkImpl::unsubscribe_connection_errors(Mavsdk::ConnectionErrorHandle handle)
×
1511
{
1512
    std::lock_guard lock(_mutex);
×
1513
    _connections_errors_subscriptions.unsubscribe(handle);
×
1514
}
×
1515

1516
uint8_t MavsdkImpl::get_target_system_id(const mavlink_message_t& message)
9,373✔
1517
{
1518
    // Checks whether connection knows target system ID by extracting target system if set.
1519
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);
9,373✔
1520

1521
    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_SYSTEM)) {
9,374✔
1522
        return 0;
736✔
1523
    }
1524

1525
    // Don't look at the target system offset if it is outside the payload length.
1526
    // This can happen if the fields are trimmed.
1527
    if (meta->target_system_ofs >= message.len) {
8,638✔
1528
        return 0;
22✔
1529
    }
1530

1531
    return (_MAV_PAYLOAD(&message))[meta->target_system_ofs];
8,616✔
1532
}
1533

1534
uint8_t MavsdkImpl::get_target_component_id(const mavlink_message_t& message)
50✔
1535
{
1536
    // Checks whether connection knows target system ID by extracting target system if set.
1537
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);
50✔
1538

1539
    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_COMPONENT)) {
50✔
1540
        return 0;
35✔
1541
    }
1542

1543
    // Don't look at the target component offset if it is outside the payload length.
1544
    // This can happen if the fields are trimmed.
1545
    if (meta->target_component_ofs >= message.len) {
15✔
1546
        return 0;
1✔
1547
    }
1548

1549
    return (_MAV_PAYLOAD(&message))[meta->target_component_ofs];
14✔
1550
}
1551

1552
Sender& MavsdkImpl::sender()
×
1553
{
1554
    std::lock_guard lock(_server_components_mutex);
×
1555
    return default_server_component_with_lock().sender();
×
1556
}
×
1557

1558
std::vector<Connection*> MavsdkImpl::get_connections() const
9,389✔
1559
{
1560
    std::lock_guard lock(_mutex);
9,389✔
1561
    std::vector<Connection*> connections;
9,394✔
1562
    for (const auto& connection_entry : _connections) {
18,710✔
1563
        connections.push_back(connection_entry.connection.get());
9,327✔
1564
    }
1565
    return connections;
9,390✔
1566
}
9,390✔
1567

1568
mav::MessageSet& MavsdkImpl::get_message_set() const
23✔
1569
{
1570
    // Note: This returns a reference to MessageSet without locking.
1571
    // Thread safety for MessageSet operations must be ensured by:
1572
    // 1. Using load_custom_xml_to_message_set() for write operations (XML loading)
1573
    // 2. libmav MessageSet should be internally thread-safe for read operations
1574
    // 3. If race conditions persist, consider implementing a thread-safe MessageSet wrapper
1575
    return *_message_set;
23✔
1576
}
1577

1578
bool MavsdkImpl::load_custom_xml_to_message_set(const std::string& xml_content)
6✔
1579
{
1580
    std::lock_guard<std::mutex> lock(_message_set_mutex);
6✔
1581
    auto result = _message_set->addFromXMLString(xml_content, false /* recursive_open_includes */);
6✔
1582
    return result == ::mav::MessageSetResult::Success;
12✔
1583
}
6✔
1584

1585
// Thread-safe MessageSet read operations
1586
std::optional<std::string> MavsdkImpl::message_id_to_name_safe(uint32_t id) const
×
1587
{
1588
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1589
    auto message_def = _message_set->getMessageDefinition(static_cast<int>(id));
×
1590
    if (message_def) {
×
1591
        return message_def.get().name();
×
1592
    }
1593
    return std::nullopt;
×
1594
}
×
1595

1596
std::optional<int> MavsdkImpl::message_name_to_id_safe(const std::string& name) const
×
1597
{
1598
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1599
    return _message_set->idForMessage(name);
×
1600
}
×
1601

1602
std::optional<mav::Message> MavsdkImpl::create_message_safe(const std::string& message_name) const
×
1603
{
1604
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1605
    return _message_set->create(message_name);
×
1606
}
×
1607

1608
// Thread-safe parsing for LibmavReceiver
1609
std::optional<mav::Message> MavsdkImpl::parse_message_safe(
18,727✔
1610
    const uint8_t* buffer, size_t buffer_len, size_t& bytes_consumed) const
1611
{
1612
    std::lock_guard<std::mutex> lock(_message_set_mutex);
18,727✔
1613
    return _buffer_parser->parseMessage(buffer, buffer_len, bytes_consumed);
18,744✔
1614
}
18,744✔
1615

1616
mav::OptionalReference<const mav::MessageDefinition>
1617
MavsdkImpl::get_message_definition_safe(int message_id) const
18,654✔
1618
{
1619
    std::lock_guard<std::mutex> lock(_message_set_mutex);
18,654✔
1620
    return _message_set->getMessageDefinition(message_id);
18,647✔
1621
}
18,656✔
1622

1623
} // namespace mavsdk
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc