• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mavlink / MAVSDK / 19916797358

04 Dec 2025 03:36AM UTC coverage: 48.004%. First build
19916797358

Pull #2728

github

web-flow
Merge 2158a0ede into de36b0d7d
Pull Request #2728: log_streaming: add ArduPilot support

0 of 425 new or added lines in 6 files covered. (0.0%)

17653 of 36774 relevant lines covered (48.0%)

463.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/mavsdk/plugins/log_streaming/log_streaming_backend_px4.cpp
1
#include "log_streaming_backend_px4.h"
2

3
#include <limits>
4

5
// Note: the implementation below is mostly inspired by the pymavlink
6
//       implementation written by Beat Küng:
7
//       https://github.com/PX4/PX4-Autopilot/blob/main/Tools/mavlink_ulog_streaming.py
8
//
9
// For more information about the ulog protocol, visit:
10
// https://docs.px4.io/main/en/dev_log/ulog_file_format.html
11

12
namespace mavsdk {
13

NEW
14
void LogStreamingBackendPx4::init(SystemImpl* system_impl)
×
15
{
NEW
16
    _system_impl = system_impl;
×
17

NEW
18
    _system_impl->register_mavlink_message_handler(
×
19
        MAVLINK_MSG_ID_LOGGING_DATA,
NEW
20
        [this](const mavlink_message_t& message) { process_logging_data(message); },
×
21
        this);
NEW
22
    _system_impl->register_mavlink_message_handler(
×
23
        MAVLINK_MSG_ID_LOGGING_DATA_ACKED,
NEW
24
        [this](const mavlink_message_t& message) { process_logging_data_acked(message); },
×
25
        this);
NEW
26
}
×
27

NEW
28
void LogStreamingBackendPx4::deinit()
×
29
{
NEW
30
    auto is_active = [this]() {
×
NEW
31
        std::lock_guard<std::mutex> lock(_mutex);
×
NEW
32
        return _active;
×
NEW
33
    }();
×
34

NEW
35
    if (is_active) {
×
36
        // We try to stop log streaming before we leave but without waiting for a result.
NEW
37
        stop_log_streaming_async(nullptr);
×
38
    }
39

NEW
40
    if (_system_impl) {
×
NEW
41
        _system_impl->unregister_all_mavlink_message_handlers(this);
×
42
    }
NEW
43
}
×
44

NEW
45
void LogStreamingBackendPx4::set_data_callback(DataCallback callback)
×
46
{
NEW
47
    std::lock_guard<std::mutex> lock(_mutex);
×
NEW
48
    _data_callback = std::move(callback);
×
NEW
49
}
×
50

NEW
51
void LogStreamingBackendPx4::set_debugging(bool debugging)
×
52
{
NEW
53
    _debugging = debugging;
×
NEW
54
}
×
55

NEW
56
void LogStreamingBackendPx4::start_log_streaming_async(const LogStreaming::ResultCallback& callback)
×
57
{
58
    {
NEW
59
        std::lock_guard<std::mutex> lock(_mutex);
×
NEW
60
        reset();
×
NEW
61
        _active = true;
×
NEW
62
    }
×
63

NEW
64
    MavlinkCommandSender::CommandLong command{};
×
NEW
65
    command.command = MAV_CMD_LOGGING_START;
×
NEW
66
    command.params.maybe_param1 = 0.0f; // ulog
×
67

NEW
68
    _system_impl->send_command_async(command, [=](MavlinkCommandSender::Result result, float) {
×
NEW
69
        if (result != MavlinkCommandSender::Result::InProgress) {
×
NEW
70
            if (callback) {
×
NEW
71
                _system_impl->call_user_callback(
×
72
                    [=]() { callback(log_streaming_result_from_command_result(result)); });
73
            }
74
        }
NEW
75
    });
×
NEW
76
}
×
77

NEW
78
void LogStreamingBackendPx4::stop_log_streaming_async(const LogStreaming::ResultCallback& callback)
×
79
{
NEW
80
    MavlinkCommandSender::CommandLong command{};
×
NEW
81
    command.command = MAV_CMD_LOGGING_STOP;
×
82

NEW
83
    _system_impl->send_command_async(command, [=](MavlinkCommandSender::Result result, float) {
×
NEW
84
        if (result != MavlinkCommandSender::Result::InProgress) {
×
NEW
85
            if (callback) {
×
NEW
86
                _system_impl->call_user_callback(
×
87
                    [=]() { callback(log_streaming_result_from_command_result(result)); });
88
            }
89
        }
NEW
90
    });
×
91

NEW
92
    std::lock_guard<std::mutex> lock(_mutex);
×
NEW
93
    reset();
×
NEW
94
    _active = false;
×
NEW
95
}
×
96

NEW
97
void LogStreamingBackendPx4::reset()
×
98
{
99
    // Assume we have lock
NEW
100
    _current_sequence = 0;
×
NEW
101
    _drop_state = DropState::Unknown;
×
NEW
102
}
×
103

NEW
104
void LogStreamingBackendPx4::process_logging_data(const mavlink_message_t& message)
×
105
{
NEW
106
    if (!_active) {
×
NEW
107
        if (_debugging) {
×
NEW
108
            LogDebug() << "Ignoring logging data because we're not active";
×
109
        }
NEW
110
        return;
×
111
    }
112

113
    mavlink_logging_data_t logging_data;
NEW
114
    mavlink_msg_logging_data_decode(&message, &logging_data);
×
115

NEW
116
    if (logging_data.target_system != _system_impl->get_own_system_id() ||
×
NEW
117
        logging_data.target_component != _system_impl->get_own_component_id()) {
×
NEW
118
        LogWarn() << "Logging data with wrong target " << std::to_string(logging_data.target_system)
×
NEW
119
                  << '/' << std::to_string(logging_data.target_component) << " instead of "
×
NEW
120
                  << std::to_string(_system_impl->get_own_system_id()) << '/'
×
NEW
121
                  << std::to_string(_system_impl->get_own_component_id());
×
NEW
122
        return;
×
123
    }
124

NEW
125
    if (_debugging) {
×
NEW
126
        LogDebug() << "Received logging data with len: " << std::to_string(logging_data.length)
×
NEW
127
                   << ", first message: " << std::to_string(logging_data.first_message_offset)
×
NEW
128
                   << ", sequence: " << logging_data.sequence;
×
129
    }
130

NEW
131
    if (logging_data.length > sizeof(logging_data.data)) {
×
NEW
132
        LogWarn() << "Invalid length";
×
NEW
133
        return;
×
134
    }
135

NEW
136
    if (logging_data.first_message_offset != 255 &&
×
NEW
137
        logging_data.first_message_offset > sizeof(logging_data.data)) {
×
NEW
138
        LogWarn() << "Invalid first_message_offset";
×
NEW
139
        return;
×
140
    }
141

NEW
142
    std::lock_guard<std::mutex> lock(_mutex);
×
NEW
143
    if (is_duplicate(logging_data.sequence)) {
×
NEW
144
        return;
×
145
    }
NEW
146
    check_drop_state(logging_data.sequence, logging_data.first_message_offset);
×
147

NEW
148
    switch (_drop_state) {
×
NEW
149
        case DropState::Ok:
×
150
            // All data can be used.
NEW
151
            if (logging_data.first_message_offset == 255) {
×
152
                // This is just part that we add for now.
NEW
153
                _ulog_data.insert(
×
NEW
154
                    _ulog_data.end(), logging_data.data, logging_data.data + logging_data.length);
×
155

156
            } else {
157
                // Finish the previous message.
NEW
158
                _ulog_data.insert(
×
NEW
159
                    _ulog_data.end(),
×
160
                    logging_data.data,
NEW
161
                    logging_data.data + logging_data.first_message_offset);
×
NEW
162
                process_message();
×
163

NEW
164
                _ulog_data.clear();
×
165
                // Then start the next one.
NEW
166
                _ulog_data.insert(
×
NEW
167
                    _ulog_data.end(),
×
NEW
168
                    logging_data.data + logging_data.first_message_offset,
×
NEW
169
                    logging_data.data + logging_data.length);
×
170
            }
NEW
171
            break;
×
172

NEW
173
        case DropState::Dropped:
×
174
            // Nothing to do with the partial message.
NEW
175
            _ulog_data.clear();
×
NEW
176
            break;
×
177

NEW
178
        case DropState::RecoveringFromDropped:
×
179
            // Nothing to do with any partial message.
NEW
180
            _ulog_data.clear();
×
181
            // Now start fresh.
NEW
182
            _ulog_data.insert(
×
NEW
183
                _ulog_data.end(),
×
NEW
184
                logging_data.data + logging_data.first_message_offset,
×
NEW
185
                logging_data.data + logging_data.length);
×
NEW
186
            break;
×
NEW
187
        case DropState::Unknown:
×
NEW
188
            LogErr() << "Logical error";
×
NEW
189
            break;
×
190
    }
NEW
191
}
×
192

NEW
193
void LogStreamingBackendPx4::process_logging_data_acked(const mavlink_message_t& message)
×
194
{
NEW
195
    if (!_active) {
×
NEW
196
        if (_debugging) {
×
NEW
197
            LogDebug() << "Ignoring logging data acked because we're not active";
×
198
        }
NEW
199
        return;
×
200
    }
201

202
    mavlink_logging_data_acked_t logging_data_acked;
NEW
203
    mavlink_msg_logging_data_acked_decode(&message, &logging_data_acked);
×
204

NEW
205
    if (logging_data_acked.target_system != _system_impl->get_own_system_id() ||
×
NEW
206
        logging_data_acked.target_component != _system_impl->get_own_component_id()) {
×
NEW
207
        LogWarn() << "Logging data acked with wrong target "
×
NEW
208
                  << std::to_string(logging_data_acked.target_system) << '/'
×
NEW
209
                  << std::to_string(logging_data_acked.target_component) << " instead of "
×
NEW
210
                  << std::to_string(_system_impl->get_own_system_id()) << '/'
×
NEW
211
                  << std::to_string(_system_impl->get_own_component_id());
×
NEW
212
        return;
×
213
    }
214

NEW
215
    _system_impl->queue_message(
×
NEW
216
        [target_system = message.sysid,
×
NEW
217
         target_component = message.compid,
×
NEW
218
         sequence = logging_data_acked.sequence](MavlinkAddress mavlink_address, uint8_t channel) {
×
219
            mavlink_message_t response_message;
NEW
220
            mavlink_msg_logging_ack_pack_chan(
×
NEW
221
                mavlink_address.system_id,
×
NEW
222
                mavlink_address.component_id,
×
223
                channel,
224
                &response_message,
225
                target_system,
226
                target_component,
227
                sequence);
NEW
228
            return response_message;
×
229
        });
230

NEW
231
    if (_debugging) {
×
NEW
232
        LogInfo() << "Received logging data acked with len: "
×
NEW
233
                  << std::to_string(logging_data_acked.length)
×
NEW
234
                  << ", first message: " << std::to_string(logging_data_acked.first_message_offset)
×
NEW
235
                  << ", sequence: " << logging_data_acked.sequence;
×
236
    }
237

NEW
238
    if (logging_data_acked.length > sizeof(logging_data_acked.data)) {
×
NEW
239
        LogWarn() << "Invalid length";
×
NEW
240
        return;
×
241
    }
242

NEW
243
    if (logging_data_acked.first_message_offset != 255 &&
×
NEW
244
        logging_data_acked.first_message_offset > sizeof(logging_data_acked.data)) {
×
NEW
245
        LogWarn() << "Invalid first_message_offset";
×
NEW
246
        return;
×
247
    }
248

NEW
249
    std::lock_guard<std::mutex> lock(_mutex);
×
NEW
250
    if (is_duplicate(logging_data_acked.sequence)) {
×
NEW
251
        return;
×
252
    }
NEW
253
    check_drop_state(logging_data_acked.sequence, logging_data_acked.first_message_offset);
×
254

NEW
255
    switch (_drop_state) {
×
NEW
256
        case DropState::Ok:
×
257
            // All data can be used.
NEW
258
            if (logging_data_acked.first_message_offset == 255) {
×
259
                // This is just part that we add for now.
NEW
260
                _ulog_data.insert(
×
NEW
261
                    _ulog_data.end(),
×
262
                    logging_data_acked.data,
NEW
263
                    logging_data_acked.data + logging_data_acked.length);
×
264

265
            } else {
266
                // Finish the previous message.
NEW
267
                _ulog_data.insert(
×
NEW
268
                    _ulog_data.end(),
×
269
                    logging_data_acked.data,
NEW
270
                    logging_data_acked.data + logging_data_acked.first_message_offset);
×
NEW
271
                process_message();
×
272

NEW
273
                _ulog_data.clear();
×
274
                // Then start the next one.
NEW
275
                _ulog_data.insert(
×
NEW
276
                    _ulog_data.end(),
×
NEW
277
                    logging_data_acked.data + logging_data_acked.first_message_offset,
×
NEW
278
                    logging_data_acked.data + logging_data_acked.length);
×
279
            }
NEW
280
            break;
×
281

NEW
282
        case DropState::Dropped:
×
283
            // Nothing to do with the partial message.
NEW
284
            _ulog_data.clear();
×
NEW
285
            break;
×
286

NEW
287
        case DropState::RecoveringFromDropped:
×
288
            // Nothing to do with any partial message.
NEW
289
            _ulog_data.clear();
×
290
            // Now start fresh.
NEW
291
            _ulog_data.insert(
×
NEW
292
                _ulog_data.end(),
×
NEW
293
                logging_data_acked.data + logging_data_acked.first_message_offset,
×
NEW
294
                logging_data_acked.data + logging_data_acked.length);
×
NEW
295
            break;
×
NEW
296
        case DropState::Unknown:
×
NEW
297
            LogErr() << "Logical error";
×
NEW
298
            break;
×
299
    }
NEW
300
}
×
301

NEW
302
bool LogStreamingBackendPx4::is_duplicate(uint16_t sequence) const
×
303
{
304
    // Assume we have lock
NEW
305
    return _drop_state != DropState::Unknown && sequence == _current_sequence;
×
306
}
307

NEW
308
void LogStreamingBackendPx4::check_drop_state(uint16_t sequence, uint8_t first_message_offset)
×
309
{
310
    // Assume we have lock.
311

NEW
312
    switch (_drop_state) {
×
NEW
313
        case DropState::Dropped:
×
NEW
314
            if (first_message_offset != 255) {
×
315
                // This is the first time we use the sequence.
NEW
316
                _current_sequence = sequence;
×
NEW
317
                _drop_state = DropState::RecoveringFromDropped;
×
318
            } else {
NEW
319
                _drop_state = DropState::Dropped;
×
320
            }
NEW
321
            break;
×
322

NEW
323
        case DropState::Unknown:
×
NEW
324
            _drop_state = DropState::Ok;
×
NEW
325
            _current_sequence = sequence;
×
NEW
326
            break;
×
NEW
327
        case DropState::Ok:
×
328
        case DropState::RecoveringFromDropped:
329
            uint16_t drop;
NEW
330
            if (sequence > _current_sequence) {
×
331
                // No wrap around.
NEW
332
                drop = (sequence - 1 - _current_sequence);
×
NEW
333
                _drops += drop;
×
NEW
334
                if (drop > 0 && _debugging) {
×
NEW
335
                    LogDebug() << "Dropped: " << drop << " (no wrap around), overall: " << _drops;
×
336
                }
337

338
            } else {
339
                // Wrap around!
NEW
340
                drop = (sequence + std::numeric_limits<uint16_t>::max() - 1 - _current_sequence);
×
NEW
341
                _drops += drop;
×
NEW
342
                if (drop > 0 && _debugging) {
×
NEW
343
                    LogDebug() << "Dropped: " << drop << " (with wrap around), overall: " << _drops;
×
344
                }
345
            }
346

NEW
347
            _current_sequence = sequence;
×
348

NEW
349
            if (drop > 0) {
×
NEW
350
                if (first_message_offset == 255) {
×
NEW
351
                    _drop_state = DropState::Dropped;
×
352
                } else {
NEW
353
                    _drop_state = DropState::RecoveringFromDropped;
×
354
                }
355
            } else {
NEW
356
                _drop_state = DropState::Ok;
×
357
            }
NEW
358
            break;
×
359
    }
NEW
360
}
×
361

NEW
362
void LogStreamingBackendPx4::process_message()
×
363
{
364
    // Assumes lock.
365

NEW
366
    if (_debugging) {
×
NEW
367
        LogDebug() << "Processing ulog message with size " << _ulog_data.size();
×
368
    }
369

370
    // We don't check the magic and version. That's up to the log viewer to parse.
371

372
    // Let's pass it to the user via the callback.
NEW
373
    if (_data_callback) {
×
NEW
374
        _system_impl->call_user_callback([this, data = _ulog_data]() { _data_callback(data); });
×
375
    }
NEW
376
}
×
377

NEW
378
LogStreaming::Result LogStreamingBackendPx4::log_streaming_result_from_command_result(
×
379
    MavlinkCommandSender::Result result)
380
{
NEW
381
    switch (result) {
×
NEW
382
        case MavlinkCommandSender::Result::Success:
×
NEW
383
            return LogStreaming::Result::Success;
×
NEW
384
        case MavlinkCommandSender::Result::NoSystem:
×
NEW
385
            return LogStreaming::Result::NoSystem;
×
NEW
386
        case MavlinkCommandSender::Result::ConnectionError:
×
NEW
387
            return LogStreaming::Result::ConnectionError;
×
NEW
388
        case MavlinkCommandSender::Result::Busy:
×
NEW
389
            return LogStreaming::Result::Busy;
×
NEW
390
        case MavlinkCommandSender::Result::Denied:
×
NEW
391
            return LogStreaming::Result::CommandDenied;
×
NEW
392
        case MavlinkCommandSender::Result::Unsupported:
×
NEW
393
            return LogStreaming::Result::Unsupported;
×
NEW
394
        case MavlinkCommandSender::Result::Timeout:
×
NEW
395
            return LogStreaming::Result::Timeout;
×
NEW
396
        case MavlinkCommandSender::Result::InProgress:
×
397
        case MavlinkCommandSender::Result::TemporarilyRejected:
398
        case MavlinkCommandSender::Result::Failed:
399
        case MavlinkCommandSender::Result::Cancelled:
400
        case MavlinkCommandSender::Result::UnknownError:
401
        default:
NEW
402
            return LogStreaming::Result::Unknown;
×
403
    }
404
}
405

406
} // namespace mavsdk
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc