• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mavlink / MAVSDK / 20734216309

06 Jan 2026 12:59AM UTC coverage: 48.052% (-0.006%) from 48.058%
20734216309

Pull #2747

github

web-flow
Merge 9651d3985 into 5d2947b34
Pull Request #2747: core: revert message intercept behavior

14 of 15 new or added lines in 2 files covered. (93.33%)

4 existing lines in 4 files now uncovered.

17736 of 36910 relevant lines covered (48.05%)

468.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.21
/src/mavsdk/core/mavsdk_impl.cpp
1
#include "mavsdk_impl.h"
2

3
#include <algorithm>
4
#include <mutex>
5
#include <tcp_server_connection.h>
6

7
#include "connection.h"
8
#include "log.h"
9
#include "tcp_client_connection.h"
10
#include "tcp_server_connection.h"
11
#include "udp_connection.h"
12
#include "raw_connection.h"
13
#include "system.h"
14
#include "system_impl.h"
15
#include "serial_connection.h"
16
#include "version.h"
17
#include "server_component_impl.h"
18
#include "overloaded.h"
19
#include "mavlink_channels.h"
20
#include "callback_list.tpp"
21
#include "hostname_to_ip.h"
22
#include "embedded_mavlink_xml.h"
23
#include <mav/MessageSet.h>
24

25
namespace mavsdk {
26

27
template class CallbackList<>;
28

29
MavsdkImpl::MavsdkImpl(const Mavsdk::Configuration& configuration) :
138✔
30
    timeout_handler(time),
138✔
31
    call_every_handler(time)
276✔
32
{
33
    LogInfo() << "MAVSDK version: " << mavsdk_version;
138✔
34

35
    if (const char* env_p = std::getenv("MAVSDK_CALLBACK_DEBUGGING")) {
138✔
36
        if (std::string(env_p) == "1") {
×
37
            LogDebug() << "Callback debugging is on.";
×
38
            _callback_debugging = true;
×
39
            _callback_tracker = std::make_unique<CallbackTracker>();
×
40
        }
41
    }
42

43
    if (const char* env_p = std::getenv("MAVSDK_MESSAGE_DEBUGGING")) {
138✔
44
        if (std::string(env_p) == "1") {
×
45
            LogDebug() << "Message debugging is on.";
×
46
            _message_logging_on = true;
×
47
        }
48
    }
49

50
    if (const char* env_p = std::getenv("MAVSDK_SYSTEM_DEBUGGING")) {
138✔
51
        if (std::string(env_p) == "1") {
×
52
            LogDebug() << "System debugging is on.";
×
53
            _system_debugging = true;
×
54
        }
55
    }
56

57
    set_configuration(configuration);
138✔
58

59
    // Initialize MessageSet with embedded XML content in dependency order
60
    // This happens at startup before any connections are created, so no synchronization needed
61
    _message_set = std::make_unique<mav::MessageSet>();
138✔
62
    _message_set->addFromXMLString(mav_embedded::MINIMAL_XML);
138✔
63
    _message_set->addFromXMLString(mav_embedded::STANDARD_XML);
138✔
64
    _message_set->addFromXMLString(mav_embedded::COMMON_XML);
138✔
65
    _message_set->addFromXMLString(mav_embedded::ARDUPILOTMEGA_XML);
138✔
66

67
    // Initialize BufferParser for thread-safe parsing
68
    _buffer_parser = std::make_unique<mav::BufferParser>(*_message_set);
138✔
69

70
    // Start the user callback thread first, so it is ready for anything generated by
71
    // the work thread.
72

73
    _process_user_callbacks_thread =
276✔
74
        new std::thread(&MavsdkImpl::process_user_callbacks_thread, this);
138✔
75

76
    _work_thread = new std::thread(&MavsdkImpl::work_thread, this);
138✔
77
}
138✔
78

79
MavsdkImpl::~MavsdkImpl()
138✔
80
{
81
    {
82
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
138✔
83
        call_every_handler.remove(_heartbeat_send_cookie);
138✔
84
    }
138✔
85

86
    _should_exit = true;
138✔
87

88
    // Stop work first because we don't want to trigger anything that would
89
    // potentially want to call into user code.
90

91
    if (_work_thread != nullptr) {
138✔
92
        _work_thread->join();
138✔
93
        delete _work_thread;
138✔
94
        _work_thread = nullptr;
138✔
95
    }
96

97
    if (_process_user_callbacks_thread != nullptr) {
138✔
98
        _user_callback_queue.stop();
138✔
99
        _process_user_callbacks_thread->join();
138✔
100
        delete _process_user_callbacks_thread;
138✔
101
        _process_user_callbacks_thread = nullptr;
138✔
102
    }
103

104
    std::lock_guard lock(_mutex);
138✔
105

106
    _systems.clear();
138✔
107
    _connections.clear();
138✔
108
}
138✔
109

110
std::string MavsdkImpl::version()
1✔
111
{
112
    static unsigned version_counter = 0;
113

114
    ++version_counter;
1✔
115

116
    switch (version_counter) {
1✔
117
        case 10:
×
118
            return "You were wondering about the name of this library?";
×
119
        case 11:
×
120
            return "Let's look at the history:";
×
121
        case 12:
×
122
            return "DroneLink";
×
123
        case 13:
×
124
            return "DroneCore";
×
125
        case 14:
×
126
            return "DronecodeSDK";
×
127
        case 15:
×
128
            return "MAVSDK";
×
129
        case 16:
×
130
            return "And that's it...";
×
131
        case 17:
×
132
            return "At least for now ¯\\_(ツ)_/¯.";
×
133
        default:
1✔
134
            return mavsdk_version;
1✔
135
    }
136
}
137

138
std::vector<std::shared_ptr<System>> MavsdkImpl::systems() const
175✔
139
{
140
    std::vector<std::shared_ptr<System>> systems_result{};
175✔
141

142
    std::lock_guard lock(_mutex);
175✔
143
    for (auto& system : _systems) {
278✔
144
        // We ignore the 0 entry because it's just a null system.
145
        // It's only created because the older, deprecated API needs a
146
        // reference.
147
        if (system.first == 0) {
103✔
148
            continue;
×
149
        }
150
        systems_result.push_back(system.second);
103✔
151
    }
152

153
    return systems_result;
175✔
154
}
175✔
155

156
std::optional<std::shared_ptr<System>> MavsdkImpl::first_autopilot(double timeout_s)
57✔
157
{
158
    {
159
        std::lock_guard lock(_mutex);
57✔
160
        for (auto system : _systems) {
58✔
161
            if (system.second->is_connected() && system.second->has_autopilot()) {
4✔
162
                return system.second;
3✔
163
            }
164
        }
4✔
165
    }
57✔
166

167
    if (timeout_s == 0.0) {
54✔
168
        // Don't wait at all.
169
        return {};
×
170
    }
171

172
    auto prom = std::make_shared<std::promise<std::shared_ptr<System>>>();
54✔
173
    auto fut = prom->get_future();
54✔
174

175
    auto flag = std::make_shared<std::once_flag>();
54✔
176
    auto handle = subscribe_on_new_system([this, prom, flag]() {
54✔
177
        // Check all systems, not just the first one
178
        auto all_systems = systems();
55✔
179
        for (auto& system : all_systems) {
57✔
180
            if (system->is_connected() && system->has_autopilot()) {
56✔
181
                std::call_once(*flag, [prom, system]() { prom->set_value(system); });
108✔
182
                break;
54✔
183
            }
184
        }
185
    });
55✔
186

187
    if (timeout_s > 0.0) {
54✔
188
        if (fut.wait_for(std::chrono::milliseconds(int64_t(timeout_s * 1e3))) ==
54✔
189
            std::future_status::ready) {
190
            unsubscribe_on_new_system(handle);
54✔
191
            return fut.get();
54✔
192

193
        } else {
194
            unsubscribe_on_new_system(handle);
×
195
            return std::nullopt;
×
196
        }
197
    } else {
198
        fut.wait();
×
199
        unsubscribe_on_new_system(handle);
×
200
        return std::optional(fut.get());
×
201
    }
202
}
54✔
203

204
std::shared_ptr<ServerComponent> MavsdkImpl::server_component(unsigned instance)
54✔
205
{
206
    std::lock_guard lock(_mutex);
54✔
207

208
    auto component_type = _configuration.get_component_type();
54✔
209
    switch (component_type) {
54✔
210
        case ComponentType::Autopilot:
54✔
211
        case ComponentType::GroundStation:
212
        case ComponentType::CompanionComputer:
213
        case ComponentType::Camera:
214
        case ComponentType::Gimbal:
215
        case ComponentType::RemoteId:
216
        case ComponentType::Custom:
217
            return server_component_by_type(component_type, instance);
54✔
218
        default:
×
219
            LogErr() << "Unknown component type";
×
220
            return {};
×
221
    }
222
}
54✔
223

224
std::shared_ptr<ServerComponent>
225
MavsdkImpl::server_component_by_type(ComponentType server_component_type, unsigned instance)
54✔
226
{
227
    switch (server_component_type) {
54✔
228
        case ComponentType::Autopilot:
40✔
229
            if (instance == 0) {
40✔
230
                return server_component_by_id(MAV_COMP_ID_AUTOPILOT1);
40✔
231
            } else {
232
                LogErr() << "Only autopilot instance 0 is valid";
×
233
                return {};
×
234
            }
235

236
        case ComponentType::GroundStation:
×
237
            if (instance == 0) {
×
238
                return server_component_by_id(MAV_COMP_ID_MISSIONPLANNER);
×
239
            } else {
240
                LogErr() << "Only one ground station supported at this time";
×
241
                return {};
×
242
            }
243

244
        case ComponentType::CompanionComputer:
1✔
245
            if (instance == 0) {
1✔
246
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER);
1✔
247
            } else if (instance == 1) {
×
248
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER2);
×
249
            } else if (instance == 2) {
×
250
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER3);
×
251
            } else if (instance == 3) {
×
252
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER4);
×
253
            } else {
254
                LogErr() << "Only companion computer 0..3 are supported";
×
255
                return {};
×
256
            }
257

258
        case ComponentType::Camera:
13✔
259
            if (instance == 0) {
13✔
260
                return server_component_by_id(MAV_COMP_ID_CAMERA);
13✔
261
            } else if (instance == 1) {
×
262
                return server_component_by_id(MAV_COMP_ID_CAMERA2);
×
263
            } else if (instance == 2) {
×
264
                return server_component_by_id(MAV_COMP_ID_CAMERA3);
×
265
            } else if (instance == 3) {
×
266
                return server_component_by_id(MAV_COMP_ID_CAMERA4);
×
267
            } else if (instance == 4) {
×
268
                return server_component_by_id(MAV_COMP_ID_CAMERA5);
×
269
            } else if (instance == 5) {
×
270
                return server_component_by_id(MAV_COMP_ID_CAMERA6);
×
271
            } else {
272
                LogErr() << "Only camera 0..5 are supported";
×
273
                return {};
×
274
            }
275

276
        default:
×
277
            LogErr() << "Unknown server component type";
×
278
            return {};
×
279
    }
280
}
281

282
std::shared_ptr<ServerComponent> MavsdkImpl::server_component_by_id(uint8_t component_id)
54✔
283
{
284
    if (component_id == 0) {
54✔
285
        LogErr() << "Server component with component ID 0 not allowed";
×
286
        return nullptr;
×
287
    }
288

289
    std::lock_guard lock(_server_components_mutex);
54✔
290

291
    return server_component_by_id_with_lock(component_id);
54✔
292
}
54✔
293

294
std::shared_ptr<ServerComponent> MavsdkImpl::server_component_by_id_with_lock(uint8_t component_id)
261✔
295
{
296
    for (auto& it : _server_components) {
262✔
297
        if (it.first == component_id) {
123✔
298
            if (it.second != nullptr) {
122✔
299
                return it.second;
122✔
300
            } else {
301
                it.second = std::make_shared<ServerComponent>(*this, component_id);
×
302
            }
303
        }
304
    }
305

306
    _server_components.emplace_back(std::pair<uint8_t, std::shared_ptr<ServerComponent>>(
139✔
307
        component_id, std::make_shared<ServerComponent>(*this, component_id)));
278✔
308

309
    return _server_components.back().second;
139✔
310
}
311

312
void MavsdkImpl::forward_message(mavlink_message_t& message, Connection* connection)
58✔
313
{
314
    // Forward_message Function implementing Mavlink routing rules.
315
    // See https://mavlink.io/en/guide/routing.html
316

317
    bool forward_heartbeats_enabled = true;
58✔
318
    const uint8_t target_system_id = get_target_system_id(message);
58✔
319
    const uint8_t target_component_id = get_target_component_id(message);
58✔
320

321
    // If it's a message only for us, we keep it, otherwise, we forward it.
322
    const bool targeted_only_at_us =
323
        (target_system_id == get_own_system_id() && target_component_id == get_own_component_id());
58✔
324

325
    // We don't forward heartbeats unless it's specifically enabled.
326
    const bool heartbeat_check_ok =
58✔
327
        (message.msgid != MAVLINK_MSG_ID_HEARTBEAT || forward_heartbeats_enabled);
58✔
328

329
    if (!targeted_only_at_us && heartbeat_check_ok) {
58✔
330
        unsigned successful_emissions = 0;
42✔
331
        for (auto& entry : _connections) {
123✔
332
            // Check whether the connection is not the one from which we received the message.
333
            // And also check if the connection was set to forward messages.
334
            if (entry.connection.get() == connection ||
120✔
335
                !entry.connection->should_forward_messages()) {
39✔
336
                continue;
42✔
337
            }
338
            auto result = (*entry.connection).send_message(message);
39✔
339
            if (result.first) {
39✔
340
                successful_emissions++;
38✔
341
            } else {
342
                _connections_errors_subscriptions.queue(
2✔
343
                    Mavsdk::ConnectionError{result.second, entry.handle},
1✔
344
                    [this](const auto& func) { call_user_callback(func); });
×
345
            }
346
        }
39✔
347
        if (successful_emissions == 0) {
42✔
348
            if (_system_debugging) {
4✔
349
                LogErr() << "Message forwarding failed";
×
350
            }
351
        }
352
    }
353
}
58✔
354

355
void MavsdkImpl::receive_message(
2,585✔
356
    MavlinkReceiver::ParseResult result, mavlink_message_t& message, Connection* connection)
357
{
358
    if (result == MavlinkReceiver::ParseResult::MessageParsed) {
2,585✔
359
        // Valid message: queue for full processing (which includes forwarding)
360
        {
361
            std::lock_guard lock(_received_messages_mutex);
2,582✔
362
            _received_messages.emplace(ReceivedMessage{std::move(message), connection});
2,582✔
363
        }
2,581✔
364
        _received_messages_cv.notify_one();
2,579✔
365

366
    } else if (result == MavlinkReceiver::ParseResult::BadCrc) {
3✔
367
        // Unknown message: forward only, don't process locally
368
        forward_message(message, connection);
4✔
369
    }
370
}
2,585✔
371

372
void MavsdkImpl::receive_libmav_message(
2,585✔
373
    const Mavsdk::MavlinkMessage& message, Connection* connection)
374
{
375
    {
376
        std::lock_guard lock(_received_libmav_messages_mutex);
2,585✔
377
        _received_libmav_messages.emplace(ReceivedLibmavMessage{message, connection});
2,583✔
378
    }
2,584✔
379
    _received_libmav_messages_cv.notify_one();
2,583✔
380
}
2,584✔
381

382
void MavsdkImpl::process_messages()
32,160✔
383
{
384
    std::lock_guard lock(_received_messages_mutex);
32,160✔
385
    while (!_received_messages.empty()) {
34,573✔
386
        auto message_copied = _received_messages.front();
2,565✔
387
        process_message(message_copied.message, message_copied.connection_ptr);
2,564✔
388
        _received_messages.pop();
2,566✔
389
    }
390
}
31,591✔
391

392
void MavsdkImpl::process_libmav_messages()
31,787✔
393
{
394
    std::lock_guard lock(_received_libmav_messages_mutex);
31,787✔
395
    while (!_received_libmav_messages.empty()) {
34,303✔
396
        auto message_copied = _received_libmav_messages.front();
2,562✔
397
        process_libmav_message(message_copied.message, message_copied.connection_ptr);
2,567✔
398
        _received_libmav_messages.pop();
2,567✔
399
    }
2,567✔
400
}
31,780✔
401

402
void MavsdkImpl::process_message(mavlink_message_t& message, Connection* connection)
2,565✔
403
{
404
    // Assumes _received_messages_mutex
405

406
    if (_message_logging_on) {
2,565✔
407
        LogDebug() << "Processing message " << message.msgid << " from "
×
408
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid);
×
409
    }
410

411
    if (_should_exit) {
2,565✔
412
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
413
        return;
×
414
    }
415

416
    {
417
        std::lock_guard lock(_mutex);
2,565✔
418

419
        // This is a low level interface where incoming messages can be tampered
420
        // with or even dropped. This happens BEFORE forwarding, so modifications
421
        // and drops affect both local processing AND forwarded messages.
422
        {
423
            bool keep = true;
2,566✔
424
            {
425
                std::lock_guard<std::mutex> intercept_lock(_intercept_callbacks_mutex);
2,566✔
426
                if (_intercept_incoming_messages_callback != nullptr) {
2,565✔
427
                    keep = _intercept_incoming_messages_callback(message);
252✔
428
                }
429
            }
2,565✔
430

431
            if (!keep) {
2,565✔
432
                LogDebug() << "Dropped incoming message: " << int(message.msgid);
34✔
433
                return;
34✔
434
            }
435
        }
436

437
        if (_should_exit) {
2,531✔
438
            // If we're meant to clean up, let's not try to acquire any more locks but bail.
NEW
439
            return;
×
440
        }
441

442
        /* Forward message (after intercept) if option is enabled and multiple interfaces
443
         * are connected.
444
         * Performs message forwarding checks for every message if message forwarding is
445
         * enabled on at least one connection, and in case of a single forwarding connection,
446
         * we check that it is not the one which received the current message.
447
         *
448
         * Conditions:
449
         * 1. At least 2 connections.
450
         * 2. At least 1 forwarding connection.
451
         * 3. At least 2 forwarding connections or current connection is not forwarding.
452
         */
453

454
        if (_connections.size() > 1 && mavsdk::Connection::forwarding_connections_count() > 0 &&
2,584✔
455
            (mavsdk::Connection::forwarding_connections_count() > 1 ||
54✔
456
             !connection->should_forward_messages())) {
457
            if (_message_logging_on) {
54✔
458
                LogDebug() << "Forwarding message " << message.msgid << " from "
×
459
                           << static_cast<int>(message.sysid) << "/"
×
460
                           << static_cast<int>(message.compid);
×
461
            }
462
            forward_message(message, connection);
54✔
463
        }
464

465
        // Don't ever create a system with sysid 0.
466
        if (message.sysid == 0) {
2,529✔
467
            if (_message_logging_on) {
×
468
                LogDebug() << "Ignoring message with sysid == 0";
×
469
            }
470
            return;
×
471
        }
472

473
        // Filter out messages by QGroundControl, however, only do that if MAVSDK
474
        // is also implementing a ground station and not if it is used in another
475
        // configuration, e.g. on a companion.
476
        //
477
        // This is a workaround because PX4 started forwarding messages between
478
        // mavlink instances which leads to existing implementations (including
479
        // examples and integration tests) to connect to QGroundControl by accident
480
        // instead of PX4 because the check `has_autopilot()` is not used.
481

482
        if (_configuration.get_component_type() == ComponentType::GroundStation &&
2,529✔
483
            message.sysid == 255 && message.compid == MAV_COMP_ID_MISSIONPLANNER) {
2,530✔
484
            if (_message_logging_on) {
×
485
                LogDebug() << "Ignoring messages from QGC as we are also a ground station";
×
486
            }
487
            return;
×
488
        }
489

490
        bool found_system = false;
2,530✔
491
        for (auto& system : _systems) {
2,572✔
492
            if (system.first == message.sysid) {
2,438✔
493
                system.second->system_impl()->add_new_component(message.compid);
2,396✔
494
                found_system = true;
2,395✔
495
                break;
2,395✔
496
            }
497
        }
498

499
        if (!found_system) {
2,531✔
500
            if (_system_debugging) {
136✔
501
                LogWarn() << "Create new system/component " << (int)message.sysid << "/"
×
502
                          << (int)message.compid;
×
503
                LogWarn() << "From message " << (int)message.msgid << " with len "
×
504
                          << (int)message.len;
×
505
                std::string bytes = "";
×
506
                for (unsigned i = 0; i < 12 + message.len; ++i) {
×
507
                    bytes += std::to_string(reinterpret_cast<uint8_t*>(&message)[i]) + ' ';
×
508
                }
509
                LogWarn() << "Bytes: " << bytes;
×
510
            }
×
511
            make_system_with_component(message.sysid, message.compid);
136✔
512

513
            // We now better talk back.
514
            start_sending_heartbeats();
136✔
515
        }
516

517
        if (_should_exit) {
2,531✔
518
            // Don't try to call at() if systems have already been destroyed
519
            // in destructor.
520
            return;
×
521
        }
522
    }
2,565✔
523

524
    mavlink_message_handler.process_message(message);
2,532✔
525

526
    for (auto& system : _systems) {
2,574✔
527
        if (system.first == message.sysid) {
2,573✔
528
            system.second->system_impl()->process_mavlink_message(message);
2,530✔
529
            break;
2,531✔
530
        }
531
    }
532
}
533

534
void MavsdkImpl::process_libmav_message(
2,566✔
535
    const Mavsdk::MavlinkMessage& message, Connection* /* connection */)
536
{
537
    // Assumes _received_libmav_messages_mutex
538

539
    if (_message_logging_on) {
2,566✔
540
        LogDebug() << "MavsdkImpl::process_libmav_message: " << message.message_name << " from "
×
541
                   << static_cast<int>(message.system_id) << "/"
×
542
                   << static_cast<int>(message.component_id);
×
543
    }
544

545
    // JSON message interception for incoming messages
546
    if (!call_json_interception_callbacks(message, _incoming_json_message_subscriptions)) {
2,566✔
547
        // Message was dropped by interception callback
548
        if (_message_logging_on) {
×
549
            LogDebug() << "Incoming JSON message " << message.message_name
×
550
                       << " dropped by interception";
×
551
        }
552
        return;
×
553
    }
554

555
    if (_message_logging_on) {
2,565✔
556
        LogDebug() << "Processing libmav message " << message.message_name << " from "
×
557
                   << static_cast<int>(message.system_id) << "/"
×
558
                   << static_cast<int>(message.component_id);
×
559
    }
560

561
    if (_should_exit) {
2,565✔
562
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
563
        return;
×
564
    }
565

566
    {
567
        std::lock_guard lock(_mutex);
2,566✔
568

569
        // Don't ever create a system with sysid 0.
570
        if (message.system_id == 0) {
2,564✔
571
            if (_message_logging_on) {
×
572
                LogDebug() << "Ignoring libmav message with sysid == 0";
×
573
            }
574
            return;
×
575
        }
576

577
        // Filter out QGroundControl messages similar to regular mavlink processing
578
        if (_configuration.get_component_type() == ComponentType::GroundStation &&
2,564✔
579
            message.system_id == 255 && message.component_id == MAV_COMP_ID_MISSIONPLANNER) {
2,563✔
580
            if (_message_logging_on) {
×
581
                LogDebug() << "Ignoring libmav messages from QGC as we are also a ground station";
×
582
            }
583
            return;
×
584
        }
585

586
        bool found_system = false;
2,563✔
587
        for (auto& system : _systems) {
2,606✔
588
            if (system.first == message.system_id) {
2,602✔
589
                system.second->system_impl()->add_new_component(message.component_id);
2,559✔
590
                found_system = true;
2,559✔
591
                break;
2,559✔
592
            }
593
        }
594

595
        if (!found_system) {
2,564✔
596
            if (_system_debugging) {
6✔
597
                LogWarn() << "Create new system/component from libmav " << (int)message.system_id
×
598
                          << "/" << (int)message.component_id;
×
599
            }
600
            make_system_with_component(message.system_id, message.component_id);
6✔
601

602
            // We now better talk back.
603
            start_sending_heartbeats();
6✔
604
        }
605

606
        if (_should_exit) {
2,564✔
607
            // Don't try to call at() if systems have already been destroyed
608
            // in destructor.
609
            return;
×
610
        }
611
    }
2,563✔
612

613
    // Distribute libmav message to systems for libmav-specific handling
614
    bool found_system = false;
2,566✔
615
    for (auto& system : _systems) {
5,207✔
616
        if (system.first == message.system_id) {
2,644✔
617
            if (_message_logging_on) {
2,567✔
618
                LogDebug() << "Distributing libmav message " << message.message_name
×
619
                           << " to SystemImpl for system " << system.first;
×
620
            }
621
            system.second->system_impl()->process_libmav_message(message);
2,567✔
622
            found_system = true;
2,564✔
623
            // Don't break - distribute to all matching system instances
624
        }
625
    }
626

627
    if (!found_system) {
2,568✔
628
        LogWarn() << "No system found for libmav message " << message.message_name
×
629
                  << " from system " << message.system_id;
×
630
    }
631
}
632

633
bool MavsdkImpl::send_message(mavlink_message_t& message)
2,711✔
634
{
635
    // Create a copy of the message to avoid reference issues
636
    mavlink_message_t message_copy = message;
2,711✔
637

638
    {
639
        std::lock_guard lock(_messages_to_send_mutex);
2,711✔
640
        _messages_to_send.push(std::move(message_copy));
2,712✔
641
    }
2,714✔
642

643
    // For heartbeat messages, we want to process them immediately to speed up system discovery
644
    if (message.msgid == MAVLINK_MSG_ID_HEARTBEAT) {
2,712✔
645
        // Trigger message processing in the work thread
646
        // This is a hint to process messages sooner, but doesn't block
647
        std::this_thread::yield();
463✔
648
    }
649

650
    return true;
2,712✔
651
}
652

653
void MavsdkImpl::deliver_messages()
34,812✔
654
{
655
    // Process messages one at a time to avoid holding the mutex while delivering
656
    while (true) {
657
        mavlink_message_t message;
658
        {
659
            std::lock_guard lock(_messages_to_send_mutex);
34,812✔
660
            if (_messages_to_send.empty()) {
34,423✔
661
                break;
31,693✔
662
            }
663
            message = _messages_to_send.front();
2,709✔
664
            _messages_to_send.pop();
2,709✔
665
        }
34,405✔
666
        deliver_message(message);
2,709✔
667
    }
2,714✔
668
}
31,668✔
669

670
void MavsdkImpl::deliver_message(mavlink_message_t& message)
2,713✔
671
{
672
    if (_message_logging_on) {
2,713✔
673
        LogDebug() << "Sending message " << message.msgid << " from "
×
674
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid)
×
675
                   << " to " << static_cast<int>(get_target_system_id(message)) << "/"
×
676
                   << static_cast<int>(get_target_component_id(message));
×
677
    }
678

679
    // This is a low level interface where outgoing messages can be tampered
680
    // with or even dropped.
681
    bool keep = true;
2,713✔
682
    {
683
        std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
2,713✔
684
        if (_intercept_outgoing_messages_callback != nullptr) {
2,712✔
685
            keep = _intercept_outgoing_messages_callback(message);
228✔
686
        }
687
    }
2,708✔
688

689
    if (!keep) {
2,712✔
690
        // We fake that everything was sent as instructed because
691
        // a potential loss would happen later, and we would not be informed
692
        // about it.
693
        LogDebug() << "Dropped outgoing message: " << int(message.msgid);
86✔
694
        return;
169✔
695
    }
696

697
    // JSON message interception for outgoing messages
698
    // Convert mavlink_message_t to Mavsdk::MavlinkMessage for JSON interception
699
    uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
700
    uint16_t len = mavlink_msg_to_send_buffer(buffer, &message);
2,626✔
701

702
    size_t bytes_consumed = 0;
2,628✔
703
    auto libmav_msg_opt = parse_message_safe(buffer, len, bytes_consumed);
2,628✔
704

705
    if (libmav_msg_opt) {
2,625✔
706
        // Create Mavsdk::MavlinkMessage directly for JSON interception
707
        Mavsdk::MavlinkMessage json_message;
2,627✔
708
        json_message.message_name = libmav_msg_opt.value().name();
2,627✔
709
        json_message.system_id = message.sysid;
2,623✔
710
        json_message.component_id = message.compid;
2,623✔
711

712
        // Extract target_system and target_component if present
713
        uint8_t target_system_id = 0;
2,623✔
714
        uint8_t target_component_id = 0;
2,623✔
715
        if (libmav_msg_opt.value().get("target_system", target_system_id) ==
2,623✔
716
            mav::MessageResult::Success) {
717
            json_message.target_system_id = target_system_id;
1,908✔
718
        } else {
719
            json_message.target_system_id = 0;
717✔
720
        }
721
        if (libmav_msg_opt.value().get("target_component", target_component_id) ==
2,625✔
722
            mav::MessageResult::Success) {
723
            json_message.target_component_id = target_component_id;
1,907✔
724
        } else {
725
            json_message.target_component_id = 0;
718✔
726
        }
727

728
        // Generate JSON using LibmavReceiver's public method
729
        auto connections = get_connections();
2,625✔
730
        if (!connections.empty() && connections[0]->get_libmav_receiver()) {
2,627✔
731
            json_message.fields_json =
732
                connections[0]->get_libmav_receiver()->libmav_message_to_json(
7,628✔
733
                    libmav_msg_opt.value());
5,086✔
734
        } else {
735
            // Fallback: create minimal JSON if no receiver available
736
            json_message.fields_json =
737
                "{\"message_id\":" + std::to_string(libmav_msg_opt.value().id()) +
166✔
738
                ",\"message_name\":\"" + libmav_msg_opt.value().name() + "\"}";
166✔
739
        }
740

741
        if (!call_json_interception_callbacks(json_message, _outgoing_json_message_subscriptions)) {
2,626✔
742
            // Message was dropped by JSON interception callback
UNCOV
743
            if (_message_logging_on) {
×
744
                LogDebug() << "Outgoing JSON message " << json_message.message_name
×
745
                           << " dropped by interception";
×
746
            }
747
            return;
×
748
        }
749
    }
2,625✔
750

751
    std::lock_guard lock(_mutex);
2,624✔
752

753
    if (_connections.empty()) {
2,622✔
754
        // We obviously can't send any messages without a connection added, so
755
        // we silently ignore this.
756
        return;
83✔
757
    }
758

759
    uint8_t successful_emissions = 0;
2,539✔
760
    for (auto& _connection : _connections) {
5,108✔
761
        const uint8_t target_system_id = get_target_system_id(message);
2,567✔
762

763
        if (target_system_id != 0 && !(*_connection.connection).has_system_id(target_system_id)) {
2,568✔
764
            continue;
10✔
765
        }
766
        const auto result = (*_connection.connection).send_message(message);
2,556✔
767
        if (result.first) {
2,561✔
768
            successful_emissions++;
2,555✔
769
        } else {
770
            _connections_errors_subscriptions.queue(
12✔
771
                Mavsdk::ConnectionError{result.second, _connection.handle},
6✔
772
                [this](const auto& func) { call_user_callback(func); });
×
773
        }
774
    }
2,561✔
775

776
    if (successful_emissions == 0) {
2,545✔
777
        LogErr() << "Sending message failed";
3✔
778
    }
779
}
2,627✔
780

781
std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_any_connection(
144✔
782
    const std::string& connection_url, ForwardingOption forwarding_option)
783
{
784
    CliArg cli_arg;
144✔
785
    if (!cli_arg.parse(connection_url)) {
144✔
786
        return {ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle{}};
×
787
    }
788

789
    return std::visit(
144✔
790
        overloaded{
288✔
791
            [](std::monostate) {
×
792
                // Should not happen anyway.
793
                return std::pair<ConnectionResult, Mavsdk::ConnectionHandle>{
×
794
                    ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle()};
×
795
            },
796
            [this, forwarding_option](const CliArg::Udp& udp) {
134✔
797
                return add_udp_connection(udp, forwarding_option);
134✔
798
            },
799
            [this, forwarding_option](const CliArg::Tcp& tcp) {
8✔
800
                return add_tcp_connection(tcp, forwarding_option);
8✔
801
            },
802
            [this, forwarding_option](const CliArg::Serial& serial) {
×
803
                return add_serial_connection(
×
804
                    serial.path, serial.baudrate, serial.flow_control_enabled, forwarding_option);
×
805
            },
806
            [this, forwarding_option](const CliArg::Raw&) {
2✔
807
                return add_raw_connection(forwarding_option);
2✔
808
            }},
809
        cli_arg.protocol);
144✔
810
}
144✔
811

812
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
813
MavsdkImpl::add_udp_connection(const CliArg::Udp& udp, ForwardingOption forwarding_option)
134✔
814
{
815
    auto new_conn = std::make_unique<UdpConnection>(
816
        [this](
2,516✔
817
            MavlinkReceiver::ParseResult result,
818
            mavlink_message_t& message,
819
            Connection* connection) { receive_message(result, message, connection); },
2,516✔
820
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
2,513✔
821
            receive_libmav_message(message, connection);
2,513✔
822
        },
2,515✔
823
        *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
824
        udp.mode == CliArg::Udp::Mode::In ? udp.host : "0.0.0.0",
268✔
825
        udp.mode == CliArg::Udp::Mode::In ? udp.port : 0,
134✔
826
        forwarding_option);
134✔
827

828
    if (!new_conn) {
134✔
829
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
830
    }
831

832
    ConnectionResult ret = new_conn->start();
134✔
833

834
    if (ret != ConnectionResult::Success) {
134✔
835
        return {ret, Mavsdk::ConnectionHandle{}};
×
836
    }
837

838
    if (udp.mode == CliArg::Udp::Mode::Out) {
134✔
839
        // We need to add the IP rather than a hostname, otherwise we end up with two remotes:
840
        // one for the IP, and one for a hostname.
841
        auto remote_ip = resolve_hostname_to_ip(udp.host);
67✔
842

843
        if (!remote_ip) {
67✔
844
            return {ConnectionResult::DestinationIpUnknown, Mavsdk::ConnectionHandle{}};
×
845
        }
846

847
        new_conn->add_remote_to_keep(remote_ip.value(), udp.port);
67✔
848
        std::lock_guard lock(_mutex);
67✔
849

850
        // With a UDP remote, we need to initiate the connection by sending heartbeats.
851
        auto new_configuration = get_configuration();
67✔
852
        new_configuration.set_always_send_heartbeats(true);
67✔
853
        set_configuration(new_configuration);
67✔
854
    }
67✔
855

856
    auto handle = add_connection(std::move(new_conn));
134✔
857

858
    return {ConnectionResult::Success, handle};
134✔
859
}
134✔
860

861
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
862
MavsdkImpl::add_tcp_connection(const CliArg::Tcp& tcp, ForwardingOption forwarding_option)
8✔
863
{
864
    if (tcp.mode == CliArg::Tcp::Mode::Out) {
8✔
865
        auto new_conn = std::make_unique<TcpClientConnection>(
866
            [this](
38✔
867
                MavlinkReceiver::ParseResult result,
868
                mavlink_message_t& message,
869
                Connection* connection) { receive_message(result, message, connection); },
38✔
870
            [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
38✔
871
                receive_libmav_message(message, connection);
38✔
872
            },
38✔
873
            *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
874
            tcp.host,
4✔
875
            tcp.port,
4✔
876
            forwarding_option);
4✔
877
        if (!new_conn) {
4✔
878
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
879
        }
880
        ConnectionResult ret = new_conn->start();
4✔
881
        if (ret == ConnectionResult::Success) {
4✔
882
            return {ret, add_connection(std::move(new_conn))};
4✔
883
        } else {
884
            return {ret, Mavsdk::ConnectionHandle{}};
×
885
        }
886
    } else {
4✔
887
        auto new_conn = std::make_unique<TcpServerConnection>(
888
            [this](
31✔
889
                MavlinkReceiver::ParseResult result,
890
                mavlink_message_t& message,
891
                Connection* connection) { receive_message(result, message, connection); },
31✔
892
            [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
31✔
893
                receive_libmav_message(message, connection);
31✔
894
            },
31✔
895
            *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
896
            tcp.host,
4✔
897
            tcp.port,
4✔
898
            forwarding_option);
4✔
899
        if (!new_conn) {
4✔
900
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
901
        }
902
        ConnectionResult ret = new_conn->start();
4✔
903
        if (ret == ConnectionResult::Success) {
4✔
904
            return {ret, add_connection(std::move(new_conn))};
4✔
905
        } else {
906
            return {ret, Mavsdk::ConnectionHandle{}};
×
907
        }
908
    }
4✔
909
}
910

911
std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_serial_connection(
×
912
    const std::string& dev_path,
913
    int baudrate,
914
    bool flow_control,
915
    ForwardingOption forwarding_option)
916
{
917
    auto new_conn = std::make_unique<SerialConnection>(
918
        [this](
×
919
            MavlinkReceiver::ParseResult result,
920
            mavlink_message_t& message,
921
            Connection* connection) { receive_message(result, message, connection); },
×
922
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
×
923
            receive_libmav_message(message, connection);
×
924
        },
×
925
        *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
926
        dev_path,
927
        baudrate,
928
        flow_control,
929
        forwarding_option);
×
930
    if (!new_conn) {
×
931
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
932
    }
933
    ConnectionResult ret = new_conn->start();
×
934
    if (ret == ConnectionResult::Success) {
×
935
        auto handle = add_connection(std::move(new_conn));
×
936

937
        auto new_configuration = get_configuration();
×
938

939
        // PX4 starting with v1.13 does not send heartbeats by default, so we need
940
        // to initiate the MAVLink connection by sending heartbeats.
941
        // Therefore, we override the default here and enable sending heartbeats.
942
        new_configuration.set_always_send_heartbeats(true);
×
943
        set_configuration(new_configuration);
×
944

945
        return {ret, handle};
×
946

947
    } else {
948
        return {ret, Mavsdk::ConnectionHandle{}};
×
949
    }
950
}
×
951

952
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
953
MavsdkImpl::add_raw_connection(ForwardingOption forwarding_option)
2✔
954
{
955
    // Check if a raw connection already exists
956
    if (find_raw_connection() != nullptr) {
2✔
957
        LogErr() << "Raw connection already exists. Only one raw connection is allowed.";
×
958
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
959
    }
960

961
    auto new_conn = std::make_unique<RawConnection>(
962
        [this](
1✔
963
            MavlinkReceiver::ParseResult result,
964
            mavlink_message_t& message,
965
            Connection* connection) { receive_message(result, message, connection); },
1✔
966
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
1✔
967
            receive_libmav_message(message, connection);
1✔
968
        },
1✔
969
        *this,
970
        forwarding_option);
2✔
971

972
    if (!new_conn) {
2✔
973
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
974
    }
975

976
    ConnectionResult ret = new_conn->start();
2✔
977
    if (ret != ConnectionResult::Success) {
2✔
978
        return {ret, Mavsdk::ConnectionHandle{}};
×
979
    }
980

981
    auto handle = add_connection(std::move(new_conn));
2✔
982

983
    // Enable heartbeats for raw connection
984
    auto new_configuration = get_configuration();
2✔
985
    new_configuration.set_always_send_heartbeats(true);
2✔
986
    set_configuration(new_configuration);
2✔
987

988
    return {ConnectionResult::Success, handle};
2✔
989
}
2✔
990

991
Mavsdk::ConnectionHandle MavsdkImpl::add_connection(std::unique_ptr<Connection>&& new_connection)
144✔
992
{
993
    std::lock_guard lock(_mutex);
144✔
994
    auto handle = _connections_handle_factory.create();
144✔
995
    _connections.emplace_back(ConnectionEntry{std::move(new_connection), handle});
144✔
996

997
    return handle;
288✔
998
}
144✔
999

1000
void MavsdkImpl::remove_connection(Mavsdk::ConnectionHandle handle)
12✔
1001
{
1002
    std::lock_guard lock(_mutex);
12✔
1003

1004
    _connections.erase(std::remove_if(_connections.begin(), _connections.end(), [&](auto&& entry) {
12✔
1005
        return (entry.handle == handle);
12✔
1006
    }));
1007
}
12✔
1008

1009
Mavsdk::Configuration MavsdkImpl::get_configuration() const
69✔
1010
{
1011
    std::lock_guard configuration_lock(_mutex);
69✔
1012
    return _configuration;
138✔
1013
}
69✔
1014

1015
void MavsdkImpl::set_configuration(Mavsdk::Configuration new_configuration)
207✔
1016
{
1017
    std::lock_guard server_components_lock(_server_components_mutex);
207✔
1018
    // We just point the default to the newly created component. This means
1019
    // that the previous default component will be deleted if it is not
1020
    // used/referenced anywhere.
1021
    _default_server_component =
1022
        server_component_by_id_with_lock(new_configuration.get_component_id());
207✔
1023

1024
    if (new_configuration.get_always_send_heartbeats() &&
345✔
1025
        !_configuration.get_always_send_heartbeats()) {
138✔
1026
        start_sending_heartbeats();
73✔
1027
    } else if (
134✔
1028
        !new_configuration.get_always_send_heartbeats() &&
203✔
1029
        _configuration.get_always_send_heartbeats() && !is_any_system_connected()) {
203✔
1030
        stop_sending_heartbeats();
×
1031
    }
1032

1033
    _configuration = new_configuration;
207✔
1034
    // We cache these values as atomic to avoid having to lock any mutex for them.
1035
    _our_system_id = new_configuration.get_system_id();
207✔
1036
    _our_component_id = new_configuration.get_component_id();
207✔
1037
}
207✔
1038

1039
uint8_t MavsdkImpl::get_own_system_id() const
6,043✔
1040
{
1041
    return _our_system_id;
6,043✔
1042
}
1043

1044
uint8_t MavsdkImpl::get_own_component_id() const
1,481✔
1045
{
1046
    return _our_component_id;
1,481✔
1047
}
1048

1049
uint8_t MavsdkImpl::channel() const
×
1050
{
1051
    // TODO
1052
    return 0;
×
1053
}
1054

1055
Autopilot MavsdkImpl::autopilot() const
×
1056
{
1057
    // TODO
1058
    return Autopilot::Px4;
×
1059
}
1060

1061
// FIXME: this should be per component
1062
uint8_t MavsdkImpl::get_mav_type() const
460✔
1063
{
1064
    return _configuration.get_mav_type();
460✔
1065
}
1066

1067
void MavsdkImpl::make_system_with_component(uint8_t system_id, uint8_t comp_id)
142✔
1068
{
1069
    // Needs _systems_lock
1070

1071
    if (_should_exit) {
142✔
1072
        // When the system got destroyed in the destructor, we have to give up.
1073
        return;
×
1074
    }
1075

1076
    if (static_cast<int>(system_id) == 0 && static_cast<int>(comp_id) == 0) {
142✔
1077
        LogDebug() << "Initializing connection to remote system...";
×
1078
    } else {
1079
        LogDebug() << "New system ID: " << static_cast<int>(system_id)
284✔
1080
                   << " Comp ID: " << static_cast<int>(comp_id);
142✔
1081
    }
1082

1083
    // Make a system with its first component
1084
    auto new_system = std::make_shared<System>(*this);
142✔
1085
    new_system->init(system_id, comp_id);
142✔
1086

1087
    _systems.emplace_back(system_id, new_system);
142✔
1088
}
142✔
1089

1090
void MavsdkImpl::notify_on_discover()
146✔
1091
{
1092
    // Queue the callbacks without holding the mutex to avoid deadlocks
1093
    _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
211✔
1094
}
146✔
1095

1096
void MavsdkImpl::notify_on_timeout()
5✔
1097
{
1098
    // Queue the callbacks without holding the mutex to avoid deadlocks
1099
    _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
5✔
1100
}
5✔
1101

1102
Mavsdk::NewSystemHandle
1103
MavsdkImpl::subscribe_on_new_system(const Mavsdk::NewSystemCallback& callback)
64✔
1104
{
1105
    std::lock_guard lock(_mutex);
64✔
1106

1107
    const auto handle = _new_system_callbacks.subscribe(callback);
64✔
1108

1109
    if (is_any_system_connected()) {
64✔
1110
        _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
×
1111
    }
1112

1113
    return handle;
128✔
1114
}
64✔
1115

1116
void MavsdkImpl::unsubscribe_on_new_system(Mavsdk::NewSystemHandle handle)
63✔
1117
{
1118
    _new_system_callbacks.unsubscribe(handle);
63✔
1119
}
63✔
1120

1121
bool MavsdkImpl::is_any_system_connected() const
64✔
1122
{
1123
    std::vector<std::shared_ptr<System>> connected_systems = systems();
64✔
1124
    return std::any_of(connected_systems.cbegin(), connected_systems.cend(), [](auto& system) {
64✔
1125
        return system->is_connected();
×
1126
    });
64✔
1127
}
64✔
1128

1129
void MavsdkImpl::work_thread()
138✔
1130
{
1131
    while (!_should_exit) {
32,088✔
1132
        // Process incoming messages
1133
        process_messages();
31,540✔
1134

1135
        // Process incoming libmav messages
1136
        process_libmav_messages();
31,579✔
1137

1138
        // Run timers
1139
        timeout_handler.run_once();
31,550✔
1140
        call_every_handler.run_once();
31,894✔
1141

1142
        // Do component work
1143
        {
1144
            std::lock_guard lock(_server_components_mutex);
31,917✔
1145
            for (auto& it : _server_components) {
63,678✔
1146
                if (it.second != nullptr) {
32,163✔
1147
                    it.second->_impl->do_work();
31,742✔
1148
                }
1149
            }
1150
        }
31,535✔
1151

1152
        // Deliver outgoing messages
1153
        deliver_messages();
31,672✔
1154

1155
        // If no messages to send, check if there are messages to receive
1156
        std::unique_lock lock_received(_received_messages_mutex);
31,752✔
1157
        if (_received_messages.empty()) {
31,778✔
1158
            // No messages to process, wait for a signal or timeout
1159
            _received_messages_cv.wait_for(lock_received, std::chrono::milliseconds(10), [this]() {
31,441✔
1160
                return !_received_messages.empty() || _should_exit;
63,673✔
1161
            });
1162
        }
1163
    }
32,093✔
1164
}
187✔
1165

1166
void MavsdkImpl::call_user_callback_located(
1,239✔
1167
    const std::string& filename, const int linenumber, const std::function<void()>& func)
1168
{
1169
    // Don't enqueue callbacks if we're shutting down
1170
    if (_should_exit) {
1,239✔
1171
        return;
×
1172
    }
1173

1174
    auto callback_size = _user_callback_queue.size();
1,239✔
1175

1176
    if (_callback_tracker) {
1,239✔
1177
        _callback_tracker->record_queued(filename, linenumber);
×
1178
        _callback_tracker->maybe_print_stats(callback_size);
×
1179
    }
1180

1181
    if (callback_size >= 100) {
1,239✔
1182
        return;
×
1183

1184
    } else if (callback_size == 99) {
1,239✔
1185
        LogErr()
×
1186
            << "User callback queue overflown\n"
1187
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1188
        return;
×
1189

1190
    } else if (callback_size >= 10) {
1,239✔
1191
        LogWarn()
×
1192
            << "User callback queue slow (queue size: " << callback_size
×
1193
            << ").\n"
1194
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1195
    }
1196

1197
    // We only need to keep track of filename and linenumber if we're actually debugging this.
1198
    UserCallback user_callback =
1199
        _callback_debugging ? UserCallback{func, filename, linenumber} : UserCallback{func};
2,478✔
1200

1201
    _user_callback_queue.push_back(std::make_shared<UserCallback>(user_callback));
1,239✔
1202
}
1,239✔
1203

1204
void MavsdkImpl::process_user_callbacks_thread()
138✔
1205
{
1206
    while (!_should_exit) {
1,515✔
1207
        UserCallback callback;
1,377✔
1208
        {
1209
            LockedQueue<UserCallback>::Guard guard(_user_callback_queue);
1,377✔
1210
            auto ptr = guard.wait_and_pop_front();
1,377✔
1211
            if (!ptr) {
1,377✔
1212
                continue;
138✔
1213
            }
1214
            // We need to get a copy instead of just a shared_ptr because the queue might
1215
            // be invalidated when the lock is released.
1216
            callback = *ptr;
1,239✔
1217
        }
1,515✔
1218

1219
        // Check if we're in the process of shutting down before executing the callback
1220
        if (_should_exit) {
1,239✔
1221
            continue;
×
1222
        }
1223

1224
        const double timeout_s = 1.0;
1,239✔
1225
        auto cookie = timeout_handler.add(
1,239✔
1226
            [&]() {
×
1227
                if (_callback_debugging) {
×
1228
                    LogWarn() << "Callback called from " << callback.filename << ":"
×
1229
                              << callback.linenumber << " took more than " << timeout_s
×
1230
                              << " second to run.";
×
1231
                    fflush(stdout);
×
1232
                    fflush(stderr);
×
1233
                    abort();
×
1234
                } else {
1235
                    LogWarn()
×
1236
                        << "Callback took more than " << timeout_s << " second to run.\n"
×
1237
                        << "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1238
                }
1239
            },
×
1240
            timeout_s);
1241
        auto callback_start = std::chrono::steady_clock::now();
1,239✔
1242
        callback.func();
1,239✔
1243
        auto callback_end = std::chrono::steady_clock::now();
1,239✔
1244
        timeout_handler.remove(cookie);
1,239✔
1245

1246
        if (_callback_tracker) {
1,239✔
1247
            auto callback_duration_us =
1248
                std::chrono::duration_cast<std::chrono::microseconds>(callback_end - callback_start)
×
1249
                    .count();
×
1250
            _callback_tracker->record_executed(
×
1251
                callback.filename, callback.linenumber, callback_duration_us);
1252
        }
1253
    }
1,377✔
1254
}
138✔
1255

1256
void MavsdkImpl::start_sending_heartbeats()
361✔
1257
{
1258
    // Check if we're in the process of shutting down
1259
    if (_should_exit) {
361✔
1260
        return;
×
1261
    }
1262

1263
    // Before sending out first heartbeats we need to make sure we have a
1264
    // default server component.
1265
    default_server_component_impl();
361✔
1266

1267
    {
1268
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
361✔
1269
        call_every_handler.remove(_heartbeat_send_cookie);
361✔
1270
        _heartbeat_send_cookie =
361✔
1271
            call_every_handler.add([this]() { send_heartbeats(); }, HEARTBEAT_SEND_INTERVAL_S);
818✔
1272
    }
361✔
1273
}
1274

1275
void MavsdkImpl::stop_sending_heartbeats()
5✔
1276
{
1277
    if (!_configuration.get_always_send_heartbeats()) {
5✔
1278
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
1✔
1279
        call_every_handler.remove(_heartbeat_send_cookie);
1✔
1280
    }
1✔
1281
}
5✔
1282

1283
ServerComponentImpl& MavsdkImpl::default_server_component_impl()
1,416✔
1284
{
1285
    std::lock_guard lock(_server_components_mutex);
1,416✔
1286
    return default_server_component_with_lock();
1,416✔
1287
}
1,416✔
1288

1289
ServerComponentImpl& MavsdkImpl::default_server_component_with_lock()
1,416✔
1290
{
1291
    if (_default_server_component == nullptr) {
1,416✔
1292
        _default_server_component = server_component_by_id_with_lock(_our_component_id);
×
1293
    }
1294
    return *_default_server_component->_impl;
1,416✔
1295
}
1296

1297
void MavsdkImpl::send_heartbeats()
457✔
1298
{
1299
    std::lock_guard lock(_server_components_mutex);
457✔
1300

1301
    for (auto& it : _server_components) {
915✔
1302
        if (it.second != nullptr) {
458✔
1303
            it.second->_impl->send_heartbeat();
457✔
1304
        }
1305
    }
1306
}
454✔
1307

1308
void MavsdkImpl::intercept_incoming_messages_async(std::function<bool(mavlink_message_t&)> callback)
24✔
1309
{
1310
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
24✔
1311
    _intercept_incoming_messages_callback = callback;
24✔
1312
}
24✔
1313

1314
void MavsdkImpl::intercept_outgoing_messages_async(std::function<bool(mavlink_message_t&)> callback)
14✔
1315
{
1316
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
14✔
1317
    _intercept_outgoing_messages_callback = callback;
14✔
1318
}
14✔
1319

1320
bool MavsdkImpl::call_json_interception_callbacks(
5,192✔
1321
    const Mavsdk::MavlinkMessage& json_message,
1322
    std::vector<std::pair<Mavsdk::InterceptJsonHandle, Mavsdk::InterceptJsonCallback>>&
1323
        callback_list)
1324
{
1325
    bool keep_message = true;
5,192✔
1326

1327
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
5,192✔
1328
    for (const auto& subscription : callback_list) {
5,194✔
1329
        if (!subscription.second(json_message)) {
2✔
1330
            keep_message = false;
×
1331
        }
1332
    }
1333

1334
    return keep_message;
10,374✔
1335
}
5,183✔
1336

1337
Mavsdk::InterceptJsonHandle
1338
MavsdkImpl::subscribe_incoming_messages_json(const Mavsdk::InterceptJsonCallback& callback)
1✔
1339
{
1340
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1341
    auto handle = _json_handle_factory.create();
1✔
1342
    _incoming_json_message_subscriptions.push_back(std::make_pair(handle, callback));
1✔
1343
    return handle;
2✔
1344
}
1✔
1345

1346
void MavsdkImpl::unsubscribe_incoming_messages_json(Mavsdk::InterceptJsonHandle handle)
1✔
1347
{
1348
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1349
    auto it = std::find_if(
1✔
1350
        _incoming_json_message_subscriptions.begin(),
1351
        _incoming_json_message_subscriptions.end(),
1352
        [handle](const auto& subscription) { return subscription.first == handle; });
1✔
1353
    if (it != _incoming_json_message_subscriptions.end()) {
1✔
1354
        _incoming_json_message_subscriptions.erase(it);
1✔
1355
    }
1356
}
1✔
1357

1358
Mavsdk::InterceptJsonHandle
1359
MavsdkImpl::subscribe_outgoing_messages_json(const Mavsdk::InterceptJsonCallback& callback)
1✔
1360
{
1361
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1362
    auto handle = _json_handle_factory.create();
1✔
1363
    _outgoing_json_message_subscriptions.push_back(std::make_pair(handle, callback));
1✔
1364
    return handle;
2✔
1365
}
1✔
1366

1367
void MavsdkImpl::unsubscribe_outgoing_messages_json(Mavsdk::InterceptJsonHandle handle)
1✔
1368
{
1369
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1370
    auto it = std::find_if(
1✔
1371
        _outgoing_json_message_subscriptions.begin(),
1372
        _outgoing_json_message_subscriptions.end(),
1373
        [handle](const auto& subscription) { return subscription.first == handle; });
1✔
1374
    if (it != _outgoing_json_message_subscriptions.end()) {
1✔
1375
        _outgoing_json_message_subscriptions.erase(it);
1✔
1376
    }
1377
}
1✔
1378

1379
RawConnection* MavsdkImpl::find_raw_connection()
4✔
1380
{
1381
    std::lock_guard lock(_mutex);
4✔
1382

1383
    for (auto& entry : _connections) {
4✔
1384
        auto* raw_conn = dynamic_cast<RawConnection*>(entry.connection.get());
2✔
1385
        if (raw_conn != nullptr) {
2✔
1386
            return raw_conn;
2✔
1387
        }
1388
    }
1389
    return nullptr;
2✔
1390
}
4✔
1391

1392
void MavsdkImpl::pass_received_raw_bytes(const char* bytes, size_t length)
1✔
1393
{
1394
    auto* raw_conn = find_raw_connection();
1✔
1395
    if (raw_conn == nullptr) {
1✔
1396
        LogErr()
×
1397
            << "No raw connection available. Please add one using add_any_connection(\"raw://\")";
×
1398
        return;
×
1399
    }
1400

1401
    raw_conn->receive(bytes, length);
1✔
1402
}
1403

1404
Mavsdk::RawBytesHandle
1405
MavsdkImpl::subscribe_raw_bytes_to_be_sent(const Mavsdk::RawBytesCallback& callback)
1✔
1406
{
1407
    if (find_raw_connection() == nullptr) {
1✔
1408
        LogWarn() << "No raw connection available. Subscription will only receive bytes after you "
×
1409
                     "add a connection using add_any_connection(\"raw://\")";
×
1410
    }
1411
    return _raw_bytes_subscriptions.subscribe(callback);
1✔
1412
}
1413

1414
void MavsdkImpl::unsubscribe_raw_bytes_to_be_sent(Mavsdk::RawBytesHandle handle)
1✔
1415
{
1416
    _raw_bytes_subscriptions.unsubscribe(handle);
1✔
1417
}
1✔
1418

1419
bool MavsdkImpl::notify_raw_bytes_sent(const char* bytes, size_t length)
3✔
1420
{
1421
    if (_raw_bytes_subscriptions.empty()) {
3✔
1422
        return false;
2✔
1423
    }
1424

1425
    _raw_bytes_subscriptions(bytes, length);
1✔
1426

1427
    return true;
1✔
1428
}
1429

1430
Mavsdk::ConnectionErrorHandle
1431
MavsdkImpl::subscribe_connection_errors(Mavsdk::ConnectionErrorCallback callback)
×
1432
{
1433
    std::lock_guard lock(_mutex);
×
1434

1435
    const auto handle = _connections_errors_subscriptions.subscribe(callback);
×
1436

1437
    return handle;
×
1438
}
×
1439

1440
void MavsdkImpl::unsubscribe_connection_errors(Mavsdk::ConnectionErrorHandle handle)
×
1441
{
1442
    std::lock_guard lock(_mutex);
×
1443
    _connections_errors_subscriptions.unsubscribe(handle);
×
1444
}
×
1445

1446
uint8_t MavsdkImpl::get_target_system_id(const mavlink_message_t& message)
2,627✔
1447
{
1448
    // Checks whether connection knows target system ID by extracting target system if set.
1449
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);
2,627✔
1450

1451
    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_SYSTEM)) {
2,626✔
1452
        return 0;
685✔
1453
    }
1454

1455
    // Don't look at the target system offset if it is outside the payload length.
1456
    // This can happen if the fields are trimmed.
1457
    if (meta->target_system_ofs >= message.len) {
1,941✔
1458
        return 0;
32✔
1459
    }
1460

1461
    return (_MAV_PAYLOAD(&message))[meta->target_system_ofs];
1,909✔
1462
}
1463

1464
uint8_t MavsdkImpl::get_target_component_id(const mavlink_message_t& message)
58✔
1465
{
1466
    // Checks whether connection knows target system ID by extracting target system if set.
1467
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);
58✔
1468

1469
    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_COMPONENT)) {
58✔
1470
        return 0;
32✔
1471
    }
1472

1473
    // Don't look at the target component offset if it is outside the payload length.
1474
    // This can happen if the fields are trimmed.
1475
    if (meta->target_component_ofs >= message.len) {
26✔
1476
        return 0;
2✔
1477
    }
1478

1479
    return (_MAV_PAYLOAD(&message))[meta->target_component_ofs];
24✔
1480
}
1481

1482
Sender& MavsdkImpl::sender()
×
1483
{
1484
    std::lock_guard lock(_server_components_mutex);
×
1485
    return default_server_component_with_lock().sender();
×
1486
}
×
1487

1488
std::vector<Connection*> MavsdkImpl::get_connections() const
2,628✔
1489
{
1490
    std::lock_guard lock(_mutex);
2,628✔
1491
    std::vector<Connection*> connections;
2,626✔
1492
    for (const auto& connection_entry : _connections) {
5,194✔
1493
        connections.push_back(connection_entry.connection.get());
2,569✔
1494
    }
1495
    return connections;
2,619✔
1496
}
2,619✔
1497

1498
mav::MessageSet& MavsdkImpl::get_message_set() const
23✔
1499
{
1500
    // Note: This returns a reference to MessageSet without locking.
1501
    // Thread safety for MessageSet operations must be ensured by:
1502
    // 1. Using load_custom_xml_to_message_set() for write operations (XML loading)
1503
    // 2. libmav MessageSet should be internally thread-safe for read operations
1504
    // 3. If race conditions persist, consider implementing a thread-safe MessageSet wrapper
1505
    return *_message_set;
23✔
1506
}
1507

1508
bool MavsdkImpl::load_custom_xml_to_message_set(const std::string& xml_content)
6✔
1509
{
1510
    std::lock_guard<std::mutex> lock(_message_set_mutex);
6✔
1511
    auto result = _message_set->addFromXMLString(xml_content, false /* recursive_open_includes */);
6✔
1512
    return result == ::mav::MessageSetResult::Success;
12✔
1513
}
6✔
1514

1515
// Thread-safe MessageSet read operations
1516
std::optional<std::string> MavsdkImpl::message_id_to_name_safe(uint32_t id) const
×
1517
{
1518
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1519
    auto message_def = _message_set->getMessageDefinition(static_cast<int>(id));
×
1520
    if (message_def) {
×
1521
        return message_def.get().name();
×
1522
    }
1523
    return std::nullopt;
×
1524
}
×
1525

1526
std::optional<int> MavsdkImpl::message_name_to_id_safe(const std::string& name) const
×
1527
{
1528
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1529
    return _message_set->idForMessage(name);
×
1530
}
×
1531

1532
std::optional<mav::Message> MavsdkImpl::create_message_safe(const std::string& message_name) const
×
1533
{
1534
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1535
    return _message_set->create(message_name);
×
1536
}
×
1537

1538
// Thread-safe parsing for LibmavReceiver
1539
std::optional<mav::Message> MavsdkImpl::parse_message_safe(
5,210✔
1540
    const uint8_t* buffer, size_t buffer_len, size_t& bytes_consumed) const
1541
{
1542
    std::lock_guard<std::mutex> lock(_message_set_mutex);
5,210✔
1543
    return _buffer_parser->parseMessage(buffer, buffer_len, bytes_consumed);
5,213✔
1544
}
5,208✔
1545

1546
mav::OptionalReference<const mav::MessageDefinition>
1547
MavsdkImpl::get_message_definition_safe(int message_id) const
5,128✔
1548
{
1549
    std::lock_guard<std::mutex> lock(_message_set_mutex);
5,128✔
1550
    return _message_set->getMessageDefinition(message_id);
5,124✔
1551
}
5,128✔
1552

1553
} // namespace mavsdk
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc