• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mavlink / MAVSDK / 22127126125

18 Feb 2026 04:54AM UTC coverage: 48.964% (-0.05%) from 49.009%
22127126125

Pull #2769

github

web-flow
Merge e391237ec into 72946bd67
Pull Request #2769: core: add set_callback_executor API for custom callback dispatch

10 of 42 new or added lines in 3 files covered. (23.81%)

5 existing lines in 3 files now uncovered.

18353 of 37483 relevant lines covered (48.96%)

671.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.19
/cpp/src/mavsdk/core/mavsdk_impl.cpp
1
#include "mavsdk_impl.h"
2

3
#include <algorithm>
4
#include <mutex>
5
#include <tcp_server_connection.h>
6

7
#include "connection.h"
8
#include "log.h"
9
#include "tcp_client_connection.h"
10
#include "tcp_server_connection.h"
11
#include "udp_connection.h"
12
#include "raw_connection.h"
13
#include "system.h"
14
#include "system_impl.h"
15
#include "serial_connection.h"
16
#include "version.h"
17
#include "server_component_impl.h"
18
#include "overloaded.h"
19
#include "mavlink_channels.h"
20
#include "callback_list.tpp"
21
#include "hostname_to_ip.h"
22
#include "embedded_mavlink_xml.h"
23
#include <mav/MessageSet.h>
24

25
namespace mavsdk {
26

27
template class CallbackList<>;
28

29
MavsdkImpl::MavsdkImpl(const Mavsdk::Configuration& configuration) :
148✔
30
    timeout_handler(time),
148✔
31
    call_every_handler(time)
296✔
32
{
33
    LogInfo() << "MAVSDK version: " << mavsdk_version;
148✔
34

35
    if (const char* env_p = std::getenv("MAVSDK_CALLBACK_DEBUGGING")) {
148✔
36
        if (std::string(env_p) == "1") {
×
37
            LogDebug() << "Callback debugging is on.";
×
38
            _callback_debugging = true;
×
39
            _callback_tracker = std::make_unique<CallbackTracker>();
×
40
        }
41
    }
42

43
    if (const char* env_p = std::getenv("MAVSDK_MESSAGE_DEBUGGING")) {
148✔
44
        if (std::string(env_p) == "1") {
×
45
            LogDebug() << "Message debugging is on.";
×
46
            _message_logging_on = true;
×
47
        }
48
    }
49

50
    if (const char* env_p = std::getenv("MAVSDK_SYSTEM_DEBUGGING")) {
148✔
51
        if (std::string(env_p) == "1") {
×
52
            LogDebug() << "System debugging is on.";
×
53
            _system_debugging = true;
×
54
        }
55
    }
56

57
    set_configuration(configuration);
148✔
58

59
    // Initialize MessageSet with embedded XML content in dependency order
60
    // This happens at startup before any connections are created, so no synchronization needed
61
    _message_set = std::make_unique<mav::MessageSet>();
148✔
62
    _message_set->addFromXMLString(mav_embedded::MINIMAL_XML);
148✔
63
    _message_set->addFromXMLString(mav_embedded::STANDARD_XML);
148✔
64
    _message_set->addFromXMLString(mav_embedded::COMMON_XML);
148✔
65
    _message_set->addFromXMLString(mav_embedded::ARDUPILOTMEGA_XML);
148✔
66

67
    // Initialize BufferParser for thread-safe parsing
68
    _buffer_parser = std::make_unique<mav::BufferParser>(*_message_set);
148✔
69

70
    // Start the user callback thread first, so it is ready for anything generated by
71
    // the work thread.
72

73
    _process_user_callbacks_thread =
74
        std::make_unique<std::thread>(&MavsdkImpl::process_user_callbacks_thread, this);
148✔
75

76
    _work_thread = std::make_unique<std::thread>(&MavsdkImpl::work_thread, this);
148✔
77
}
148✔
78

79
MavsdkImpl::~MavsdkImpl()
148✔
80
{
81
    {
82
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
148✔
83
        call_every_handler.remove(_heartbeat_send_cookie);
148✔
84
    }
148✔
85

86
    _should_exit = true;
148✔
87

88
    // Stop work first because we don't want to trigger anything that would
89
    // potentially want to call into user code.
90

91
    if (_work_thread) {
148✔
92
        _work_thread->join();
148✔
93
        _work_thread.reset();
148✔
94
    }
95

96
    if (_process_user_callbacks_thread) {
148✔
97
        _user_callback_queue.stop();
148✔
98
        _process_user_callbacks_thread->join();
148✔
99
        _process_user_callbacks_thread.reset();
148✔
100
    }
101

102
    std::lock_guard lock(_mutex);
148✔
103

104
    _systems.clear();
148✔
105
    _connections.clear();
148✔
106
}
148✔
107

108
std::string MavsdkImpl::version()
1✔
109
{
110
    static unsigned version_counter = 0;
111

112
    ++version_counter;
1✔
113

114
    switch (version_counter) {
1✔
115
        case 10:
×
116
            return "You were wondering about the name of this library?";
×
117
        case 11:
×
118
            return "Let's look at the history:";
×
119
        case 12:
×
120
            return "DroneLink";
×
121
        case 13:
×
122
            return "DroneCore";
×
123
        case 14:
×
124
            return "DronecodeSDK";
×
125
        case 15:
×
126
            return "MAVSDK";
×
127
        case 16:
×
128
            return "And that's it...";
×
129
        case 17:
×
130
            return "At least for now ¯\\_(ツ)_/¯.";
×
131
        default:
1✔
132
            return mavsdk_version;
1✔
133
    }
134
}
135

136
std::vector<std::shared_ptr<System>> MavsdkImpl::systems() const
187✔
137
{
138
    std::vector<std::shared_ptr<System>> systems_result{};
187✔
139

140
    std::lock_guard lock(_mutex);
187✔
141
    for (auto& system : _systems) {
295✔
142
        // We ignore the 0 entry because it's just a null system.
143
        // It's only created because the older, deprecated API needs a
144
        // reference.
145
        if (system.first == 0) {
108✔
146
            continue;
×
147
        }
148
        systems_result.push_back(system.second);
108✔
149
    }
150

151
    return systems_result;
187✔
152
}
187✔
153

154
std::optional<std::shared_ptr<System>> MavsdkImpl::first_autopilot(double timeout_s)
62✔
155
{
156
    {
157
        std::lock_guard lock(_mutex);
62✔
158
        for (auto system : _systems) {
63✔
159
            if (system.second->is_connected() && system.second->has_autopilot()) {
4✔
160
                return system.second;
3✔
161
            }
162
        }
4✔
163
    }
62✔
164

165
    if (timeout_s == 0.0) {
59✔
166
        // Don't wait at all.
167
        return {};
×
168
    }
169

170
    auto prom = std::make_shared<std::promise<std::shared_ptr<System>>>();
59✔
171
    auto fut = prom->get_future();
59✔
172

173
    auto flag = std::make_shared<std::once_flag>();
59✔
174
    auto handle = subscribe_on_new_system([this, prom, flag]() {
59✔
175
        // Check all systems, not just the first one
176
        auto all_systems = systems();
60✔
177
        for (auto& system : all_systems) {
62✔
178
            if (system->is_connected() && system->has_autopilot()) {
61✔
179
                std::call_once(*flag, [prom, system]() { prom->set_value(system); });
118✔
180
                break;
59✔
181
            }
182
        }
183
    });
60✔
184

185
    if (timeout_s > 0.0) {
59✔
186
        if (fut.wait_for(std::chrono::milliseconds(int64_t(timeout_s * 1e3))) ==
59✔
187
            std::future_status::ready) {
188
            unsubscribe_on_new_system(handle);
59✔
189
            return fut.get();
59✔
190

191
        } else {
192
            unsubscribe_on_new_system(handle);
×
193
            return std::nullopt;
×
194
        }
195
    } else {
196
        fut.wait();
×
197
        unsubscribe_on_new_system(handle);
×
198
        return std::optional(fut.get());
×
199
    }
200
}
59✔
201

202
std::shared_ptr<ServerComponent> MavsdkImpl::server_component(unsigned instance)
58✔
203
{
204
    std::lock_guard lock(_mutex);
58✔
205

206
    auto component_type = _configuration.get_component_type();
58✔
207
    switch (component_type) {
58✔
208
        case ComponentType::Autopilot:
58✔
209
        case ComponentType::GroundStation:
210
        case ComponentType::CompanionComputer:
211
        case ComponentType::Camera:
212
        case ComponentType::Gimbal:
213
        case ComponentType::RemoteId:
214
        case ComponentType::Custom:
215
            return server_component_by_type(component_type, instance);
58✔
216
        default:
×
217
            LogErr() << "Unknown component type";
×
218
            return {};
×
219
    }
220
}
58✔
221

222
std::shared_ptr<ServerComponent>
223
MavsdkImpl::server_component_by_type(ComponentType server_component_type, unsigned instance)
58✔
224
{
225
    const auto mav_type = mav_type_for_component_type(server_component_type);
58✔
226

227
    switch (server_component_type) {
58✔
228
        case ComponentType::Autopilot:
44✔
229
            if (instance == 0) {
44✔
230
                return server_component_by_id(MAV_COMP_ID_AUTOPILOT1, mav_type);
44✔
231
            } else {
232
                LogErr() << "Only autopilot instance 0 is valid";
×
233
                return {};
×
234
            }
235

236
        case ComponentType::GroundStation:
×
237
            if (instance == 0) {
×
238
                return server_component_by_id(MAV_COMP_ID_MISSIONPLANNER, mav_type);
×
239
            } else {
240
                LogErr() << "Only one ground station supported at this time";
×
241
                return {};
×
242
            }
243

244
        case ComponentType::CompanionComputer:
1✔
245
            if (instance == 0) {
1✔
246
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER, mav_type);
1✔
247
            } else if (instance == 1) {
×
248
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER2, mav_type);
×
249
            } else if (instance == 2) {
×
250
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER3, mav_type);
×
251
            } else if (instance == 3) {
×
252
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER4, mav_type);
×
253
            } else {
254
                LogErr() << "Only companion computer 0..3 are supported";
×
255
                return {};
×
256
            }
257

258
        case ComponentType::Camera:
13✔
259
            if (instance == 0) {
13✔
260
                return server_component_by_id(MAV_COMP_ID_CAMERA, mav_type);
13✔
261
            } else if (instance == 1) {
×
262
                return server_component_by_id(MAV_COMP_ID_CAMERA2, mav_type);
×
263
            } else if (instance == 2) {
×
264
                return server_component_by_id(MAV_COMP_ID_CAMERA3, mav_type);
×
265
            } else if (instance == 3) {
×
266
                return server_component_by_id(MAV_COMP_ID_CAMERA4, mav_type);
×
267
            } else if (instance == 4) {
×
268
                return server_component_by_id(MAV_COMP_ID_CAMERA5, mav_type);
×
269
            } else if (instance == 5) {
×
270
                return server_component_by_id(MAV_COMP_ID_CAMERA6, mav_type);
×
271
            } else {
272
                LogErr() << "Only camera 0..5 are supported";
×
273
                return {};
×
274
            }
275

276
        case ComponentType::Gimbal:
×
277
            if (instance == 0) {
×
278
                return server_component_by_id(MAV_COMP_ID_GIMBAL, mav_type);
×
279
            } else {
280
                LogErr() << "Only gimbal instance 0 is valid";
×
281
                return {};
×
282
            }
283

284
        case ComponentType::RemoteId:
×
285
            if (instance == 0) {
×
286
                return server_component_by_id(MAV_COMP_ID_ODID_TXRX_1, mav_type);
×
287
            } else {
288
                LogErr() << "Only remote ID instance 0 is valid";
×
289
                return {};
×
290
            }
291

292
        case ComponentType::Custom:
×
293
            LogErr() << "Custom component type requires explicit component ID";
×
294
            return {};
×
295

296
        default:
×
297
            LogErr() << "Unknown server component type";
×
298
            return {};
×
299
    }
300
}
301

302
std::shared_ptr<ServerComponent>
303
MavsdkImpl::server_component_by_id(uint8_t component_id, uint8_t mav_type)
58✔
304
{
305
    if (component_id == 0) {
58✔
306
        LogErr() << "Server component with component ID 0 not allowed";
×
307
        return nullptr;
×
308
    }
309

310
    std::lock_guard lock(_server_components_mutex);
58✔
311

312
    return server_component_by_id_with_lock(component_id, mav_type);
58✔
313
}
58✔
314

315
std::shared_ptr<ServerComponent>
316
MavsdkImpl::server_component_by_id_with_lock(uint8_t component_id, uint8_t mav_type)
279✔
317
{
318
    for (auto& it : _server_components) {
280✔
319
        if (it.first == component_id) {
131✔
320
            if (it.second != nullptr) {
130✔
321
                return it.second;
130✔
322
            } else {
323
                it.second = std::make_shared<ServerComponent>(*this, component_id, mav_type);
×
324
            }
325
        }
326
    }
327

328
    _server_components.emplace_back(std::pair<uint8_t, std::shared_ptr<ServerComponent>>(
149✔
329
        component_id, std::make_shared<ServerComponent>(*this, component_id, mav_type)));
298✔
330

331
    return _server_components.back().second;
149✔
332
}
333

334
void MavsdkImpl::forward_message(mavlink_message_t& message, Connection* connection)
50✔
335
{
336
    // Forward_message Function implementing Mavlink routing rules.
337
    // See https://mavlink.io/en/guide/routing.html
338

339
    bool forward_heartbeats_enabled = true;
50✔
340
    const uint8_t target_system_id = get_target_system_id(message);
50✔
341
    const uint8_t target_component_id = get_target_component_id(message);
50✔
342

343
    // If it's a message only for us, we keep it, otherwise, we forward it.
344
    const bool targeted_only_at_us =
345
        (target_system_id == get_own_system_id() && target_component_id == get_own_component_id());
50✔
346

347
    // We don't forward heartbeats unless it's specifically enabled.
348
    const bool heartbeat_check_ok =
50✔
349
        (message.msgid != MAVLINK_MSG_ID_HEARTBEAT || forward_heartbeats_enabled);
50✔
350

351
    if (!targeted_only_at_us && heartbeat_check_ok) {
50✔
352
        unsigned successful_emissions = 0;
41✔
353
        for (auto& entry : _connections) {
120✔
354
            // Check whether the connection is not the one from which we received the message.
355
            // And also check if the connection was set to forward messages.
356
            if (entry.connection.get() == connection ||
117✔
357
                !entry.connection->should_forward_messages()) {
38✔
358
                continue;
41✔
359
            }
360
            auto result = (*entry.connection).send_message(message);
38✔
361
            if (result.first) {
38✔
362
                successful_emissions++;
36✔
363
            } else {
364
                _connections_errors_subscriptions.queue(
4✔
365
                    Mavsdk::ConnectionError{result.second, entry.handle},
2✔
366
                    [this](const auto& func) { call_user_callback(func); });
×
367
            }
368
        }
38✔
369
        if (successful_emissions == 0) {
41✔
370
            if (_system_debugging) {
5✔
371
                LogErr() << "Message forwarding failed";
×
372
            }
373
        }
374
    }
375
}
50✔
376

377
void MavsdkImpl::receive_message(
9,342✔
378
    MavlinkReceiver::ParseResult result, mavlink_message_t& message, Connection* connection)
379
{
380
    if (result == MavlinkReceiver::ParseResult::MessageParsed) {
9,342✔
381
        // Valid message: queue for full processing (which includes forwarding)
382
        {
383
            std::lock_guard lock(_received_messages_mutex);
9,339✔
384
            _received_messages.emplace(ReceivedMessage{std::move(message), connection});
9,339✔
385
        }
9,338✔
386
        _received_messages_cv.notify_one();
9,337✔
387

388
    } else if (result == MavlinkReceiver::ParseResult::BadCrc) {
3✔
389
        // Unknown message: forward only, don't process locally
390
        forward_message(message, connection);
4✔
391
    }
392
}
9,345✔
393

394
void MavsdkImpl::receive_libmav_message(
9,343✔
395
    const Mavsdk::MavlinkMessage& message, Connection* connection)
396
{
397
    {
398
        std::lock_guard lock(_received_libmav_messages_mutex);
9,343✔
399
        _received_libmav_messages.emplace(ReceivedLibmavMessage{message, connection});
9,343✔
400
    }
9,343✔
401
    _received_libmav_messages_cv.notify_one();
9,343✔
402
}
9,345✔
403

404
void MavsdkImpl::process_messages()
41,274✔
405
{
406
    std::lock_guard lock(_received_messages_mutex);
41,274✔
407
    while (!_received_messages.empty()) {
50,269✔
408
        auto message_copied = _received_messages.front();
9,320✔
409
        process_message(message_copied.message, message_copied.connection_ptr);
9,318✔
410
        _received_messages.pop();
9,323✔
411
    }
412
}
40,271✔
413

414
void MavsdkImpl::process_libmav_messages()
41,224✔
415
{
416
    std::lock_guard lock(_received_libmav_messages_mutex);
41,224✔
417
    while (!_received_libmav_messages.empty()) {
49,743✔
418
        auto message_copied = _received_libmav_messages.front();
9,320✔
419
        process_libmav_message(message_copied.message, message_copied.connection_ptr);
9,320✔
420
        _received_libmav_messages.pop();
9,322✔
421
    }
9,322✔
422
}
40,768✔
423

424
void MavsdkImpl::process_message(mavlink_message_t& message, Connection* connection)
9,318✔
425
{
426
    // Assumes _received_messages_mutex
427

428
    if (_message_logging_on) {
9,318✔
429
        LogDebug() << "Processing message " << message.msgid << " from "
×
430
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid);
×
431
    }
432

433
    if (_should_exit) {
9,318✔
434
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
435
        return;
×
436
    }
437

438
    {
439
        std::lock_guard lock(_mutex);
9,321✔
440

441
        // This is a low level interface where incoming messages can be tampered
442
        // with or even dropped. This happens BEFORE forwarding, so modifications
443
        // and drops affect both local processing AND forwarded messages.
444
        {
445
            bool keep = true;
9,322✔
446
            {
447
                std::lock_guard<std::mutex> intercept_lock(_intercept_callbacks_mutex);
9,322✔
448
                if (_intercept_incoming_messages_callback != nullptr) {
9,321✔
449
                    keep = _intercept_incoming_messages_callback(message);
351✔
450
                }
451
            }
9,321✔
452

453
            if (!keep) {
9,322✔
454
                LogDebug() << "Dropped incoming message: " << int(message.msgid);
38✔
455
                return;
38✔
456
            }
457
        }
458

459
        if (_should_exit) {
9,284✔
460
            // If we're meant to clean up, let's not try to acquire any more locks but bail.
461
            return;
×
462
        }
463

464
        /* Forward message (after intercept) if option is enabled and multiple interfaces
465
         * are connected.
466
         * Performs message forwarding checks for every message if message forwarding is
467
         * enabled on at least one connection, and in case of a single forwarding connection,
468
         * we check that it is not the one which received the current message.
469
         *
470
         * Conditions:
471
         * 1. At least 2 connections.
472
         * 2. At least 1 forwarding connection.
473
         * 3. At least 2 forwarding connections or current connection is not forwarding.
474
         */
475

476
        if (_connections.size() > 1 && mavsdk::Connection::forwarding_connections_count() > 0 &&
9,329✔
477
            (mavsdk::Connection::forwarding_connections_count() > 1 ||
46✔
478
             !connection->should_forward_messages())) {
479
            if (_message_logging_on) {
46✔
480
                LogDebug() << "Forwarding message " << message.msgid << " from "
×
481
                           << static_cast<int>(message.sysid) << "/"
×
482
                           << static_cast<int>(message.compid);
×
483
            }
484
            forward_message(message, connection);
46✔
485
        }
486

487
        // Don't ever create a system with sysid 0.
488
        if (message.sysid == 0) {
9,279✔
489
            if (_message_logging_on) {
×
490
                LogDebug() << "Ignoring message with sysid == 0";
×
491
            }
492
            return;
×
493
        }
494

495
        // Filter out messages by QGroundControl, however, only do that if MAVSDK
496
        // is also implementing a ground station and not if it is used in another
497
        // configuration, e.g. on a companion.
498
        //
499
        // This is a workaround because PX4 started forwarding messages between
500
        // mavlink instances which leads to existing implementations (including
501
        // examples and integration tests) to connect to QGroundControl by accident
502
        // instead of PX4 because the check `has_autopilot()` is not used.
503

504
        if (_configuration.get_component_type() == ComponentType::GroundStation &&
9,279✔
505
            message.sysid == 255 && message.compid == MAV_COMP_ID_MISSIONPLANNER) {
9,282✔
506
            if (_message_logging_on) {
×
507
                LogDebug() << "Ignoring messages from QGC as we are also a ground station";
×
508
            }
509
            return;
×
510
        }
511

512
        bool found_system = false;
9,282✔
513
        for (auto& system : _systems) {
9,316✔
514
            if (system.first == message.sysid) {
9,174✔
515
                system.second->system_impl()->add_new_component(message.compid);
9,140✔
516
                found_system = true;
9,137✔
517
                break;
9,137✔
518
            }
519
        }
520

521
        if (!found_system) {
9,282✔
522
            if (_system_debugging) {
145✔
523
                LogWarn() << "Create new system/component " << (int)message.sysid << "/"
×
524
                          << (int)message.compid;
×
525
                LogWarn() << "From message " << (int)message.msgid << " with len "
×
526
                          << (int)message.len;
×
527
                std::string bytes = "";
×
528
                for (unsigned i = 0; i < 12 + message.len; ++i) {
×
529
                    bytes += std::to_string(reinterpret_cast<uint8_t*>(&message)[i]) + ' ';
×
530
                }
531
                LogWarn() << "Bytes: " << bytes;
×
532
            }
×
533
            make_system_with_component(message.sysid, message.compid);
145✔
534

535
            // We now better talk back.
536
            start_sending_heartbeats();
145✔
537
        }
538

539
        if (_should_exit) {
9,282✔
540
            // Don't try to call at() if systems have already been destroyed
541
            // in destructor.
542
            return;
×
543
        }
544
    }
9,315✔
545

546
    mavlink_message_handler.process_message(message);
9,280✔
547

548
    for (auto& system : _systems) {
9,318✔
549
        if (system.first == message.sysid) {
9,319✔
550
            system.second->system_impl()->process_mavlink_message(message);
9,284✔
551
            break;
9,284✔
552
        }
553
    }
554
}
555

556
void MavsdkImpl::process_libmav_message(
9,323✔
557
    const Mavsdk::MavlinkMessage& message, Connection* /* connection */)
558
{
559
    // Assumes _received_libmav_messages_mutex
560

561
    if (_message_logging_on) {
9,323✔
562
        LogDebug() << "MavsdkImpl::process_libmav_message: " << message.message_name << " from "
×
563
                   << static_cast<int>(message.system_id) << "/"
×
564
                   << static_cast<int>(message.component_id);
×
565
    }
566

567
    // JSON message interception for incoming messages
568
    if (!call_json_interception_callbacks(message, _incoming_json_message_subscriptions)) {
9,323✔
569
        // Message was dropped by interception callback
570
        if (_message_logging_on) {
×
571
            LogDebug() << "Incoming JSON message " << message.message_name
×
572
                       << " dropped by interception";
×
573
        }
574
        return;
×
575
    }
576

577
    if (_message_logging_on) {
9,319✔
578
        LogDebug() << "Processing libmav message " << message.message_name << " from "
×
579
                   << static_cast<int>(message.system_id) << "/"
×
580
                   << static_cast<int>(message.component_id);
×
581
    }
582

583
    if (_should_exit) {
9,319✔
584
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
585
        return;
×
586
    }
587

588
    {
589
        std::lock_guard lock(_mutex);
9,321✔
590

591
        // Don't ever create a system with sysid 0.
592
        if (message.system_id == 0) {
9,320✔
593
            if (_message_logging_on) {
×
594
                LogDebug() << "Ignoring libmav message with sysid == 0";
×
595
            }
596
            return;
×
597
        }
598

599
        // Filter out QGroundControl messages similar to regular mavlink processing
600
        if (_configuration.get_component_type() == ComponentType::GroundStation &&
9,320✔
601
            message.system_id == 255 && message.component_id == MAV_COMP_ID_MISSIONPLANNER) {
9,316✔
602
            if (_message_logging_on) {
×
603
                LogDebug() << "Ignoring libmav messages from QGC as we are also a ground station";
×
604
            }
605
            return;
×
606
        }
607

608
        bool found_system = false;
9,316✔
609
        for (auto& system : _systems) {
9,349✔
610
            if (system.first == message.system_id) {
9,348✔
611
                system.second->system_impl()->add_new_component(message.component_id);
9,315✔
612
                found_system = true;
9,314✔
613
                break;
9,314✔
614
            }
615
        }
616

617
        if (!found_system) {
9,322✔
618
            if (_system_debugging) {
7✔
619
                LogWarn() << "Create new system/component from libmav " << (int)message.system_id
×
620
                          << "/" << (int)message.component_id;
×
621
            }
622
            make_system_with_component(message.system_id, message.component_id);
7✔
623

624
            // We now better talk back.
625
            start_sending_heartbeats();
7✔
626
        }
627

628
        if (_should_exit) {
9,322✔
629
            // Don't try to call at() if systems have already been destroyed
630
            // in destructor.
631
            return;
×
632
        }
633
    }
9,316✔
634

635
    // Distribute libmav message to systems for libmav-specific handling
636
    bool found_system = false;
9,323✔
637
    for (auto& system : _systems) {
18,710✔
638
        if (system.first == message.system_id) {
9,384✔
639
            if (_message_logging_on) {
9,318✔
640
                LogDebug() << "Distributing libmav message " << message.message_name
×
641
                           << " to SystemImpl for system " << system.first;
×
642
            }
643
            system.second->system_impl()->process_libmav_message(message);
9,318✔
644
            found_system = true;
9,321✔
645
            // Don't break - distribute to all matching system instances
646
        }
647
    }
648

649
    if (!found_system) {
9,318✔
650
        LogWarn() << "No system found for libmav message " << message.message_name
×
651
                  << " from system " << message.system_id;
×
652
    }
653
}
654

655
bool MavsdkImpl::send_message(mavlink_message_t& message)
9,480✔
656
{
657
    // Create a copy of the message to avoid reference issues
658
    mavlink_message_t message_copy = message;
9,480✔
659

660
    {
661
        std::lock_guard lock(_messages_to_send_mutex);
9,480✔
662
        _messages_to_send.push(std::move(message_copy));
9,476✔
663
    }
9,479✔
664

665
    // For heartbeat messages, we want to process them immediately to speed up system discovery
666
    if (message.msgid == MAVLINK_MSG_ID_HEARTBEAT) {
9,479✔
667
        // Trigger message processing in the work thread
668
        // This is a hint to process messages sooner, but doesn't block
669
        std::this_thread::yield();
500✔
670
    }
671

672
    return true;
9,477✔
673
}
674

675
void MavsdkImpl::deliver_messages()
50,692✔
676
{
677
    // Process messages one at a time to avoid holding the mutex while delivering
678
    while (true) {
679
        mavlink_message_t message;
680
        {
681
            std::lock_guard lock(_messages_to_send_mutex);
50,692✔
682
            if (_messages_to_send.empty()) {
49,998✔
683
                break;
40,889✔
684
            }
685
            message = _messages_to_send.front();
9,478✔
686
            _messages_to_send.pop();
9,477✔
687
        }
50,367✔
688
        deliver_message(message);
9,478✔
689
    }
9,481✔
690
}
40,340✔
691

692
void MavsdkImpl::deliver_message(mavlink_message_t& message)
9,479✔
693
{
694
    if (_message_logging_on) {
9,479✔
695
        LogDebug() << "Sending message " << message.msgid << " from "
×
696
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid)
×
697
                   << " to " << static_cast<int>(get_target_system_id(message)) << "/"
×
698
                   << static_cast<int>(get_target_component_id(message));
×
699
    }
700

701
    // This is a low level interface where outgoing messages can be tampered
702
    // with or even dropped.
703
    bool keep = true;
9,479✔
704
    {
705
        std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
9,479✔
706
        if (_intercept_outgoing_messages_callback != nullptr) {
9,476✔
707
            keep = _intercept_outgoing_messages_callback(message);
319✔
708
        }
709
    }
9,469✔
710

711
    if (!keep) {
9,479✔
712
        // We fake that everything was sent as instructed because
713
        // a potential loss would happen later, and we would not be informed
714
        // about it.
715
        LogDebug() << "Dropped outgoing message: " << int(message.msgid);
88✔
716
        return;
176✔
717
    }
718

719
    // JSON message interception for outgoing messages
720
    // Convert mavlink_message_t to Mavsdk::MavlinkMessage for JSON interception
721
    uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
722
    uint16_t len = mavlink_msg_to_send_buffer(buffer, &message);
9,391✔
723

724
    size_t bytes_consumed = 0;
9,391✔
725
    auto libmav_msg_opt = parse_message_safe(buffer, len, bytes_consumed);
9,391✔
726

727
    if (libmav_msg_opt) {
9,391✔
728
        // Create Mavsdk::MavlinkMessage directly for JSON interception
729
        Mavsdk::MavlinkMessage json_message;
9,391✔
730
        json_message.message_name = libmav_msg_opt.value().name();
9,389✔
731
        json_message.system_id = message.sysid;
9,385✔
732
        json_message.component_id = message.compid;
9,385✔
733

734
        // Extract target_system and target_component if present
735
        uint8_t target_system_id = 0;
9,385✔
736
        uint8_t target_component_id = 0;
9,385✔
737
        if (libmav_msg_opt.value().get("target_system", target_system_id) ==
9,385✔
738
            mav::MessageResult::Success) {
739
            json_message.target_system_id = target_system_id;
8,624✔
740
        } else {
741
            json_message.target_system_id = 0;
761✔
742
        }
743
        if (libmav_msg_opt.value().get("target_component", target_component_id) ==
9,385✔
744
            mav::MessageResult::Success) {
745
            json_message.target_component_id = target_component_id;
8,624✔
746
        } else {
747
            json_message.target_component_id = 0;
766✔
748
        }
749

750
        // Generate JSON using LibmavReceiver's public method
751
        auto connections = get_connections();
9,390✔
752
        if (!connections.empty() && connections[0]->get_libmav_receiver()) {
9,388✔
753
            json_message.fields_json =
754
                connections[0]->get_libmav_receiver()->libmav_message_to_json(
27,898✔
755
                    libmav_msg_opt.value());
18,602✔
756
        } else {
757
            // Fallback: create minimal JSON if no receiver available
758
            json_message.fields_json =
759
                "{\"message_id\":" + std::to_string(libmav_msg_opt.value().id()) +
176✔
760
                ",\"message_name\":\"" + libmav_msg_opt.value().name() + "\"}";
177✔
761
        }
762

763
        if (!call_json_interception_callbacks(json_message, _outgoing_json_message_subscriptions)) {
9,390✔
764
            // Message was dropped by JSON interception callback
765
            if (_message_logging_on) {
×
766
                LogDebug() << "Outgoing JSON message " << json_message.message_name
×
767
                           << " dropped by interception";
×
768
            }
769
            return;
×
770
        }
771
    }
9,390✔
772

773
    std::lock_guard lock(_mutex);
9,384✔
774

775
    if (_connections.empty()) {
9,391✔
776
        // We obviously can't send any messages without a connection added, so
777
        // we silently ignore this.
778
        return;
88✔
779
    }
780

781
    uint8_t successful_emissions = 0;
9,296✔
782
    for (auto& _connection : _connections) {
18,620✔
783
        const uint8_t target_system_id = get_target_system_id(message);
9,324✔
784

785
        if (target_system_id != 0 && !(*_connection.connection).has_system_id(target_system_id)) {
9,320✔
786
            continue;
5✔
787
        }
788
        const auto result = (*_connection.connection).send_message(message);
9,314✔
789
        if (result.first) {
9,320✔
790
            successful_emissions++;
9,314✔
791
        } else {
792
            _connections_errors_subscriptions.queue(
12✔
793
                Mavsdk::ConnectionError{result.second, _connection.handle},
6✔
794
                [this](const auto& func) { call_user_callback(func); });
×
795
        }
796
    }
9,320✔
797

798
    if (successful_emissions == 0) {
9,303✔
799
        LogErr() << "Sending message failed";
4✔
800
    }
801
}
9,389✔
802

803
std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_any_connection(
154✔
804
    const std::string& connection_url, ForwardingOption forwarding_option)
805
{
806
    CliArg cli_arg;
154✔
807
    if (!cli_arg.parse(connection_url)) {
154✔
808
        return {ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle{}};
×
809
    }
810

811
    return std::visit(
154✔
812
        overloaded{
308✔
813
            [](std::monostate) {
×
814
                // Should not happen anyway.
815
                return std::pair<ConnectionResult, Mavsdk::ConnectionHandle>{
×
816
                    ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle()};
×
817
            },
818
            [this, forwarding_option](const CliArg::Udp& udp) {
142✔
819
                return add_udp_connection(udp, forwarding_option);
142✔
820
            },
821
            [this, forwarding_option](const CliArg::Tcp& tcp) {
10✔
822
                return add_tcp_connection(tcp, forwarding_option);
10✔
823
            },
824
            [this, forwarding_option](const CliArg::Serial& serial) {
×
825
                return add_serial_connection(
×
826
                    serial.path, serial.baudrate, serial.flow_control_enabled, forwarding_option);
×
827
            },
828
            [this, forwarding_option](const CliArg::Raw&) {
2✔
829
                return add_raw_connection(forwarding_option);
2✔
830
            }},
831
        cli_arg.protocol);
154✔
832
}
154✔
833

834
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
835
MavsdkImpl::add_udp_connection(const CliArg::Udp& udp, ForwardingOption forwarding_option)
142✔
836
{
837
    auto new_conn = std::make_unique<UdpConnection>(
838
        [this](
9,251✔
839
            MavlinkReceiver::ParseResult result,
840
            mavlink_message_t& message,
841
            Connection* connection) { receive_message(result, message, connection); },
9,251✔
842
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
9,249✔
843
            receive_libmav_message(message, connection);
9,249✔
844
        },
9,251✔
845
        *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
846
        udp.mode == CliArg::Udp::Mode::In ? udp.host : "0.0.0.0",
284✔
847
        udp.mode == CliArg::Udp::Mode::In ? udp.port : 0,
142✔
848
        forwarding_option);
142✔
849

850
    if (!new_conn) {
142✔
851
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
852
    }
853

854
    ConnectionResult ret = new_conn->start();
142✔
855

856
    if (ret != ConnectionResult::Success) {
142✔
857
        return {ret, Mavsdk::ConnectionHandle{}};
×
858
    }
859

860
    if (udp.mode == CliArg::Udp::Mode::Out) {
142✔
861
        // We need to add the IP rather than a hostname, otherwise we end up with two remotes:
862
        // one for the IP, and one for a hostname.
863
        auto remote_ip = resolve_hostname_to_ip(udp.host);
71✔
864

865
        if (!remote_ip) {
71✔
866
            return {ConnectionResult::DestinationIpUnknown, Mavsdk::ConnectionHandle{}};
×
867
        }
868

869
        new_conn->add_remote_to_keep(remote_ip.value(), udp.port);
71✔
870
        std::lock_guard lock(_mutex);
71✔
871

872
        // With a UDP remote, we need to initiate the connection by sending heartbeats.
873
        auto new_configuration = get_configuration();
71✔
874
        new_configuration.set_always_send_heartbeats(true);
71✔
875
        set_configuration(new_configuration);
71✔
876
    }
71✔
877

878
    auto handle = add_connection(std::move(new_conn));
142✔
879

880
    return {ConnectionResult::Success, handle};
142✔
881
}
142✔
882

883
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
884
MavsdkImpl::add_tcp_connection(const CliArg::Tcp& tcp, ForwardingOption forwarding_option)
10✔
885
{
886
    if (tcp.mode == CliArg::Tcp::Mode::Out) {
10✔
887
        auto new_conn = std::make_unique<TcpClientConnection>(
888
            [this](
46✔
889
                MavlinkReceiver::ParseResult result,
890
                mavlink_message_t& message,
891
                Connection* connection) { receive_message(result, message, connection); },
46✔
892
            [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
46✔
893
                receive_libmav_message(message, connection);
46✔
894
            },
46✔
895
            *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
896
            tcp.host,
5✔
897
            tcp.port,
5✔
898
            forwarding_option);
5✔
899
        if (!new_conn) {
5✔
900
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
901
        }
902
        ConnectionResult ret = new_conn->start();
5✔
903
        if (ret == ConnectionResult::Success) {
5✔
904
            return {ret, add_connection(std::move(new_conn))};
5✔
905
        } else {
906
            return {ret, Mavsdk::ConnectionHandle{}};
×
907
        }
908
    } else {
5✔
909
        auto new_conn = std::make_unique<TcpServerConnection>(
910
            [this](
46✔
911
                MavlinkReceiver::ParseResult result,
912
                mavlink_message_t& message,
913
                Connection* connection) { receive_message(result, message, connection); },
46✔
914
            [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
46✔
915
                receive_libmav_message(message, connection);
46✔
916
            },
46✔
917
            *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
918
            tcp.host,
5✔
919
            tcp.port,
5✔
920
            forwarding_option);
5✔
921
        if (!new_conn) {
5✔
922
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
923
        }
924
        ConnectionResult ret = new_conn->start();
5✔
925
        if (ret == ConnectionResult::Success) {
5✔
926
            return {ret, add_connection(std::move(new_conn))};
5✔
927
        } else {
928
            return {ret, Mavsdk::ConnectionHandle{}};
×
929
        }
930
    }
5✔
931
}
932

933
std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_serial_connection(
×
934
    const std::string& dev_path,
935
    int baudrate,
936
    bool flow_control,
937
    ForwardingOption forwarding_option)
938
{
939
    auto new_conn = std::make_unique<SerialConnection>(
940
        [this](
×
941
            MavlinkReceiver::ParseResult result,
942
            mavlink_message_t& message,
943
            Connection* connection) { receive_message(result, message, connection); },
×
944
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
×
945
            receive_libmav_message(message, connection);
×
946
        },
×
947
        *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
948
        dev_path,
949
        baudrate,
950
        flow_control,
951
        forwarding_option);
×
952
    if (!new_conn) {
×
953
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
954
    }
955
    ConnectionResult ret = new_conn->start();
×
956
    if (ret == ConnectionResult::Success) {
×
957
        auto handle = add_connection(std::move(new_conn));
×
958

959
        auto new_configuration = get_configuration();
×
960

961
        // PX4 starting with v1.13 does not send heartbeats by default, so we need
962
        // to initiate the MAVLink connection by sending heartbeats.
963
        // Therefore, we override the default here and enable sending heartbeats.
964
        new_configuration.set_always_send_heartbeats(true);
×
965
        set_configuration(new_configuration);
×
966

967
        return {ret, handle};
×
968

969
    } else {
970
        return {ret, Mavsdk::ConnectionHandle{}};
×
971
    }
972
}
×
973

974
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
975
MavsdkImpl::add_raw_connection(ForwardingOption forwarding_option)
2✔
976
{
977
    // Check if a raw connection already exists
978
    if (find_raw_connection() != nullptr) {
2✔
979
        LogErr() << "Raw connection already exists. Only one raw connection is allowed.";
×
980
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
981
    }
982

983
    auto new_conn = std::make_unique<RawConnection>(
984
        [this](
1✔
985
            MavlinkReceiver::ParseResult result,
986
            mavlink_message_t& message,
987
            Connection* connection) { receive_message(result, message, connection); },
1✔
988
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
1✔
989
            receive_libmav_message(message, connection);
1✔
990
        },
1✔
991
        *this,
992
        forwarding_option);
2✔
993

994
    if (!new_conn) {
2✔
995
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
996
    }
997

998
    ConnectionResult ret = new_conn->start();
2✔
999
    if (ret != ConnectionResult::Success) {
2✔
1000
        return {ret, Mavsdk::ConnectionHandle{}};
×
1001
    }
1002

1003
    auto handle = add_connection(std::move(new_conn));
2✔
1004

1005
    // Enable heartbeats for raw connection
1006
    auto new_configuration = get_configuration();
2✔
1007
    new_configuration.set_always_send_heartbeats(true);
2✔
1008
    set_configuration(new_configuration);
2✔
1009

1010
    return {ConnectionResult::Success, handle};
2✔
1011
}
2✔
1012

1013
Mavsdk::ConnectionHandle MavsdkImpl::add_connection(std::unique_ptr<Connection>&& new_connection)
154✔
1014
{
1015
    std::lock_guard lock(_mutex);
154✔
1016
    auto handle = _connections_handle_factory.create();
154✔
1017
    _connections.emplace_back(ConnectionEntry{std::move(new_connection), handle});
154✔
1018

1019
    return handle;
308✔
1020
}
154✔
1021

1022
void MavsdkImpl::remove_connection(Mavsdk::ConnectionHandle handle)
12✔
1023
{
1024
    std::lock_guard lock(_mutex);
12✔
1025

1026
    _connections.erase(std::remove_if(_connections.begin(), _connections.end(), [&](auto&& entry) {
12✔
1027
        return (entry.handle == handle);
12✔
1028
    }));
1029
}
12✔
1030

1031
Mavsdk::Configuration MavsdkImpl::get_configuration() const
73✔
1032
{
1033
    std::lock_guard configuration_lock(_mutex);
73✔
1034
    return _configuration;
146✔
1035
}
73✔
1036

1037
void MavsdkImpl::set_configuration(Mavsdk::Configuration new_configuration)
221✔
1038
{
1039
    std::lock_guard server_components_lock(_server_components_mutex);
221✔
1040
    // We just point the default to the newly created component. This means
1041
    // that the previous default component will be deleted if it is not
1042
    // used/referenced anywhere.
1043
    _default_server_component = server_component_by_id_with_lock(
442✔
1044
        new_configuration.get_component_id(), new_configuration.get_mav_type());
442✔
1045

1046
    if (new_configuration.get_always_send_heartbeats() &&
368✔
1047
        !_configuration.get_always_send_heartbeats()) {
147✔
1048
        start_sending_heartbeats();
78✔
1049
    } else if (
143✔
1050
        !new_configuration.get_always_send_heartbeats() &&
217✔
1051
        _configuration.get_always_send_heartbeats() && !is_any_system_connected()) {
217✔
1052
        stop_sending_heartbeats();
×
1053
    }
1054

1055
    _configuration = new_configuration;
221✔
1056
    // We cache these values as atomic to avoid having to lock any mutex for them.
1057
    _our_system_id = new_configuration.get_system_id();
221✔
1058
    _our_component_id = new_configuration.get_component_id();
221✔
1059
}
221✔
1060

1061
uint8_t MavsdkImpl::get_own_system_id() const
12,839✔
1062
{
1063
    return _our_system_id;
12,839✔
1064
}
1065

1066
uint8_t MavsdkImpl::get_own_component_id() const
1,473✔
1067
{
1068
    return _our_component_id;
1,473✔
1069
}
1070

1071
uint8_t MavsdkImpl::get_mav_type() const
×
1072
{
1073
    return _configuration.get_mav_type();
×
1074
}
1075

1076
Autopilot MavsdkImpl::get_autopilot() const
×
1077
{
1078
    return _configuration.get_autopilot();
×
1079
}
1080

1081
uint8_t MavsdkImpl::get_mav_autopilot() const
265✔
1082
{
1083
    switch (_configuration.get_autopilot()) {
265✔
1084
        case Autopilot::Px4:
×
1085
            return MAV_AUTOPILOT_PX4;
×
1086
        case Autopilot::ArduPilot:
×
1087
            return MAV_AUTOPILOT_ARDUPILOTMEGA;
×
1088
        case Autopilot::Unknown:
265✔
1089
        default:
1090
            return MAV_AUTOPILOT_GENERIC;
265✔
1091
    }
1092
}
1093

1094
CompatibilityMode MavsdkImpl::get_compatibility_mode() const
38✔
1095
{
1096
    return _configuration.get_compatibility_mode();
38✔
1097
}
1098

1099
Autopilot MavsdkImpl::effective_autopilot(Autopilot detected) const
1,543✔
1100
{
1101
    switch (_configuration.get_compatibility_mode()) {
1,543✔
1102
        case CompatibilityMode::Auto:
1,543✔
1103
            return detected;
1,543✔
1104
        case CompatibilityMode::Pure:
×
1105
            return Autopilot::Unknown; // Unknown = no quirks
×
1106
        case CompatibilityMode::Px4:
×
1107
            return Autopilot::Px4;
×
1108
        case CompatibilityMode::ArduPilot:
×
1109
            return Autopilot::ArduPilot;
×
1110
        default:
×
1111
            return detected;
×
1112
    }
1113
}
1114

1115
uint8_t MavsdkImpl::mav_type_for_component_type(ComponentType component_type)
355✔
1116
{
1117
    switch (component_type) {
355✔
1118
        case ComponentType::Autopilot:
107✔
1119
            return MAV_TYPE_GENERIC;
107✔
1120
        case ComponentType::GroundStation:
222✔
1121
            return MAV_TYPE_GCS;
222✔
1122
        case ComponentType::CompanionComputer:
4✔
1123
            return MAV_TYPE_ONBOARD_CONTROLLER;
4✔
1124
        case ComponentType::Camera:
22✔
1125
            return MAV_TYPE_CAMERA;
22✔
1126
        case ComponentType::Gimbal:
×
1127
            return MAV_TYPE_GIMBAL;
×
1128
        case ComponentType::RemoteId:
×
1129
            return MAV_TYPE_ODID;
×
1130
        case ComponentType::Custom:
×
1131
            return MAV_TYPE_GENERIC;
×
1132
        default:
×
1133
            return MAV_TYPE_GENERIC;
×
1134
    }
1135
}
1136

1137
void MavsdkImpl::make_system_with_component(uint8_t system_id, uint8_t comp_id)
152✔
1138
{
1139
    // Needs _systems_lock
1140

1141
    if (_should_exit) {
152✔
1142
        // When the system got destroyed in the destructor, we have to give up.
1143
        return;
×
1144
    }
1145

1146
    if (static_cast<int>(system_id) == 0 && static_cast<int>(comp_id) == 0) {
152✔
1147
        LogDebug() << "Initializing connection to remote system...";
×
1148
    }
1149

1150
    // Make a system with its first component
1151
    auto new_system = std::make_shared<System>(*this);
152✔
1152
    new_system->init(system_id, comp_id);
152✔
1153

1154
    _systems.emplace_back(system_id, new_system);
152✔
1155
}
152✔
1156

1157
void MavsdkImpl::notify_on_discover()
156✔
1158
{
1159
    // Queue the callbacks without holding the mutex to avoid deadlocks
1160
    _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
226✔
1161
}
156✔
1162

1163
void MavsdkImpl::notify_on_timeout()
5✔
1164
{
1165
    // Queue the callbacks without holding the mutex to avoid deadlocks
1166
    _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
5✔
1167
}
5✔
1168

1169
Mavsdk::NewSystemHandle
1170
MavsdkImpl::subscribe_on_new_system(const Mavsdk::NewSystemCallback& callback)
69✔
1171
{
1172
    std::lock_guard lock(_mutex);
69✔
1173

1174
    const auto handle = _new_system_callbacks.subscribe(callback);
69✔
1175

1176
    if (is_any_system_connected()) {
69✔
UNCOV
1177
        _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
×
1178
    }
1179

1180
    return handle;
138✔
1181
}
69✔
1182

1183
void MavsdkImpl::unsubscribe_on_new_system(Mavsdk::NewSystemHandle handle)
68✔
1184
{
1185
    _new_system_callbacks.unsubscribe(handle);
68✔
1186
}
68✔
1187

1188
bool MavsdkImpl::is_any_system_connected() const
69✔
1189
{
1190
    std::vector<std::shared_ptr<System>> connected_systems = systems();
69✔
1191
    return std::any_of(connected_systems.cbegin(), connected_systems.cend(), [](auto& system) {
69✔
UNCOV
1192
        return system->is_connected();
×
1193
    });
69✔
1194
}
69✔
1195

1196
void MavsdkImpl::work_thread()
148✔
1197
{
1198
    while (!_should_exit) {
40,993✔
1199
        // Process incoming messages
1200
        process_messages();
40,635✔
1201

1202
        // Process incoming libmav messages
1203
        process_libmav_messages();
40,019✔
1204

1205
        // Run timers
1206
        timeout_handler.run_once();
40,054✔
1207
        call_every_handler.run_once();
40,326✔
1208

1209
        // Do component work
1210
        {
1211
            std::lock_guard lock(_server_components_mutex);
40,977✔
1212
            for (auto& it : _server_components) {
81,845✔
1213
                if (it.second != nullptr) {
41,065✔
1214
                    it.second->_impl->do_work();
40,930✔
1215
                }
1216
            }
1217
        }
40,070✔
1218

1219
        // Deliver outgoing messages
1220
        deliver_messages();
40,336✔
1221

1222
        // If no messages to send, check if there are messages to receive
1223
        std::unique_lock lock_received(_received_messages_mutex);
40,514✔
1224
        if (_received_messages.empty()) {
40,532✔
1225
            // No messages to process, wait for a signal or timeout
1226
            _received_messages_cv.wait_for(lock_received, std::chrono::milliseconds(10), [this]() {
40,078✔
1227
                return !_received_messages.empty() || _should_exit;
81,499✔
1228
            });
1229
        }
1230
    }
41,186✔
1231
}
190✔
1232

NEW
1233
void MavsdkImpl::set_callback_executor(std::function<void(std::function<void()>)> executor)
×
1234
{
1235
    bool has_executor;
1236
    {
NEW
1237
        std::lock_guard<std::mutex> lock(_callback_executor_mutex);
×
NEW
1238
        _callback_executor = std::move(executor);
×
NEW
1239
        has_executor = !!_callback_executor;
×
NEW
1240
    }
×
1241

NEW
1242
    if (has_executor) {
×
1243
        // Stop the internal callback thread since user will handle callbacks
NEW
1244
        if (_process_user_callbacks_thread) {
×
NEW
1245
            _user_callback_queue.stop();
×
NEW
1246
            _process_user_callbacks_thread->join();
×
NEW
1247
            _process_user_callbacks_thread.reset();
×
1248
        }
1249

1250
        // Drain any remaining callbacks through the executor
1251
        {
NEW
1252
            std::lock_guard<std::mutex> lock(_callback_executor_mutex);
×
NEW
1253
            if (_callback_executor) {
×
NEW
1254
                LockedQueue<UserCallback>::Guard guard(_user_callback_queue);
×
NEW
1255
                while (auto ptr = guard.get_front()) {
×
NEW
1256
                    _callback_executor(ptr->func);
×
NEW
1257
                    guard.pop_front();
×
NEW
1258
                }
×
NEW
1259
            }
×
NEW
1260
        }
×
1261
    } else {
1262
        // Revert to default internal callback thread
NEW
1263
        if (!_process_user_callbacks_thread) {
×
NEW
1264
            _user_callback_queue.restart();
×
1265
            _process_user_callbacks_thread =
NEW
1266
                std::make_unique<std::thread>(&MavsdkImpl::process_user_callbacks_thread, this);
×
1267
        }
1268
    }
NEW
1269
}
×
1270

1271
void MavsdkImpl::call_user_callback_located(
1,259✔
1272
    const std::string& filename, const int linenumber, const std::function<void()>& func)
1273
{
1274
    // Don't enqueue callbacks if we're shutting down
1275
    if (_should_exit) {
1,259✔
1276
        return;
×
1277
    }
1278

1279
    {
1280
        std::lock_guard<std::mutex> lock(_callback_executor_mutex);
1,259✔
1281
        if (_callback_executor) {
1,259✔
NEW
1282
            _callback_executor(func);
×
NEW
1283
            return;
×
1284
        }
1285
    }
1,259✔
1286

1287
    auto callback_size = _user_callback_queue.size();
1,259✔
1288

1289
    if (_callback_tracker) {
1,259✔
1290
        _callback_tracker->record_queued(filename, linenumber);
×
1291
        _callback_tracker->maybe_print_stats(callback_size);
×
1292
    }
1293

1294
    if (callback_size >= 100) {
1,259✔
1295
        return;
×
1296

1297
    } else if (callback_size == 99) {
1,259✔
1298
        LogErr()
×
1299
            << "User callback queue overflown\n"
1300
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1301
        return;
×
1302

1303
    } else if (callback_size >= 10) {
1,259✔
1304
        LogWarn()
×
1305
            << "User callback queue slow (queue size: " << callback_size
×
1306
            << ").\n"
1307
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1308
    }
1309

1310
    // We only need to keep track of filename and linenumber if we're actually debugging this.
1311
    UserCallback user_callback =
1312
        _callback_debugging ? UserCallback{func, filename, linenumber} : UserCallback{func};
2,518✔
1313

1314
    _user_callback_queue.push_back(std::make_shared<UserCallback>(user_callback));
1,259✔
1315
}
1,259✔
1316

1317
void MavsdkImpl::process_user_callbacks_thread()
148✔
1318
{
1319
    while (!_should_exit) {
1,407✔
1320
        UserCallback callback;
1,407✔
1321
        {
1322
            LockedQueue<UserCallback>::Guard guard(_user_callback_queue);
1,407✔
1323
            auto ptr = guard.wait_and_pop_front();
1,407✔
1324
            if (!ptr) {
1,407✔
1325
                break;
148✔
1326
            }
1327
            // We need to get a copy instead of just a shared_ptr because the queue might
1328
            // be invalidated when the lock is released.
1329
            callback = *ptr;
1,259✔
1330
        }
1,555✔
1331

1332
        // Check if we're in the process of shutting down before executing the callback
1333
        if (_should_exit) {
1,259✔
1334
            continue;
×
1335
        }
1336

1337
        const double timeout_s = 1.0;
1,259✔
1338
        auto cookie = timeout_handler.add(
1,259✔
1339
            [&]() {
×
1340
                if (_callback_debugging) {
×
1341
                    LogWarn() << "Callback called from " << callback.filename << ":"
×
1342
                              << callback.linenumber << " took more than " << timeout_s
×
1343
                              << " second to run.";
×
1344
                    fflush(stdout);
×
1345
                    fflush(stderr);
×
1346
                    abort();
×
1347
                } else {
1348
                    LogWarn()
×
1349
                        << "Callback took more than " << timeout_s << " second to run.\n"
×
1350
                        << "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1351
                }
1352
            },
×
1353
            timeout_s);
1354
        auto callback_start = std::chrono::steady_clock::now();
1,259✔
1355
        callback.func();
1,259✔
1356
        auto callback_end = std::chrono::steady_clock::now();
1,259✔
1357
        timeout_handler.remove(cookie);
1,259✔
1358

1359
        if (_callback_tracker) {
1,259✔
1360
            auto callback_duration_us =
1361
                std::chrono::duration_cast<std::chrono::microseconds>(callback_end - callback_start)
×
1362
                    .count();
×
1363
            _callback_tracker->record_executed(
×
1364
                callback.filename, callback.linenumber, callback_duration_us);
1365
        }
1366
    }
1,407✔
1367
}
148✔
1368

1369
void MavsdkImpl::start_sending_heartbeats()
386✔
1370
{
1371
    // Check if we're in the process of shutting down
1372
    if (_should_exit) {
386✔
1373
        return;
×
1374
    }
1375

1376
    // Before sending out first heartbeats we need to make sure we have a
1377
    // default server component.
1378
    default_server_component_impl();
386✔
1379

1380
    {
1381
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
386✔
1382
        call_every_handler.remove(_heartbeat_send_cookie);
386✔
1383
        _heartbeat_send_cookie =
386✔
1384
            call_every_handler.add([this]() { send_heartbeats(); }, HEARTBEAT_SEND_INTERVAL_S);
879✔
1385
    }
386✔
1386
}
1387

1388
void MavsdkImpl::stop_sending_heartbeats()
5✔
1389
{
1390
    if (!_configuration.get_always_send_heartbeats()) {
5✔
1391
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
1✔
1392
        call_every_handler.remove(_heartbeat_send_cookie);
1✔
1393
    }
1✔
1394
}
5✔
1395

1396
ServerComponentImpl& MavsdkImpl::default_server_component_impl()
1,436✔
1397
{
1398
    std::lock_guard lock(_server_components_mutex);
1,436✔
1399
    return default_server_component_with_lock();
1,437✔
1400
}
1,437✔
1401

1402
ServerComponentImpl& MavsdkImpl::default_server_component_with_lock()
1,437✔
1403
{
1404
    if (_default_server_component == nullptr) {
1,437✔
1405
        _default_server_component =
1406
            server_component_by_id_with_lock(_our_component_id, get_mav_type());
×
1407
    }
1408
    return *_default_server_component->_impl;
1,436✔
1409
}
1410

1411
void MavsdkImpl::send_heartbeats()
488✔
1412
{
1413
    std::lock_guard lock(_server_components_mutex);
488✔
1414

1415
    for (auto& it : _server_components) {
984✔
1416
        if (it.second != nullptr) {
494✔
1417
            it.second->_impl->send_heartbeat();
492✔
1418
        }
1419
    }
1420
}
491✔
1421

1422
void MavsdkImpl::intercept_incoming_messages_async(std::function<bool(mavlink_message_t&)> callback)
28✔
1423
{
1424
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
28✔
1425
    _intercept_incoming_messages_callback = callback;
28✔
1426
}
28✔
1427

1428
void MavsdkImpl::intercept_outgoing_messages_async(std::function<bool(mavlink_message_t&)> callback)
16✔
1429
{
1430
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
16✔
1431
    _intercept_outgoing_messages_callback = callback;
16✔
1432
}
16✔
1433

1434
bool MavsdkImpl::call_json_interception_callbacks(
18,710✔
1435
    const Mavsdk::MavlinkMessage& json_message,
1436
    std::vector<std::pair<Mavsdk::InterceptJsonHandle, Mavsdk::InterceptJsonCallback>>&
1437
        callback_list)
1438
{
1439
    bool keep_message = true;
18,710✔
1440

1441
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
18,710✔
1442
    for (const auto& subscription : callback_list) {
18,713✔
1443
        if (!subscription.second(json_message)) {
4✔
1444
            keep_message = false;
×
1445
        }
1446
    }
1447

1448
    return keep_message;
37,403✔
1449
}
18,696✔
1450

1451
Mavsdk::InterceptJsonHandle
1452
MavsdkImpl::subscribe_incoming_messages_json(const Mavsdk::InterceptJsonCallback& callback)
1✔
1453
{
1454
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1455
    auto handle = _json_handle_factory.create();
1✔
1456
    _incoming_json_message_subscriptions.push_back(std::make_pair(handle, callback));
1✔
1457
    return handle;
2✔
1458
}
1✔
1459

1460
void MavsdkImpl::unsubscribe_incoming_messages_json(Mavsdk::InterceptJsonHandle handle)
1✔
1461
{
1462
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1463
    auto it = std::find_if(
1✔
1464
        _incoming_json_message_subscriptions.begin(),
1465
        _incoming_json_message_subscriptions.end(),
1466
        [handle](const auto& subscription) { return subscription.first == handle; });
1✔
1467
    if (it != _incoming_json_message_subscriptions.end()) {
1✔
1468
        _incoming_json_message_subscriptions.erase(it);
1✔
1469
    }
1470
}
1✔
1471

1472
Mavsdk::InterceptJsonHandle
1473
MavsdkImpl::subscribe_outgoing_messages_json(const Mavsdk::InterceptJsonCallback& callback)
1✔
1474
{
1475
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1476
    auto handle = _json_handle_factory.create();
1✔
1477
    _outgoing_json_message_subscriptions.push_back(std::make_pair(handle, callback));
1✔
1478
    return handle;
2✔
1479
}
1✔
1480

1481
void MavsdkImpl::unsubscribe_outgoing_messages_json(Mavsdk::InterceptJsonHandle handle)
1✔
1482
{
1483
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1484
    auto it = std::find_if(
1✔
1485
        _outgoing_json_message_subscriptions.begin(),
1486
        _outgoing_json_message_subscriptions.end(),
1487
        [handle](const auto& subscription) { return subscription.first == handle; });
1✔
1488
    if (it != _outgoing_json_message_subscriptions.end()) {
1✔
1489
        _outgoing_json_message_subscriptions.erase(it);
1✔
1490
    }
1491
}
1✔
1492

1493
RawConnection* MavsdkImpl::find_raw_connection()
4✔
1494
{
1495
    std::lock_guard lock(_mutex);
4✔
1496

1497
    for (auto& entry : _connections) {
4✔
1498
        auto* raw_conn = dynamic_cast<RawConnection*>(entry.connection.get());
2✔
1499
        if (raw_conn != nullptr) {
2✔
1500
            return raw_conn;
2✔
1501
        }
1502
    }
1503
    return nullptr;
2✔
1504
}
4✔
1505

1506
void MavsdkImpl::pass_received_raw_bytes(const char* bytes, size_t length)
1✔
1507
{
1508
    auto* raw_conn = find_raw_connection();
1✔
1509
    if (raw_conn == nullptr) {
1✔
1510
        LogErr()
×
1511
            << "No raw connection available. Please add one using add_any_connection(\"raw://\")";
×
1512
        return;
×
1513
    }
1514

1515
    raw_conn->receive(bytes, length);
1✔
1516
}
1517

1518
Mavsdk::RawBytesHandle
1519
MavsdkImpl::subscribe_raw_bytes_to_be_sent(const Mavsdk::RawBytesCallback& callback)
1✔
1520
{
1521
    if (find_raw_connection() == nullptr) {
1✔
1522
        LogWarn() << "No raw connection available. Subscription will only receive bytes after you "
×
1523
                     "add a connection using add_any_connection(\"raw://\")";
×
1524
    }
1525
    return _raw_bytes_subscriptions.subscribe(callback);
1✔
1526
}
1527

1528
void MavsdkImpl::unsubscribe_raw_bytes_to_be_sent(Mavsdk::RawBytesHandle handle)
1✔
1529
{
1530
    _raw_bytes_subscriptions.unsubscribe(handle);
1✔
1531
}
1✔
1532

1533
bool MavsdkImpl::notify_raw_bytes_sent(const char* bytes, size_t length)
3✔
1534
{
1535
    if (_raw_bytes_subscriptions.empty()) {
3✔
1536
        return false;
2✔
1537
    }
1538

1539
    _raw_bytes_subscriptions(bytes, length);
1✔
1540

1541
    return true;
1✔
1542
}
1543

1544
Mavsdk::ConnectionErrorHandle
1545
MavsdkImpl::subscribe_connection_errors(Mavsdk::ConnectionErrorCallback callback)
×
1546
{
1547
    std::lock_guard lock(_mutex);
×
1548

1549
    const auto handle = _connections_errors_subscriptions.subscribe(callback);
×
1550

1551
    return handle;
×
1552
}
×
1553

1554
void MavsdkImpl::unsubscribe_connection_errors(Mavsdk::ConnectionErrorHandle handle)
×
1555
{
1556
    std::lock_guard lock(_mutex);
×
1557
    _connections_errors_subscriptions.unsubscribe(handle);
×
1558
}
×
1559

1560
uint8_t MavsdkImpl::get_target_system_id(const mavlink_message_t& message)
9,370✔
1561
{
1562
    // Checks whether connection knows target system ID by extracting target system if set.
1563
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);
9,370✔
1564

1565
    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_SYSTEM)) {
9,370✔
1566
        return 0;
725✔
1567
    }
1568

1569
    // Don't look at the target system offset if it is outside the payload length.
1570
    // This can happen if the fields are trimmed.
1571
    if (meta->target_system_ofs >= message.len) {
8,645✔
1572
        return 0;
25✔
1573
    }
1574

1575
    return (_MAV_PAYLOAD(&message))[meta->target_system_ofs];
8,620✔
1576
}
1577

1578
uint8_t MavsdkImpl::get_target_component_id(const mavlink_message_t& message)
50✔
1579
{
1580
    // Checks whether connection knows target system ID by extracting target system if set.
1581
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);
50✔
1582

1583
    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_COMPONENT)) {
50✔
1584
        return 0;
33✔
1585
    }
1586

1587
    // Don't look at the target component offset if it is outside the payload length.
1588
    // This can happen if the fields are trimmed.
1589
    if (meta->target_component_ofs >= message.len) {
17✔
1590
        return 0;
1✔
1591
    }
1592

1593
    return (_MAV_PAYLOAD(&message))[meta->target_component_ofs];
16✔
1594
}
1595

1596
Sender& MavsdkImpl::sender()
×
1597
{
1598
    std::lock_guard lock(_server_components_mutex);
×
1599
    return default_server_component_with_lock().sender();
×
1600
}
×
1601

1602
std::vector<Connection*> MavsdkImpl::get_connections() const
9,389✔
1603
{
1604
    std::lock_guard lock(_mutex);
9,389✔
1605
    std::vector<Connection*> connections;
9,390✔
1606
    for (const auto& connection_entry : _connections) {
18,708✔
1607
        connections.push_back(connection_entry.connection.get());
9,319✔
1608
    }
1609
    return connections;
9,386✔
1610
}
9,386✔
1611

1612
mav::MessageSet& MavsdkImpl::get_message_set() const
23✔
1613
{
1614
    // Note: This returns a reference to MessageSet without locking.
1615
    // Thread safety for MessageSet operations must be ensured by:
1616
    // 1. Using load_custom_xml_to_message_set() for write operations (XML loading)
1617
    // 2. libmav MessageSet should be internally thread-safe for read operations
1618
    // 3. If race conditions persist, consider implementing a thread-safe MessageSet wrapper
1619
    return *_message_set;
23✔
1620
}
1621

1622
bool MavsdkImpl::load_custom_xml_to_message_set(const std::string& xml_content)
6✔
1623
{
1624
    std::lock_guard<std::mutex> lock(_message_set_mutex);
6✔
1625
    auto result = _message_set->addFromXMLString(xml_content, false /* recursive_open_includes */);
6✔
1626
    return result == ::mav::MessageSetResult::Success;
12✔
1627
}
6✔
1628

1629
// Thread-safe MessageSet read operations
1630
std::optional<std::string> MavsdkImpl::message_id_to_name_safe(uint32_t id) const
×
1631
{
1632
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1633
    auto message_def = _message_set->getMessageDefinition(static_cast<int>(id));
×
1634
    if (message_def) {
×
1635
        return message_def.get().name();
×
1636
    }
1637
    return std::nullopt;
×
1638
}
×
1639

1640
std::optional<int> MavsdkImpl::message_name_to_id_safe(const std::string& name) const
×
1641
{
1642
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1643
    return _message_set->idForMessage(name);
×
1644
}
×
1645

1646
std::optional<mav::Message> MavsdkImpl::create_message_safe(const std::string& message_name) const
×
1647
{
1648
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1649
    return _message_set->create(message_name);
×
1650
}
×
1651

1652
// Thread-safe parsing for LibmavReceiver
1653
std::optional<mav::Message> MavsdkImpl::parse_message_safe(
18,719✔
1654
    const uint8_t* buffer, size_t buffer_len, size_t& bytes_consumed) const
1655
{
1656
    std::lock_guard<std::mutex> lock(_message_set_mutex);
18,719✔
1657
    return _buffer_parser->parseMessage(buffer, buffer_len, bytes_consumed);
18,731✔
1658
}
18,731✔
1659

1660
mav::OptionalReference<const mav::MessageDefinition>
1661
MavsdkImpl::get_message_definition_safe(int message_id) const
18,644✔
1662
{
1663
    std::lock_guard<std::mutex> lock(_message_set_mutex);
18,644✔
1664
    return _message_set->getMessageDefinition(message_id);
18,642✔
1665
}
18,646✔
1666

1667
} // namespace mavsdk
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc