• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mavlink / MAVSDK / 19215993270

09 Nov 2025 11:20PM UTC coverage: 48.296% (-0.03%) from 48.329%
19215993270

push

github

web-flow
Merge pull request #2708 from mavlink/pr-mavsdk-server-usage

Fix connection usage for mavsdk_server

17630 of 36504 relevant lines covered (48.3%)

464.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.24
/src/mavsdk/core/mavsdk_impl.cpp
1
#include "mavsdk_impl.h"
2

3
#include <algorithm>
4
#include <mutex>
5
#include <tcp_server_connection.h>
6

7
#include "connection.h"
8
#include "log.h"
9
#include "tcp_client_connection.h"
10
#include "tcp_server_connection.h"
11
#include "udp_connection.h"
12
#include "raw_connection.h"
13
#include "system.h"
14
#include "system_impl.h"
15
#include "serial_connection.h"
16
#include "version.h"
17
#include "server_component_impl.h"
18
#include "overloaded.h"
19
#include "mavlink_channels.h"
20
#include "callback_list.tpp"
21
#include "hostname_to_ip.h"
22
#include "embedded_mavlink_xml.h"
23
#include <mav/MessageSet.h>
24

25
namespace mavsdk {
26

27
template class CallbackList<>;
28

29
MavsdkImpl::MavsdkImpl(const Mavsdk::Configuration& configuration) :
134✔
30
    timeout_handler(time),
134✔
31
    call_every_handler(time)
268✔
32
{
33
    LogInfo() << "MAVSDK version: " << mavsdk_version;
134✔
34

35
    if (const char* env_p = std::getenv("MAVSDK_CALLBACK_DEBUGGING")) {
134✔
36
        if (std::string(env_p) == "1") {
×
37
            LogDebug() << "Callback debugging is on.";
×
38
            _callback_debugging = true;
×
39
            _callback_tracker = std::make_unique<CallbackTracker>();
×
40
        }
41
    }
42

43
    if (const char* env_p = std::getenv("MAVSDK_MESSAGE_DEBUGGING")) {
134✔
44
        if (std::string(env_p) == "1") {
×
45
            LogDebug() << "Message debugging is on.";
×
46
            _message_logging_on = true;
×
47
        }
48
    }
49

50
    if (const char* env_p = std::getenv("MAVSDK_SYSTEM_DEBUGGING")) {
134✔
51
        if (std::string(env_p) == "1") {
×
52
            LogDebug() << "System debugging is on.";
×
53
            _system_debugging = true;
×
54
        }
55
    }
56

57
    set_configuration(configuration);
134✔
58

59
    // Initialize MessageSet with embedded XML content in dependency order
60
    // This happens at startup before any connections are created, so no synchronization needed
61
    _message_set = std::make_unique<mav::MessageSet>();
134✔
62
    _message_set->addFromXMLString(mav_embedded::MINIMAL_XML);
134✔
63
    _message_set->addFromXMLString(mav_embedded::STANDARD_XML);
134✔
64
    _message_set->addFromXMLString(mav_embedded::COMMON_XML);
134✔
65
    _message_set->addFromXMLString(mav_embedded::ARDUPILOTMEGA_XML);
134✔
66

67
    // Initialize BufferParser for thread-safe parsing
68
    _buffer_parser = std::make_unique<mav::BufferParser>(*_message_set);
134✔
69

70
    // Start the user callback thread first, so it is ready for anything generated by
71
    // the work thread.
72

73
    _process_user_callbacks_thread =
268✔
74
        new std::thread(&MavsdkImpl::process_user_callbacks_thread, this);
134✔
75

76
    _work_thread = new std::thread(&MavsdkImpl::work_thread, this);
134✔
77
}
134✔
78

79
MavsdkImpl::~MavsdkImpl()
134✔
80
{
81
    {
82
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
134✔
83
        call_every_handler.remove(_heartbeat_send_cookie);
134✔
84
    }
134✔
85

86
    _should_exit = true;
134✔
87

88
    // Stop work first because we don't want to trigger anything that would
89
    // potentially want to call into user code.
90

91
    if (_work_thread != nullptr) {
134✔
92
        _work_thread->join();
134✔
93
        delete _work_thread;
134✔
94
        _work_thread = nullptr;
134✔
95
    }
96

97
    if (_process_user_callbacks_thread != nullptr) {
134✔
98
        _user_callback_queue.stop();
134✔
99
        _process_user_callbacks_thread->join();
134✔
100
        delete _process_user_callbacks_thread;
134✔
101
        _process_user_callbacks_thread = nullptr;
134✔
102
    }
103

104
    std::lock_guard lock(_mutex);
134✔
105

106
    _systems.clear();
134✔
107
    _connections.clear();
134✔
108
}
134✔
109

110
std::string MavsdkImpl::version()
1✔
111
{
112
    static unsigned version_counter = 0;
113

114
    ++version_counter;
1✔
115

116
    switch (version_counter) {
1✔
117
        case 10:
×
118
            return "You were wondering about the name of this library?";
×
119
        case 11:
×
120
            return "Let's look at the history:";
×
121
        case 12:
×
122
            return "DroneLink";
×
123
        case 13:
×
124
            return "DroneCore";
×
125
        case 14:
×
126
            return "DronecodeSDK";
×
127
        case 15:
×
128
            return "MAVSDK";
×
129
        case 16:
×
130
            return "And that's it...";
×
131
        case 17:
×
132
            return "At least for now ¯\\_(ツ)_/¯.";
×
133
        default:
1✔
134
            return mavsdk_version;
1✔
135
    }
136
}
137

138
std::vector<std::shared_ptr<System>> MavsdkImpl::systems() const
173✔
139
{
140
    std::vector<std::shared_ptr<System>> systems_result{};
173✔
141

142
    std::lock_guard lock(_mutex);
173✔
143
    for (auto& system : _systems) {
274✔
144
        // We ignore the 0 entry because it's just a null system.
145
        // It's only created because the older, deprecated API needs a
146
        // reference.
147
        if (system.first == 0) {
101✔
148
            continue;
×
149
        }
150
        systems_result.push_back(system.second);
101✔
151
    }
152

153
    return systems_result;
173✔
154
}
173✔
155

156
std::optional<std::shared_ptr<System>> MavsdkImpl::first_autopilot(double timeout_s)
55✔
157
{
158
    {
159
        std::lock_guard lock(_mutex);
55✔
160
        for (auto system : _systems) {
56✔
161
            if (system.second->is_connected() && system.second->has_autopilot()) {
4✔
162
                return system.second;
3✔
163
            }
164
        }
4✔
165
    }
55✔
166

167
    if (timeout_s == 0.0) {
52✔
168
        // Don't wait at all.
169
        return {};
×
170
    }
171

172
    auto prom = std::make_shared<std::promise<std::shared_ptr<System>>>();
52✔
173
    auto fut = prom->get_future();
52✔
174

175
    auto flag = std::make_shared<std::once_flag>();
52✔
176
    auto handle = subscribe_on_new_system([this, prom, flag]() {
52✔
177
        // Check all systems, not just the first one
178
        auto all_systems = systems();
53✔
179
        for (auto& system : all_systems) {
55✔
180
            if (system->is_connected() && system->has_autopilot()) {
54✔
181
                std::call_once(*flag, [prom, system]() { prom->set_value(system); });
104✔
182
                break;
52✔
183
            }
184
        }
185
    });
53✔
186

187
    if (timeout_s > 0.0) {
52✔
188
        if (fut.wait_for(std::chrono::milliseconds(int64_t(timeout_s * 1e3))) ==
52✔
189
            std::future_status::ready) {
190
            unsubscribe_on_new_system(handle);
52✔
191
            return fut.get();
52✔
192

193
        } else {
194
            unsubscribe_on_new_system(handle);
×
195
            return std::nullopt;
×
196
        }
197
    } else {
198
        fut.wait();
×
199
        unsubscribe_on_new_system(handle);
×
200
        return std::optional(fut.get());
×
201
    }
202
}
52✔
203

204
std::shared_ptr<ServerComponent> MavsdkImpl::server_component(unsigned instance)
52✔
205
{
206
    std::lock_guard lock(_mutex);
52✔
207

208
    auto component_type = _configuration.get_component_type();
52✔
209
    switch (component_type) {
52✔
210
        case ComponentType::Autopilot:
52✔
211
        case ComponentType::GroundStation:
212
        case ComponentType::CompanionComputer:
213
        case ComponentType::Camera:
214
        case ComponentType::Gimbal:
215
        case ComponentType::RemoteId:
216
        case ComponentType::Custom:
217
            return server_component_by_type(component_type, instance);
52✔
218
        default:
×
219
            LogErr() << "Unknown component type";
×
220
            return {};
×
221
    }
222
}
52✔
223

224
std::shared_ptr<ServerComponent>
225
MavsdkImpl::server_component_by_type(ComponentType server_component_type, unsigned instance)
52✔
226
{
227
    switch (server_component_type) {
52✔
228
        case ComponentType::Autopilot:
38✔
229
            if (instance == 0) {
38✔
230
                return server_component_by_id(MAV_COMP_ID_AUTOPILOT1);
38✔
231
            } else {
232
                LogErr() << "Only autopilot instance 0 is valid";
×
233
                return {};
×
234
            }
235

236
        case ComponentType::GroundStation:
×
237
            if (instance == 0) {
×
238
                return server_component_by_id(MAV_COMP_ID_MISSIONPLANNER);
×
239
            } else {
240
                LogErr() << "Only one ground station supported at this time";
×
241
                return {};
×
242
            }
243

244
        case ComponentType::CompanionComputer:
1✔
245
            if (instance == 0) {
1✔
246
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER);
1✔
247
            } else if (instance == 1) {
×
248
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER2);
×
249
            } else if (instance == 2) {
×
250
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER3);
×
251
            } else if (instance == 3) {
×
252
                return server_component_by_id(MAV_COMP_ID_ONBOARD_COMPUTER4);
×
253
            } else {
254
                LogErr() << "Only companion computer 0..3 are supported";
×
255
                return {};
×
256
            }
257

258
        case ComponentType::Camera:
13✔
259
            if (instance == 0) {
13✔
260
                return server_component_by_id(MAV_COMP_ID_CAMERA);
13✔
261
            } else if (instance == 1) {
×
262
                return server_component_by_id(MAV_COMP_ID_CAMERA2);
×
263
            } else if (instance == 2) {
×
264
                return server_component_by_id(MAV_COMP_ID_CAMERA3);
×
265
            } else if (instance == 3) {
×
266
                return server_component_by_id(MAV_COMP_ID_CAMERA4);
×
267
            } else if (instance == 4) {
×
268
                return server_component_by_id(MAV_COMP_ID_CAMERA5);
×
269
            } else if (instance == 5) {
×
270
                return server_component_by_id(MAV_COMP_ID_CAMERA6);
×
271
            } else {
272
                LogErr() << "Only camera 0..5 are supported";
×
273
                return {};
×
274
            }
275

276
        default:
×
277
            LogErr() << "Unknown server component type";
×
278
            return {};
×
279
    }
280
}
281

282
std::shared_ptr<ServerComponent> MavsdkImpl::server_component_by_id(uint8_t component_id)
52✔
283
{
284
    if (component_id == 0) {
52✔
285
        LogErr() << "Server component with component ID 0 not allowed";
×
286
        return nullptr;
×
287
    }
288

289
    std::lock_guard lock(_server_components_mutex);
52✔
290

291
    return server_component_by_id_with_lock(component_id);
52✔
292
}
52✔
293

294
std::shared_ptr<ServerComponent> MavsdkImpl::server_component_by_id_with_lock(uint8_t component_id)
253✔
295
{
296
    for (auto& it : _server_components) {
254✔
297
        if (it.first == component_id) {
119✔
298
            if (it.second != nullptr) {
118✔
299
                return it.second;
118✔
300
            } else {
301
                it.second = std::make_shared<ServerComponent>(*this, component_id);
×
302
            }
303
        }
304
    }
305

306
    _server_components.emplace_back(std::pair<uint8_t, std::shared_ptr<ServerComponent>>(
135✔
307
        component_id, std::make_shared<ServerComponent>(*this, component_id)));
270✔
308

309
    return _server_components.back().second;
135✔
310
}
311

312
void MavsdkImpl::forward_message(mavlink_message_t& message, Connection* connection)
43✔
313
{
314
    // Forward_message Function implementing Mavlink routing rules.
315
    // See https://mavlink.io/en/guide/routing.html
316

317
    bool forward_heartbeats_enabled = true;
43✔
318
    const uint8_t target_system_id = get_target_system_id(message);
43✔
319
    const uint8_t target_component_id = get_target_component_id(message);
43✔
320

321
    // If it's a message only for us, we keep it, otherwise, we forward it.
322
    const bool targeted_only_at_us =
323
        (target_system_id == get_own_system_id() && target_component_id == get_own_component_id());
43✔
324

325
    // We don't forward heartbeats unless it's specifically enabled.
326
    const bool heartbeat_check_ok =
43✔
327
        (message.msgid != MAVLINK_MSG_ID_HEARTBEAT || forward_heartbeats_enabled);
43✔
328

329
    if (!targeted_only_at_us && heartbeat_check_ok) {
43✔
330
        unsigned successful_emissions = 0;
36✔
331
        for (auto& entry : _connections) {
108✔
332
            // Check whether the connection is not the one from which we received the message.
333
            // And also check if the connection was set to forward messages.
334
            if (entry.connection.get() == connection ||
108✔
335
                !entry.connection->should_forward_messages()) {
36✔
336
                continue;
36✔
337
            }
338
            auto result = (*entry.connection).send_message(message);
36✔
339
            if (result.first) {
36✔
340
                successful_emissions++;
35✔
341
            } else {
342
                _connections_errors_subscriptions.queue(
2✔
343
                    Mavsdk::ConnectionError{result.second, entry.handle},
1✔
344
                    [this](const auto& func) { call_user_callback(func); });
×
345
            }
346
        }
36✔
347
        if (successful_emissions == 0) {
36✔
348
            LogErr() << "Message forwarding failed";
1✔
349
        }
350
    }
351
}
43✔
352

353
void MavsdkImpl::receive_message(mavlink_message_t& message, Connection* connection)
2,370✔
354
{
355
    {
356
        std::lock_guard lock(_received_messages_mutex);
2,370✔
357
        _received_messages.emplace(ReceivedMessage{std::move(message), connection});
2,378✔
358
    }
2,377✔
359
    _received_messages_cv.notify_one();
2,374✔
360
}
2,380✔
361

362
void MavsdkImpl::receive_libmav_message(
2,372✔
363
    const Mavsdk::MavlinkMessage& message, Connection* connection)
364
{
365
    {
366
        std::lock_guard lock(_received_libmav_messages_mutex);
2,372✔
367
        _received_libmav_messages.emplace(ReceivedLibmavMessage{message, connection});
2,372✔
368
    }
2,372✔
369
    _received_libmav_messages_cv.notify_one();
2,371✔
370
}
2,376✔
371

372
void MavsdkImpl::process_messages()
31,841✔
373
{
374
    std::lock_guard lock(_received_messages_mutex);
31,841✔
375
    while (!_received_messages.empty()) {
33,747✔
376
        auto message_copied = _received_messages.front();
2,356✔
377
        process_message(message_copied.message, message_copied.connection_ptr);
2,360✔
378
        _received_messages.pop();
2,364✔
379
    }
380
}
30,756✔
381

382
void MavsdkImpl::process_libmav_messages()
30,081✔
383
{
384
    std::lock_guard lock(_received_libmav_messages_mutex);
30,081✔
385
    while (!_received_libmav_messages.empty()) {
33,210✔
386
        auto message_copied = _received_libmav_messages.front();
2,354✔
387
        process_libmav_message(message_copied.message, message_copied.connection_ptr);
2,357✔
388
        _received_libmav_messages.pop();
2,357✔
389
    }
2,355✔
390
}
30,641✔
391

392
void MavsdkImpl::process_message(mavlink_message_t& message, Connection* connection)
2,360✔
393
{
394
    // Assumes _received_messages_mutex
395

396
    if (_message_logging_on) {
2,360✔
397
        LogDebug() << "Processing message " << message.msgid << " from "
×
398
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid);
×
399
    }
400

401
    if (_should_exit) {
2,360✔
402
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
403
        return;
×
404
    }
405

406
    {
407
        std::lock_guard lock(_mutex);
2,360✔
408

409
        /** @note: Forward message FIRST (before intercept) if option is enabled and multiple
410
         * interfaces are connected. This ensures that forwarded messages are not affected by
411
         * intercept modifications. Performs message forwarding checks for every messages if message
412
         * forwarding is enabled on at least one connection, and in case of a single forwarding
413
         * connection, we check that it is not the one which received the current message.
414
         *
415
         * Conditions:
416
         * 1. At least 2 connections.
417
         * 2. At least 1 forwarding connection.
418
         * 3. At least 2 forwarding connections or current connection is not forwarding.
419
         */
420

421
        if (_connections.size() > 1 && mavsdk::Connection::forwarding_connections_count() > 0 &&
2,407✔
422
            (mavsdk::Connection::forwarding_connections_count() > 1 ||
43✔
423
             !connection->should_forward_messages())) {
424
            if (_message_logging_on) {
43✔
425
                LogDebug() << "Forwarding message " << message.msgid << " from "
×
426
                           << static_cast<int>(message.sysid) << "/"
×
427
                           << static_cast<int>(message.compid);
×
428
            }
429
            forward_message(message, connection);
43✔
430
        }
431

432
        if (_should_exit) {
2,362✔
433
            // If we're meant to clean up, let's not try to acquire any more locks but bail.
434
            return;
×
435
        }
436

437
        // This is a low level interface where incoming messages can be tampered
438
        // with or even dropped FOR LOCAL PROCESSING ONLY (after forwarding).
439
        {
440
            bool keep = true;
2,356✔
441
            {
442
                std::lock_guard<std::mutex> intercept_lock(_intercept_callbacks_mutex);
2,356✔
443
                if (_intercept_incoming_messages_callback != nullptr) {
2,364✔
444
                    keep = _intercept_incoming_messages_callback(message);
243✔
445
                }
446
            }
2,355✔
447

448
            if (!keep) {
2,360✔
449
                LogDebug() << "Dropped incoming message: " << int(message.msgid);
30✔
450
                return;
32✔
451
            }
452
        }
453

454
        // Don't ever create a system with sysid 0.
455
        if (message.sysid == 0) {
2,330✔
456
            if (_message_logging_on) {
×
457
                LogDebug() << "Ignoring message with sysid == 0";
×
458
            }
459
            return;
×
460
        }
461

462
        // Filter out messages by QGroundControl, however, only do that if MAVSDK
463
        // is also implementing a ground station and not if it is used in another
464
        // configuration, e.g. on a companion.
465
        //
466
        // This is a workaround because PX4 started forwarding messages between
467
        // mavlink instances which leads to existing implementations (including
468
        // examples and integration tests) to connect to QGroundControl by accident
469
        // instead of PX4 because the check `has_autopilot()` is not used.
470

471
        if (_configuration.get_component_type() == ComponentType::GroundStation &&
2,330✔
472
            message.sysid == 255 && message.compid == MAV_COMP_ID_MISSIONPLANNER) {
2,330✔
473
            if (_message_logging_on) {
×
474
                LogDebug() << "Ignoring messages from QGC as we are also a ground station";
×
475
            }
476
            return;
×
477
        }
478

479
        bool found_system = false;
2,330✔
480
        for (auto& system : _systems) {
2,362✔
481
            if (system.first == message.sysid) {
2,232✔
482
                system.second->system_impl()->add_new_component(message.compid);
2,200✔
483
                found_system = true;
2,198✔
484
                break;
2,198✔
485
            }
486
        }
487

488
        if (!found_system) {
2,335✔
489
            if (_system_debugging) {
132✔
490
                LogWarn() << "Create new system/component " << (int)message.sysid << "/"
×
491
                          << (int)message.compid;
×
492
                LogWarn() << "From message " << (int)message.msgid << " with len "
×
493
                          << (int)message.len;
×
494
                std::string bytes = "";
×
495
                for (unsigned i = 0; i < 12 + message.len; ++i) {
×
496
                    bytes += std::to_string(reinterpret_cast<uint8_t*>(&message)[i]) + ' ';
×
497
                }
498
                LogWarn() << "Bytes: " << bytes;
×
499
            }
×
500
            make_system_with_component(message.sysid, message.compid);
132✔
501

502
            // We now better talk back.
503
            start_sending_heartbeats();
132✔
504
        }
505

506
        if (_should_exit) {
2,335✔
507
            // Don't try to call at() if systems have already been destroyed
508
            // in destructor.
509
            return;
×
510
        }
511
    }
2,360✔
512

513
    mavlink_message_handler.process_message(message);
2,328✔
514

515
    for (auto& system : _systems) {
2,363✔
516
        if (system.first == message.sysid) {
2,362✔
517
            system.second->system_impl()->process_mavlink_message(message);
2,330✔
518
            break;
2,332✔
519
        }
520
    }
521
}
522

523
void MavsdkImpl::process_libmav_message(
2,357✔
524
    const Mavsdk::MavlinkMessage& message, Connection* /* connection */)
525
{
526
    // Assumes _received_libmav_messages_mutex
527

528
    if (_message_logging_on) {
2,357✔
529
        LogDebug() << "MavsdkImpl::process_libmav_message: " << message.message_name << " from "
×
530
                   << static_cast<int>(message.system_id) << "/"
×
531
                   << static_cast<int>(message.component_id);
×
532
    }
533

534
    // JSON message interception for incoming messages
535
    if (!call_json_interception_callbacks(message, _incoming_json_message_subscriptions)) {
2,357✔
536
        // Message was dropped by interception callback
537
        if (_message_logging_on) {
×
538
            LogDebug() << "Incoming JSON message " << message.message_name
×
539
                       << " dropped by interception";
×
540
        }
541
        return;
×
542
    }
543

544
    if (_message_logging_on) {
2,355✔
545
        LogDebug() << "Processing libmav message " << message.message_name << " from "
×
546
                   << static_cast<int>(message.system_id) << "/"
×
547
                   << static_cast<int>(message.component_id);
×
548
    }
549

550
    if (_should_exit) {
2,355✔
551
        // If we're meant to clean up, let's not try to acquire any more locks but bail.
552
        return;
×
553
    }
554

555
    {
556
        std::lock_guard lock(_mutex);
2,352✔
557

558
        // Don't ever create a system with sysid 0.
559
        if (message.system_id == 0) {
2,355✔
560
            if (_message_logging_on) {
×
561
                LogDebug() << "Ignoring libmav message with sysid == 0";
×
562
            }
563
            return;
×
564
        }
565

566
        // Filter out QGroundControl messages similar to regular mavlink processing
567
        if (_configuration.get_component_type() == ComponentType::GroundStation &&
2,355✔
568
            message.system_id == 255 && message.component_id == MAV_COMP_ID_MISSIONPLANNER) {
2,353✔
569
            if (_message_logging_on) {
×
570
                LogDebug() << "Ignoring libmav messages from QGC as we are also a ground station";
×
571
            }
572
            return;
×
573
        }
574

575
        bool found_system = false;
2,353✔
576
        for (auto& system : _systems) {
2,384✔
577
            if (system.first == message.system_id) {
2,381✔
578
                system.second->system_impl()->add_new_component(message.component_id);
2,350✔
579
                found_system = true;
2,349✔
580
                break;
2,349✔
581
            }
582
        }
583

584
        if (!found_system) {
2,355✔
585
            if (_system_debugging) {
6✔
586
                LogWarn() << "Create new system/component from libmav " << (int)message.system_id
×
587
                          << "/" << (int)message.component_id;
×
588
            }
589
            make_system_with_component(message.system_id, message.component_id);
6✔
590

591
            // We now better talk back.
592
            start_sending_heartbeats();
6✔
593
        }
594

595
        if (_should_exit) {
2,355✔
596
            // Don't try to call at() if systems have already been destroyed
597
            // in destructor.
598
            return;
×
599
        }
600
    }
2,350✔
601

602
    // Distribute libmav message to systems for libmav-specific handling
603
    bool found_system = false;
2,354✔
604
    for (auto& system : _systems) {
4,767✔
605
        if (system.first == message.system_id) {
2,413✔
606
            if (_message_logging_on) {
2,355✔
607
                LogDebug() << "Distributing libmav message " << message.message_name
×
608
                           << " to SystemImpl for system " << system.first;
×
609
            }
610
            system.second->system_impl()->process_libmav_message(message);
2,355✔
611
            found_system = true;
2,355✔
612
            // Don't break - distribute to all matching system instances
613
        }
614
    }
615

616
    if (!found_system) {
2,351✔
617
        LogWarn() << "No system found for libmav message " << message.message_name
×
618
                  << " from system " << message.system_id;
×
619
    }
620
}
621

622
bool MavsdkImpl::send_message(mavlink_message_t& message)
2,519✔
623
{
624
    // Create a copy of the message to avoid reference issues
625
    mavlink_message_t message_copy = message;
2,519✔
626

627
    {
628
        std::lock_guard lock(_messages_to_send_mutex);
2,519✔
629
        _messages_to_send.push(std::move(message_copy));
2,516✔
630
    }
2,518✔
631

632
    // For heartbeat messages, we want to process them immediately to speed up system discovery
633
    if (message.msgid == MAVLINK_MSG_ID_HEARTBEAT) {
2,513✔
634
        // Trigger message processing in the work thread
635
        // This is a hint to process messages sooner, but doesn't block
636
        std::this_thread::yield();
439✔
637
    }
638

639
    return true;
2,515✔
640
}
641

642
void MavsdkImpl::deliver_messages()
34,056✔
643
{
644
    // Process messages one at a time to avoid holding the mutex while delivering
645
    while (true) {
646
        mavlink_message_t message;
647
        {
648
            std::lock_guard lock(_messages_to_send_mutex);
34,056✔
649
            if (_messages_to_send.empty()) {
33,657✔
650
                break;
31,241✔
651
            }
652
            message = _messages_to_send.front();
2,517✔
653
            _messages_to_send.pop();
2,517✔
654
        }
33,759✔
655
        deliver_message(message);
2,516✔
656
    }
2,518✔
657
}
31,034✔
658

659
void MavsdkImpl::deliver_message(mavlink_message_t& message)
2,519✔
660
{
661
    if (_message_logging_on) {
2,519✔
662
        LogDebug() << "Sending message " << message.msgid << " from "
×
663
                   << static_cast<int>(message.sysid) << "/" << static_cast<int>(message.compid)
×
664
                   << " to " << static_cast<int>(get_target_system_id(message)) << "/"
×
665
                   << static_cast<int>(get_target_component_id(message));
×
666
    }
667

668
    // This is a low level interface where outgoing messages can be tampered
669
    // with or even dropped.
670
    bool keep = true;
2,519✔
671
    {
672
        std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
2,519✔
673
        if (_intercept_outgoing_messages_callback != nullptr) {
2,518✔
674
            keep = _intercept_outgoing_messages_callback(message);
228✔
675
        }
676
    }
2,515✔
677

678
    if (!keep) {
2,518✔
679
        // We fake that everything was sent as instructed because
680
        // a potential loss would happen later, and we would not be informed
681
        // about it.
682
        LogDebug() << "Dropped outgoing message: " << int(message.msgid);
88✔
683
        return;
173✔
684
    }
685

686
    // JSON message interception for outgoing messages
687
    // Convert mavlink_message_t to Mavsdk::MavlinkMessage for JSON interception
688
    uint8_t buffer[MAVLINK_MAX_PACKET_LEN];
689
    uint16_t len = mavlink_msg_to_send_buffer(buffer, &message);
2,430✔
690

691
    size_t bytes_consumed = 0;
2,427✔
692
    auto libmav_msg_opt = parse_message_safe(buffer, len, bytes_consumed);
2,427✔
693

694
    if (libmav_msg_opt) {
2,429✔
695
        // Create Mavsdk::MavlinkMessage directly for JSON interception
696
        Mavsdk::MavlinkMessage json_message;
2,423✔
697
        json_message.message_name = libmav_msg_opt.value().name();
2,420✔
698
        json_message.system_id = message.sysid;
2,426✔
699
        json_message.component_id = message.compid;
2,426✔
700

701
        // Extract target_system and target_component if present
702
        uint8_t target_system_id = 0;
2,426✔
703
        uint8_t target_component_id = 0;
2,426✔
704
        if (libmav_msg_opt.value().get("target_system", target_system_id) ==
2,426✔
705
            mav::MessageResult::Success) {
706
            json_message.target_system_id = target_system_id;
1,755✔
707
        } else {
708
            json_message.target_system_id = 0;
673✔
709
        }
710
        if (libmav_msg_opt.value().get("target_component", target_component_id) ==
2,428✔
711
            mav::MessageResult::Success) {
712
            json_message.target_component_id = target_component_id;
1,755✔
713
        } else {
714
            json_message.target_component_id = 0;
670✔
715
        }
716

717
        // Generate JSON using LibmavReceiver's public method
718
        auto connections = get_connections();
2,425✔
719
        if (!connections.empty() && connections[0]->get_libmav_receiver()) {
2,424✔
720
            json_message.fields_json =
721
                connections[0]->get_libmav_receiver()->libmav_message_to_json(
7,035✔
722
                    libmav_msg_opt.value());
4,690✔
723
        } else {
724
            // Fallback: create minimal JSON if no receiver available
725
            json_message.fields_json =
726
                "{\"message_id\":" + std::to_string(libmav_msg_opt.value().id()) +
170✔
727
                ",\"message_name\":\"" + libmav_msg_opt.value().name() + "\"}";
169✔
728
        }
729

730
        if (!call_json_interception_callbacks(json_message, _outgoing_json_message_subscriptions)) {
2,430✔
731
            // Message was dropped by JSON interception callback
732
            if (_message_logging_on) {
2✔
733
                LogDebug() << "Outgoing JSON message " << json_message.message_name
×
734
                           << " dropped by interception";
×
735
            }
736
            return;
×
737
        }
738
    }
2,427✔
739

740
    std::lock_guard lock(_mutex);
2,430✔
741

742
    if (_connections.empty()) {
2,423✔
743
        // We obviously can't send any messages without a connection added, so
744
        // we silently ignore this.
745
        return;
85✔
746
    }
747

748
    uint8_t successful_emissions = 0;
2,344✔
749
    for (auto& _connection : _connections) {
4,700✔
750
        const uint8_t target_system_id = get_target_system_id(message);
2,361✔
751

752
        if (target_system_id != 0 && !(*_connection.connection).has_system_id(target_system_id)) {
2,352✔
753
            continue;
4✔
754
        }
755
        const auto result = (*_connection.connection).send_message(message);
2,341✔
756
        if (result.first) {
2,353✔
757
            successful_emissions++;
2,346✔
758
        } else {
759
            _connections_errors_subscriptions.queue(
14✔
760
                Mavsdk::ConnectionError{result.second, _connection.handle},
7✔
761
                [this](const auto& func) { call_user_callback(func); });
×
762
        }
763
    }
2,353✔
764

765
    if (successful_emissions == 0) {
2,343✔
766
        LogErr() << "Sending message failed";
5✔
767
    }
768
}
2,428✔
769

770
std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_any_connection(
140✔
771
    const std::string& connection_url, ForwardingOption forwarding_option)
772
{
773
    CliArg cli_arg;
140✔
774
    if (!cli_arg.parse(connection_url)) {
140✔
775
        return {ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle{}};
×
776
    }
777

778
    return std::visit(
140✔
779
        overloaded{
280✔
780
            [](std::monostate) {
×
781
                // Should not happen anyway.
782
                return std::pair<ConnectionResult, Mavsdk::ConnectionHandle>{
×
783
                    ConnectionResult::ConnectionUrlInvalid, Mavsdk::ConnectionHandle()};
×
784
            },
785
            [this, forwarding_option](const CliArg::Udp& udp) {
130✔
786
                return add_udp_connection(udp, forwarding_option);
130✔
787
            },
788
            [this, forwarding_option](const CliArg::Tcp& tcp) {
8✔
789
                return add_tcp_connection(tcp, forwarding_option);
8✔
790
            },
791
            [this, forwarding_option](const CliArg::Serial& serial) {
×
792
                return add_serial_connection(
×
793
                    serial.path, serial.baudrate, serial.flow_control_enabled, forwarding_option);
×
794
            },
795
            [this, forwarding_option](const CliArg::Raw&) {
2✔
796
                return add_raw_connection(forwarding_option);
2✔
797
            }},
798
        cli_arg.protocol);
140✔
799
}
140✔
800

801
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
802
MavsdkImpl::add_udp_connection(const CliArg::Udp& udp, ForwardingOption forwarding_option)
130✔
803
{
804
    auto new_conn = std::make_unique<UdpConnection>(
805
        [this](mavlink_message_t& message, Connection* connection) {
2,319✔
806
            receive_message(message, connection);
2,319✔
807
        },
2,319✔
808
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
2,317✔
809
            receive_libmav_message(message, connection);
2,317✔
810
        },
2,317✔
811
        *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
812
        udp.mode == CliArg::Udp::Mode::In ? udp.host : "0.0.0.0",
260✔
813
        udp.mode == CliArg::Udp::Mode::In ? udp.port : 0,
130✔
814
        forwarding_option);
130✔
815

816
    if (!new_conn) {
130✔
817
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
818
    }
819

820
    ConnectionResult ret = new_conn->start();
130✔
821

822
    if (ret != ConnectionResult::Success) {
130✔
823
        return {ret, Mavsdk::ConnectionHandle{}};
×
824
    }
825

826
    if (udp.mode == CliArg::Udp::Mode::Out) {
130✔
827
        // We need to add the IP rather than a hostname, otherwise we end up with two remotes:
828
        // one for the IP, and one for a hostname.
829
        auto remote_ip = resolve_hostname_to_ip(udp.host);
65✔
830

831
        if (!remote_ip) {
65✔
832
            return {ConnectionResult::DestinationIpUnknown, Mavsdk::ConnectionHandle{}};
×
833
        }
834

835
        new_conn->add_remote_to_keep(remote_ip.value(), udp.port);
65✔
836
        std::lock_guard lock(_mutex);
65✔
837

838
        // With a UDP remote, we need to initiate the connection by sending heartbeats.
839
        auto new_configuration = get_configuration();
65✔
840
        new_configuration.set_always_send_heartbeats(true);
65✔
841
        set_configuration(new_configuration);
65✔
842
    }
65✔
843

844
    auto handle = add_connection(std::move(new_conn));
130✔
845

846
    return {ConnectionResult::Success, handle};
130✔
847
}
130✔
848

849
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
850
MavsdkImpl::add_tcp_connection(const CliArg::Tcp& tcp, ForwardingOption forwarding_option)
8✔
851
{
852
    if (tcp.mode == CliArg::Tcp::Mode::Out) {
8✔
853
        auto new_conn = std::make_unique<TcpClientConnection>(
854
            [this](mavlink_message_t& message, Connection* connection) {
31✔
855
                receive_message(message, connection);
31✔
856
            },
31✔
857
            [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
30✔
858
                receive_libmav_message(message, connection);
30✔
859
            },
30✔
860
            *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
861
            tcp.host,
4✔
862
            tcp.port,
4✔
863
            forwarding_option);
4✔
864
        if (!new_conn) {
4✔
865
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
866
        }
867
        ConnectionResult ret = new_conn->start();
4✔
868
        if (ret == ConnectionResult::Success) {
4✔
869
            return {ret, add_connection(std::move(new_conn))};
4✔
870
        } else {
871
            return {ret, Mavsdk::ConnectionHandle{}};
×
872
        }
873
    } else {
4✔
874
        auto new_conn = std::make_unique<TcpServerConnection>(
875
            [this](mavlink_message_t& message, Connection* connection) {
29✔
876
                receive_message(message, connection);
29✔
877
            },
29✔
878
            [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
27✔
879
                receive_libmav_message(message, connection);
27✔
880
            },
27✔
881
            *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
882
            tcp.host,
4✔
883
            tcp.port,
4✔
884
            forwarding_option);
4✔
885
        if (!new_conn) {
4✔
886
            return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
887
        }
888
        ConnectionResult ret = new_conn->start();
4✔
889
        if (ret == ConnectionResult::Success) {
4✔
890
            return {ret, add_connection(std::move(new_conn))};
4✔
891
        } else {
892
            return {ret, Mavsdk::ConnectionHandle{}};
×
893
        }
894
    }
4✔
895
}
896

897
std::pair<ConnectionResult, Mavsdk::ConnectionHandle> MavsdkImpl::add_serial_connection(
×
898
    const std::string& dev_path,
899
    int baudrate,
900
    bool flow_control,
901
    ForwardingOption forwarding_option)
902
{
903
    auto new_conn = std::make_unique<SerialConnection>(
904
        [this](mavlink_message_t& message, Connection* connection) {
×
905
            receive_message(message, connection);
×
906
        },
×
907
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
×
908
            receive_libmav_message(message, connection);
×
909
        },
×
910
        *this, // Pass MavsdkImpl reference for thread-safe MessageSet access
911
        dev_path,
912
        baudrate,
913
        flow_control,
914
        forwarding_option);
×
915
    if (!new_conn) {
×
916
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
917
    }
918
    ConnectionResult ret = new_conn->start();
×
919
    if (ret == ConnectionResult::Success) {
×
920
        auto handle = add_connection(std::move(new_conn));
×
921

922
        auto new_configuration = get_configuration();
×
923

924
        // PX4 starting with v1.13 does not send heartbeats by default, so we need
925
        // to initiate the MAVLink connection by sending heartbeats.
926
        // Therefore, we override the default here and enable sending heartbeats.
927
        new_configuration.set_always_send_heartbeats(true);
×
928
        set_configuration(new_configuration);
×
929

930
        return {ret, handle};
×
931

932
    } else {
933
        return {ret, Mavsdk::ConnectionHandle{}};
×
934
    }
935
}
×
936

937
std::pair<ConnectionResult, Mavsdk::ConnectionHandle>
938
MavsdkImpl::add_raw_connection(ForwardingOption forwarding_option)
2✔
939
{
940
    // Check if a raw connection already exists
941
    if (find_raw_connection() != nullptr) {
2✔
942
        LogErr() << "Raw connection already exists. Only one raw connection is allowed.";
×
943
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
944
    }
945

946
    auto new_conn = std::make_unique<RawConnection>(
947
        [this](mavlink_message_t& message, Connection* connection) {
1✔
948
            receive_message(message, connection);
1✔
949
        },
1✔
950
        [this](const Mavsdk::MavlinkMessage& message, Connection* connection) {
1✔
951
            receive_libmav_message(message, connection);
1✔
952
        },
1✔
953
        *this,
954
        forwarding_option);
2✔
955

956
    if (!new_conn) {
2✔
957
        return {ConnectionResult::ConnectionError, Mavsdk::ConnectionHandle{}};
×
958
    }
959

960
    ConnectionResult ret = new_conn->start();
2✔
961
    if (ret != ConnectionResult::Success) {
2✔
962
        return {ret, Mavsdk::ConnectionHandle{}};
×
963
    }
964

965
    auto handle = add_connection(std::move(new_conn));
2✔
966

967
    // Enable heartbeats for raw connection
968
    auto new_configuration = get_configuration();
2✔
969
    new_configuration.set_always_send_heartbeats(true);
2✔
970
    set_configuration(new_configuration);
2✔
971

972
    return {ConnectionResult::Success, handle};
2✔
973
}
2✔
974

975
Mavsdk::ConnectionHandle MavsdkImpl::add_connection(std::unique_ptr<Connection>&& new_connection)
140✔
976
{
977
    std::lock_guard lock(_mutex);
140✔
978
    auto handle = _connections_handle_factory.create();
140✔
979
    _connections.emplace_back(ConnectionEntry{std::move(new_connection), handle});
140✔
980

981
    return handle;
280✔
982
}
140✔
983

984
void MavsdkImpl::remove_connection(Mavsdk::ConnectionHandle handle)
12✔
985
{
986
    std::lock_guard lock(_mutex);
12✔
987

988
    _connections.erase(std::remove_if(_connections.begin(), _connections.end(), [&](auto&& entry) {
12✔
989
        return (entry.handle == handle);
12✔
990
    }));
991
}
12✔
992

993
Mavsdk::Configuration MavsdkImpl::get_configuration() const
67✔
994
{
995
    std::lock_guard configuration_lock(_mutex);
67✔
996
    return _configuration;
134✔
997
}
67✔
998

999
void MavsdkImpl::set_configuration(Mavsdk::Configuration new_configuration)
201✔
1000
{
1001
    std::lock_guard server_components_lock(_server_components_mutex);
201✔
1002
    // We just point the default to the newly created component. This means
1003
    // that the previous default component will be deleted if it is not
1004
    // used/referenced anywhere.
1005
    _default_server_component =
1006
        server_component_by_id_with_lock(new_configuration.get_component_id());
201✔
1007

1008
    if (new_configuration.get_always_send_heartbeats() &&
335✔
1009
        !_configuration.get_always_send_heartbeats()) {
134✔
1010
        start_sending_heartbeats();
71✔
1011
    } else if (
130✔
1012
        !new_configuration.get_always_send_heartbeats() &&
197✔
1013
        _configuration.get_always_send_heartbeats() && !is_any_system_connected()) {
197✔
1014
        stop_sending_heartbeats();
×
1015
    }
1016

1017
    _configuration = new_configuration;
201✔
1018
    // We cache these values as atomic to avoid having to lock any mutex for them.
1019
    _our_system_id = new_configuration.get_system_id();
201✔
1020
    _our_component_id = new_configuration.get_component_id();
201✔
1021
}
201✔
1022

1023
uint8_t MavsdkImpl::get_own_system_id() const
5,733✔
1024
{
1025
    return _our_system_id;
5,733✔
1026
}
1027

1028
uint8_t MavsdkImpl::get_own_component_id() const
1,410✔
1029
{
1030
    return _our_component_id;
1,410✔
1031
}
1032

1033
uint8_t MavsdkImpl::channel() const
×
1034
{
1035
    // TODO
1036
    return 0;
×
1037
}
1038

1039
Autopilot MavsdkImpl::autopilot() const
×
1040
{
1041
    // TODO
1042
    return Autopilot::Px4;
×
1043
}
1044

1045
// FIXME: this should be per component
1046
uint8_t MavsdkImpl::get_mav_type() const
439✔
1047
{
1048
    return _configuration.get_mav_type();
439✔
1049
}
1050

1051
void MavsdkImpl::make_system_with_component(uint8_t system_id, uint8_t comp_id)
138✔
1052
{
1053
    // Needs _systems_lock
1054

1055
    if (_should_exit) {
138✔
1056
        // When the system got destroyed in the destructor, we have to give up.
1057
        return;
×
1058
    }
1059

1060
    if (static_cast<int>(system_id) == 0 && static_cast<int>(comp_id) == 0) {
138✔
1061
        LogDebug() << "Initializing connection to remote system...";
×
1062
    } else {
1063
        LogDebug() << "New system ID: " << static_cast<int>(system_id)
276✔
1064
                   << " Comp ID: " << static_cast<int>(comp_id);
138✔
1065
    }
1066

1067
    // Make a system with its first component
1068
    auto new_system = std::make_shared<System>(*this);
138✔
1069
    new_system->init(system_id, comp_id);
138✔
1070

1071
    _systems.emplace_back(system_id, new_system);
138✔
1072
}
138✔
1073

1074
void MavsdkImpl::notify_on_discover()
143✔
1075
{
1076
    // Queue the callbacks without holding the mutex to avoid deadlocks
1077
    _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
206✔
1078
}
143✔
1079

1080
void MavsdkImpl::notify_on_timeout()
6✔
1081
{
1082
    // Queue the callbacks without holding the mutex to avoid deadlocks
1083
    _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
6✔
1084
}
6✔
1085

1086
Mavsdk::NewSystemHandle
1087
MavsdkImpl::subscribe_on_new_system(const Mavsdk::NewSystemCallback& callback)
62✔
1088
{
1089
    std::lock_guard lock(_mutex);
62✔
1090

1091
    const auto handle = _new_system_callbacks.subscribe(callback);
62✔
1092

1093
    if (is_any_system_connected()) {
62✔
1094
        _new_system_callbacks.queue([this](const auto& func) { call_user_callback(func); });
×
1095
    }
1096

1097
    return handle;
124✔
1098
}
62✔
1099

1100
void MavsdkImpl::unsubscribe_on_new_system(Mavsdk::NewSystemHandle handle)
61✔
1101
{
1102
    _new_system_callbacks.unsubscribe(handle);
61✔
1103
}
61✔
1104

1105
bool MavsdkImpl::is_any_system_connected() const
62✔
1106
{
1107
    std::vector<std::shared_ptr<System>> connected_systems = systems();
62✔
1108
    return std::any_of(connected_systems.cbegin(), connected_systems.cend(), [](auto& system) {
62✔
1109
        return system->is_connected();
×
1110
    });
62✔
1111
}
62✔
1112

1113
void MavsdkImpl::work_thread()
134✔
1114
{
1115
    while (!_should_exit) {
31,672✔
1116
        // Process incoming messages
1117
        process_messages();
30,893✔
1118

1119
        // Process incoming libmav messages
1120
        process_libmav_messages();
31,025✔
1121

1122
        // Run timers
1123
        timeout_handler.run_once();
30,811✔
1124
        call_every_handler.run_once();
31,555✔
1125

1126
        // Do component work
1127
        {
1128
            std::lock_guard lock(_server_components_mutex);
31,701✔
1129
            for (auto& it : _server_components) {
63,046✔
1130
                if (it.second != nullptr) {
31,784✔
1131
                    it.second->_impl->do_work();
31,592✔
1132
                }
1133
            }
1134
        }
30,929✔
1135

1136
        // Deliver outgoing messages
1137
        deliver_messages();
31,080✔
1138

1139
        // If no messages to send, check if there are messages to receive
1140
        std::unique_lock lock_received(_received_messages_mutex);
30,998✔
1141
        if (_received_messages.empty()) {
31,081✔
1142
            // No messages to process, wait for a signal or timeout
1143
            _received_messages_cv.wait_for(lock_received, std::chrono::milliseconds(10), [this]() {
30,752✔
1144
                return !_received_messages.empty() || _should_exit;
62,702✔
1145
            });
1146
        }
1147
    }
31,741✔
1148
}
100✔
1149

1150
void MavsdkImpl::call_user_callback_located(
1,168✔
1151
    const std::string& filename, const int linenumber, const std::function<void()>& func)
1152
{
1153
    // Don't enqueue callbacks if we're shutting down
1154
    if (_should_exit) {
1,168✔
1155
        return;
×
1156
    }
1157

1158
    auto callback_size = _user_callback_queue.size();
1,168✔
1159

1160
    if (_callback_tracker) {
1,168✔
1161
        _callback_tracker->record_queued(filename, linenumber);
×
1162
        _callback_tracker->maybe_print_stats(callback_size);
×
1163
    }
1164

1165
    if (callback_size >= 100) {
1,168✔
1166
        return;
×
1167

1168
    } else if (callback_size == 99) {
1,168✔
1169
        LogErr()
×
1170
            << "User callback queue overflown\n"
1171
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1172
        return;
×
1173

1174
    } else if (callback_size >= 10) {
1,168✔
1175
        LogWarn()
×
1176
            << "User callback queue slow (queue size: " << callback_size
×
1177
            << ").\n"
1178
               "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1179
    }
1180

1181
    // We only need to keep track of filename and linenumber if we're actually debugging this.
1182
    UserCallback user_callback =
1183
        _callback_debugging ? UserCallback{func, filename, linenumber} : UserCallback{func};
2,335✔
1184

1185
    _user_callback_queue.push_back(std::make_shared<UserCallback>(user_callback));
1,168✔
1186
}
1,168✔
1187

1188
void MavsdkImpl::process_user_callbacks_thread()
134✔
1189
{
1190
    while (!_should_exit) {
1,436✔
1191
        UserCallback callback;
1,302✔
1192
        {
1193
            LockedQueue<UserCallback>::Guard guard(_user_callback_queue);
1,302✔
1194
            auto ptr = guard.wait_and_pop_front();
1,302✔
1195
            if (!ptr) {
1,302✔
1196
                continue;
134✔
1197
            }
1198
            // We need to get a copy instead of just a shared_ptr because the queue might
1199
            // be invalidated when the lock is released.
1200
            callback = *ptr;
1,168✔
1201
        }
1,436✔
1202

1203
        // Check if we're in the process of shutting down before executing the callback
1204
        if (_should_exit) {
1,168✔
1205
            continue;
×
1206
        }
1207

1208
        const double timeout_s = 1.0;
1,168✔
1209
        auto cookie = timeout_handler.add(
1,168✔
1210
            [&]() {
×
1211
                if (_callback_debugging) {
×
1212
                    LogWarn() << "Callback called from " << callback.filename << ":"
×
1213
                              << callback.linenumber << " took more than " << timeout_s
×
1214
                              << " second to run.";
×
1215
                    fflush(stdout);
×
1216
                    fflush(stderr);
×
1217
                    abort();
×
1218
                } else {
1219
                    LogWarn()
×
1220
                        << "Callback took more than " << timeout_s << " second to run.\n"
×
1221
                        << "See: https://mavsdk.mavlink.io/main/en/cpp/troubleshooting.html#user_callbacks";
×
1222
                }
1223
            },
×
1224
            timeout_s);
1225
        auto callback_start = std::chrono::steady_clock::now();
1,168✔
1226
        callback.func();
1,168✔
1227
        auto callback_end = std::chrono::steady_clock::now();
1,168✔
1228
        timeout_handler.remove(cookie);
1,168✔
1229

1230
        if (_callback_tracker) {
1,168✔
1231
            auto callback_duration_us =
1232
                std::chrono::duration_cast<std::chrono::microseconds>(callback_end - callback_start)
×
1233
                    .count();
×
1234
            _callback_tracker->record_executed(
×
1235
                callback.filename, callback.linenumber, callback_duration_us);
1236
        }
1237
    }
1,302✔
1238
}
134✔
1239

1240
void MavsdkImpl::start_sending_heartbeats()
352✔
1241
{
1242
    // Check if we're in the process of shutting down
1243
    if (_should_exit) {
352✔
1244
        return;
×
1245
    }
1246

1247
    // Before sending out first heartbeats we need to make sure we have a
1248
    // default server component.
1249
    default_server_component_impl();
352✔
1250

1251
    {
1252
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
352✔
1253
        call_every_handler.remove(_heartbeat_send_cookie);
352✔
1254
        _heartbeat_send_cookie =
352✔
1255
            call_every_handler.add([this]() { send_heartbeats(); }, HEARTBEAT_SEND_INTERVAL_S);
789✔
1256
    }
352✔
1257
}
1258

1259
void MavsdkImpl::stop_sending_heartbeats()
6✔
1260
{
1261
    if (!_configuration.get_always_send_heartbeats()) {
6✔
1262
        std::lock_guard<std::mutex> lock(_heartbeat_mutex);
1✔
1263
        call_every_handler.remove(_heartbeat_send_cookie);
1✔
1264
    }
1✔
1265
}
6✔
1266

1267
ServerComponentImpl& MavsdkImpl::default_server_component_impl()
1,328✔
1268
{
1269
    std::lock_guard lock(_server_components_mutex);
1,328✔
1270
    return default_server_component_with_lock();
1,328✔
1271
}
1,328✔
1272

1273
ServerComponentImpl& MavsdkImpl::default_server_component_with_lock()
1,328✔
1274
{
1275
    if (_default_server_component == nullptr) {
1,328✔
1276
        _default_server_component = server_component_by_id_with_lock(_our_component_id);
×
1277
    }
1278
    return *_default_server_component->_impl;
1,328✔
1279
}
1280

1281
void MavsdkImpl::send_heartbeats()
433✔
1282
{
1283
    std::lock_guard lock(_server_components_mutex);
433✔
1284

1285
    for (auto& it : _server_components) {
875✔
1286
        if (it.second != nullptr) {
440✔
1287
            it.second->_impl->send_heartbeat();
440✔
1288
        }
1289
    }
1290
}
431✔
1291

1292
void MavsdkImpl::intercept_incoming_messages_async(std::function<bool(mavlink_message_t&)> callback)
24✔
1293
{
1294
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
24✔
1295
    _intercept_incoming_messages_callback = callback;
24✔
1296
}
24✔
1297

1298
void MavsdkImpl::intercept_outgoing_messages_async(std::function<bool(mavlink_message_t&)> callback)
14✔
1299
{
1300
    std::lock_guard<std::mutex> lock(_intercept_callbacks_mutex);
14✔
1301
    _intercept_outgoing_messages_callback = callback;
14✔
1302
}
14✔
1303

1304
bool MavsdkImpl::call_json_interception_callbacks(
4,783✔
1305
    const Mavsdk::MavlinkMessage& json_message,
1306
    std::vector<std::pair<Mavsdk::InterceptJsonHandle, Mavsdk::InterceptJsonCallback>>&
1307
        callback_list)
1308
{
1309
    bool keep_message = true;
4,783✔
1310

1311
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
4,783✔
1312
    for (const auto& subscription : callback_list) {
4,784✔
1313
        if (!subscription.second(json_message)) {
3✔
1314
            keep_message = false;
×
1315
        }
1316
    }
1317

1318
    return keep_message;
9,551✔
1319
}
4,768✔
1320

1321
Mavsdk::InterceptJsonHandle
1322
MavsdkImpl::subscribe_incoming_messages_json(const Mavsdk::InterceptJsonCallback& callback)
1✔
1323
{
1324
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1325
    auto handle = _json_handle_factory.create();
1✔
1326
    _incoming_json_message_subscriptions.push_back(std::make_pair(handle, callback));
1✔
1327
    return handle;
2✔
1328
}
1✔
1329

1330
void MavsdkImpl::unsubscribe_incoming_messages_json(Mavsdk::InterceptJsonHandle handle)
1✔
1331
{
1332
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1333
    auto it = std::find_if(
1✔
1334
        _incoming_json_message_subscriptions.begin(),
1335
        _incoming_json_message_subscriptions.end(),
1336
        [handle](const auto& subscription) { return subscription.first == handle; });
1✔
1337
    if (it != _incoming_json_message_subscriptions.end()) {
1✔
1338
        _incoming_json_message_subscriptions.erase(it);
1✔
1339
    }
1340
}
1✔
1341

1342
Mavsdk::InterceptJsonHandle
1343
MavsdkImpl::subscribe_outgoing_messages_json(const Mavsdk::InterceptJsonCallback& callback)
1✔
1344
{
1345
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1346
    auto handle = _json_handle_factory.create();
1✔
1347
    _outgoing_json_message_subscriptions.push_back(std::make_pair(handle, callback));
1✔
1348
    return handle;
2✔
1349
}
1✔
1350

1351
void MavsdkImpl::unsubscribe_outgoing_messages_json(Mavsdk::InterceptJsonHandle handle)
1✔
1352
{
1353
    std::lock_guard<std::mutex> lock(_json_subscriptions_mutex);
1✔
1354
    auto it = std::find_if(
1✔
1355
        _outgoing_json_message_subscriptions.begin(),
1356
        _outgoing_json_message_subscriptions.end(),
1357
        [handle](const auto& subscription) { return subscription.first == handle; });
1✔
1358
    if (it != _outgoing_json_message_subscriptions.end()) {
1✔
1359
        _outgoing_json_message_subscriptions.erase(it);
1✔
1360
    }
1361
}
1✔
1362

1363
RawConnection* MavsdkImpl::find_raw_connection()
4✔
1364
{
1365
    std::lock_guard lock(_mutex);
4✔
1366

1367
    for (auto& entry : _connections) {
4✔
1368
        auto* raw_conn = dynamic_cast<RawConnection*>(entry.connection.get());
2✔
1369
        if (raw_conn != nullptr) {
2✔
1370
            return raw_conn;
2✔
1371
        }
1372
    }
1373
    return nullptr;
2✔
1374
}
4✔
1375

1376
void MavsdkImpl::pass_received_raw_bytes(const char* bytes, size_t length)
1✔
1377
{
1378
    auto* raw_conn = find_raw_connection();
1✔
1379
    if (raw_conn == nullptr) {
1✔
1380
        LogErr()
×
1381
            << "No raw connection available. Please add one using add_any_connection(\"raw://\")";
×
1382
        return;
×
1383
    }
1384

1385
    raw_conn->receive(bytes, length);
1✔
1386
}
1387

1388
Mavsdk::RawBytesHandle
1389
MavsdkImpl::subscribe_raw_bytes_to_be_sent(const Mavsdk::RawBytesCallback& callback)
1✔
1390
{
1391
    if (find_raw_connection() == nullptr) {
1✔
1392
        LogWarn() << "No raw connection available. Subscription will only receive bytes after you "
×
1393
                     "add a connection using add_any_connection(\"raw://\")";
×
1394
    }
1395
    return _raw_bytes_subscriptions.subscribe(callback);
1✔
1396
}
1397

1398
void MavsdkImpl::unsubscribe_raw_bytes_to_be_sent(Mavsdk::RawBytesHandle handle)
1✔
1399
{
1400
    _raw_bytes_subscriptions.unsubscribe(handle);
1✔
1401
}
1✔
1402

1403
bool MavsdkImpl::notify_raw_bytes_sent(const char* bytes, size_t length)
3✔
1404
{
1405
    if (_raw_bytes_subscriptions.empty()) {
3✔
1406
        return false;
2✔
1407
    }
1408

1409
    _raw_bytes_subscriptions(bytes, length);
1✔
1410

1411
    return true;
1✔
1412
}
1413

1414
Mavsdk::ConnectionErrorHandle
1415
MavsdkImpl::subscribe_connection_errors(Mavsdk::ConnectionErrorCallback callback)
×
1416
{
1417
    std::lock_guard lock(_mutex);
×
1418

1419
    const auto handle = _connections_errors_subscriptions.subscribe(callback);
×
1420

1421
    return handle;
×
1422
}
×
1423

1424
void MavsdkImpl::unsubscribe_connection_errors(Mavsdk::ConnectionErrorHandle handle)
×
1425
{
1426
    std::lock_guard lock(_mutex);
×
1427
    _connections_errors_subscriptions.unsubscribe(handle);
×
1428
}
×
1429

1430
uint8_t MavsdkImpl::get_target_system_id(const mavlink_message_t& message)
2,404✔
1431
{
1432
    // Checks whether connection knows target system ID by extracting target system if set.
1433
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);
2,404✔
1434

1435
    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_SYSTEM)) {
2,402✔
1436
        return 0;
632✔
1437
    }
1438

1439
    // Don't look at the target system offset if it is outside the payload length.
1440
    // This can happen if the fields are trimmed.
1441
    if (meta->target_system_ofs >= message.len) {
1,770✔
1442
        return 0;
24✔
1443
    }
1444

1445
    return (_MAV_PAYLOAD(&message))[meta->target_system_ofs];
1,746✔
1446
}
1447

1448
uint8_t MavsdkImpl::get_target_component_id(const mavlink_message_t& message)
43✔
1449
{
1450
    // Checks whether connection knows target system ID by extracting target system if set.
1451
    const mavlink_msg_entry_t* meta = mavlink_get_msg_entry(message.msgid);
43✔
1452

1453
    if (meta == nullptr || !(meta->flags & MAV_MSG_ENTRY_FLAG_HAVE_TARGET_COMPONENT)) {
43✔
1454
        return 0;
30✔
1455
    }
1456

1457
    // Don't look at the target component offset if it is outside the payload length.
1458
    // This can happen if the fields are trimmed.
1459
    if (meta->target_component_ofs >= message.len) {
13✔
1460
        return 0;
×
1461
    }
1462

1463
    return (_MAV_PAYLOAD(&message))[meta->target_component_ofs];
13✔
1464
}
1465

1466
Sender& MavsdkImpl::sender()
×
1467
{
1468
    std::lock_guard lock(_server_components_mutex);
×
1469
    return default_server_component_with_lock().sender();
×
1470
}
×
1471

1472
std::vector<Connection*> MavsdkImpl::get_connections() const
2,425✔
1473
{
1474
    std::lock_guard lock(_mutex);
2,425✔
1475
    std::vector<Connection*> connections;
2,425✔
1476
    for (const auto& connection_entry : _connections) {
4,784✔
1477
        connections.push_back(connection_entry.connection.get());
2,359✔
1478
    }
1479
    return connections;
2,424✔
1480
}
2,424✔
1481

1482
mav::MessageSet& MavsdkImpl::get_message_set() const
23✔
1483
{
1484
    // Note: This returns a reference to MessageSet without locking.
1485
    // Thread safety for MessageSet operations must be ensured by:
1486
    // 1. Using load_custom_xml_to_message_set() for write operations (XML loading)
1487
    // 2. libmav MessageSet should be internally thread-safe for read operations
1488
    // 3. If race conditions persist, consider implementing a thread-safe MessageSet wrapper
1489
    return *_message_set;
23✔
1490
}
1491

1492
bool MavsdkImpl::load_custom_xml_to_message_set(const std::string& xml_content)
6✔
1493
{
1494
    std::lock_guard<std::mutex> lock(_message_set_mutex);
6✔
1495
    auto result = _message_set->addFromXMLString(xml_content, false /* recursive_open_includes */);
6✔
1496
    return result == ::mav::MessageSetResult::Success;
12✔
1497
}
6✔
1498

1499
// Thread-safe MessageSet read operations
1500
std::optional<std::string> MavsdkImpl::message_id_to_name_safe(uint32_t id) const
×
1501
{
1502
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1503
    auto message_def = _message_set->getMessageDefinition(static_cast<int>(id));
×
1504
    if (message_def) {
×
1505
        return message_def.get().name();
×
1506
    }
1507
    return std::nullopt;
×
1508
}
×
1509

1510
std::optional<int> MavsdkImpl::message_name_to_id_safe(const std::string& name) const
×
1511
{
1512
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1513
    return _message_set->idForMessage(name);
×
1514
}
×
1515

1516
std::optional<mav::Message> MavsdkImpl::create_message_safe(const std::string& message_name) const
×
1517
{
1518
    std::lock_guard<std::mutex> lock(_message_set_mutex);
×
1519
    return _message_set->create(message_name);
×
1520
}
×
1521

1522
// Thread-safe parsing for LibmavReceiver
1523
std::optional<mav::Message> MavsdkImpl::parse_message_safe(
4,797✔
1524
    const uint8_t* buffer, size_t buffer_len, size_t& bytes_consumed) const
1525
{
1526
    std::lock_guard<std::mutex> lock(_message_set_mutex);
4,797✔
1527
    return _buffer_parser->parseMessage(buffer, buffer_len, bytes_consumed);
4,797✔
1528
}
4,801✔
1529

1530
mav::OptionalReference<const mav::MessageDefinition>
1531
MavsdkImpl::get_message_definition_safe(int message_id) const
4,713✔
1532
{
1533
    std::lock_guard<std::mutex> lock(_message_set_mutex);
4,713✔
1534
    return _message_set->getMessageDefinition(message_id);
4,710✔
1535
}
4,716✔
1536

1537
} // namespace mavsdk
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc