• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mavlink / MAVSDK / 11767930807

10 Nov 2024 07:33PM UTC coverage: 38.608% (+0.7%) from 37.921%
11767930807

push

github

web-flow
Merge pull request #2394 from mavlink/pr-consolidate-ci

Consolidate CI

12030 of 31159 relevant lines covered (38.61%)

243.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/mavsdk/plugins/log_streaming/log_streaming_impl.cpp
1
#include <future>
2
#include <limits>
3

4
#include "log_streaming_impl.h"
5
#include "plugins/log_streaming/log_streaming.h"
6
#include "callback_list.tpp"
7
#include "base64.h"
8

9
// Note: the implementation below is mostly inspired by the pymavlink
10
//       implementation written by Beat Küng:
11
//       https://github.com/PX4/PX4-Autopilot/blob/main/Tools/mavlink_ulog_streaming.py
12
//
13
// For more information about the ulog protocol, visit:
14
// https://docs.px4.io/main/en/dev_log/ulog_file_format.html
15

16
namespace mavsdk {
17

18
template class CallbackList<LogStreaming::LogStreamingRaw>;
19

20
LogStreamingImpl::LogStreamingImpl(System& system) : PluginImplBase(system)
×
21
{
22
    _system_impl->register_plugin(this);
×
23
}
×
24

25
LogStreamingImpl::LogStreamingImpl(std::shared_ptr<System> system) :
×
26
    PluginImplBase(std::move(system))
×
27
{
28
    _system_impl->register_plugin(this);
×
29
}
×
30

31
LogStreamingImpl::~LogStreamingImpl()
×
32
{
33
    _system_impl->unregister_plugin(this);
×
34
}
×
35

36
void LogStreamingImpl::init()
×
37
{
38
    if (const char* env_p = std::getenv("MAVSDK_LOG_STREAMING_DEBUGGING")) {
×
39
        if (std::string(env_p) == "1") {
×
40
            LogDebug() << "Log streaming debugging is on.";
×
41
            _debugging = true;
×
42
        }
43
    }
44

45
    _system_impl->register_mavlink_message_handler(
×
46
        MAVLINK_MSG_ID_LOGGING_DATA,
47
        [this](const mavlink_message_t& message) { process_logging_data(message); },
×
48
        this);
49
    _system_impl->register_mavlink_message_handler(
×
50
        MAVLINK_MSG_ID_LOGGING_DATA_ACKED,
51
        [this](const mavlink_message_t& message) { process_logging_data_acked(message); },
×
52
        this);
53
}
×
54

55
void LogStreamingImpl::deinit()
×
56
{
57
    auto is_active = [this]() {
×
58
        std::lock_guard<std::mutex> lock(_mutex);
×
59
        return _active;
×
60
    }();
×
61

62
    if (is_active) {
×
63
        // We try to stop log streaming before we leave but without waiting for a result.
64
        stop_log_streaming_async(nullptr);
×
65
    }
66
}
×
67

68
void LogStreamingImpl::enable() {}
×
69

70
void LogStreamingImpl::disable() {}
×
71

72
void LogStreamingImpl::start_log_streaming_async(const LogStreaming::ResultCallback& callback)
×
73
{
74
    {
75
        std::lock_guard<std::mutex> lock(_mutex);
×
76
        reset();
×
77
        _active = true;
×
78
    }
×
79

80
    MavlinkCommandSender::CommandLong command{};
×
81
    command.command = MAV_CMD_LOGGING_START;
×
82
    command.params.maybe_param1 = 0.0f; // ulog
×
83

84
    _system_impl->send_command_async(command, [=](MavlinkCommandSender::Result result, float) {
×
85
        if (result != MavlinkCommandSender::Result::InProgress) {
×
86
            if (callback) {
×
87
                _system_impl->call_user_callback(
×
88
                    [=]() { callback(log_streaming_result_from_command_result(result)); });
89
            }
90
        }
91
    });
×
92
}
×
93

94
LogStreaming::Result LogStreamingImpl::start_log_streaming()
×
95
{
96
    auto prom = std::promise<LogStreaming::Result>{};
×
97
    auto fut = prom.get_future();
×
98

99
    start_log_streaming_async([&](LogStreaming::Result result) { prom.set_value(result); });
×
100

101
    return fut.get();
×
102
}
×
103

104
void LogStreamingImpl::stop_log_streaming_async(const LogStreaming::ResultCallback& callback)
×
105
{
106
    MavlinkCommandSender::CommandLong command{};
×
107
    command.command = MAV_CMD_LOGGING_STOP;
×
108

109
    _system_impl->send_command_async(command, [=](MavlinkCommandSender::Result result, float) {
×
110
        if (result != MavlinkCommandSender::Result::InProgress) {
×
111
            if (callback) {
×
112
                _system_impl->call_user_callback(
×
113
                    [=]() { callback(log_streaming_result_from_command_result(result)); });
114
            }
115
        }
116
    });
×
117

118
    std::lock_guard<std::mutex> lock(_mutex);
×
119
    reset();
×
120
    _active = false;
×
121
}
×
122

123
LogStreaming::Result LogStreamingImpl::stop_log_streaming()
×
124
{
125
    auto prom = std::promise<LogStreaming::Result>{};
×
126
    auto fut = prom.get_future();
×
127

128
    stop_log_streaming_async([&](LogStreaming::Result result) { prom.set_value(result); });
×
129

130
    return fut.get();
×
131
}
×
132

133
LogStreaming::LogStreamingRawHandle
134
LogStreamingImpl::subscribe_log_streaming_raw(const LogStreaming::LogStreamingRawCallback& callback)
×
135
{
136
    std::lock_guard<std::mutex> lock(_mutex);
×
137
    auto handle = _subscription_callbacks.subscribe(callback);
×
138

139
    return handle;
×
140
}
×
141

142
void LogStreamingImpl::unsubscribe_log_streaming_raw(LogStreaming::LogStreamingRawHandle handle)
×
143
{
144
    std::lock_guard<std::mutex> lock(_mutex);
×
145
    _subscription_callbacks.unsubscribe(handle);
×
146
}
×
147

148
void LogStreamingImpl::reset()
×
149
{
150
    // Assume we have lock
151

152
    _current_sequence = 0;
×
153
    _drop_state = DropState::Unknown;
×
154
}
×
155

156
void LogStreamingImpl::process_logging_data(const mavlink_message_t& message)
×
157
{
158
    if (!_active) {
×
159
        if (_debugging) {
×
160
            LogDebug() << "Ignoring logging data because we're not active";
×
161
        }
162
        return;
×
163
    }
164

165
    mavlink_logging_data_t logging_data;
×
166
    mavlink_msg_logging_data_decode(&message, &logging_data);
×
167

168
    if (logging_data.target_system != _system_impl->get_own_system_id() ||
×
169
        logging_data.target_component != _system_impl->get_own_component_id()) {
×
170
        LogWarn() << "Logging data with wrong target " << std::to_string(logging_data.target_system)
×
171
                  << '/' << std::to_string(logging_data.target_component) << " instead of "
×
172
                  << std::to_string(_system_impl->get_own_system_id()) << '/'
×
173
                  << std::to_string(_system_impl->get_own_component_id());
×
174
        return;
×
175
    }
176

177
    if (_debugging) {
×
178
        LogDebug() << "Received logging data with len: " << std::to_string(logging_data.length)
×
179

180
                   << ", first message: " << std::to_string(logging_data.first_message_offset)
×
181
                   << ", sequence: " << logging_data.sequence;
×
182
    }
183

184
    if (logging_data.length > sizeof(logging_data.data)) {
×
185
        LogWarn() << "Invalid length";
×
186
        return;
×
187
    }
188

189
    if (logging_data.first_message_offset != 255 &&
×
190
        logging_data.first_message_offset > sizeof(logging_data.data)) {
×
191
        LogWarn() << "Invalid first_message_offset";
×
192
        return;
×
193
    }
194

195
    std::lock_guard<std::mutex> lock(_mutex);
×
196
    if (is_duplicate(logging_data.sequence)) {
×
197
        return;
×
198
    }
199
    check_drop_state(logging_data.sequence, logging_data.first_message_offset);
×
200

201
    switch (_drop_state) {
×
202
        case DropState::Ok:
×
203
            // All data can be used.
204
            if (logging_data.first_message_offset == 255) {
×
205
                // This is just part that we add for now.
206
                _ulog_data.insert(
×
207
                    _ulog_data.end(), logging_data.data, logging_data.data + logging_data.length);
×
208

209
            } else {
210
                // Finish the previous message.
211
                _ulog_data.insert(
×
212
                    _ulog_data.end(),
×
213
                    logging_data.data,
214
                    logging_data.data + logging_data.first_message_offset);
×
215
                process_message();
×
216

217
                _ulog_data.clear();
×
218
                // Then start the next one.
219
                _ulog_data.insert(
×
220
                    _ulog_data.end(),
×
221
                    logging_data.data + logging_data.first_message_offset,
×
222
                    logging_data.data + logging_data.length);
×
223
            }
224
            break;
×
225

226
        case DropState::Dropped:
×
227
            // Nothing to do with the partial message.
228
            _ulog_data.clear();
×
229
            break;
×
230

231
        case DropState::RecoveringFromDropped:
×
232
            // Nothing to do with any partial message.
233
            _ulog_data.clear();
×
234
            // Now start fresh.
235
            _ulog_data.insert(
×
236
                _ulog_data.end(),
×
237
                logging_data.data + logging_data.first_message_offset,
×
238
                logging_data.data + logging_data.length);
×
239
            break;
×
240
        case DropState::Unknown:
×
241
            LogErr() << "Logical error";
×
242
            break;
×
243
    }
244
}
×
245

246
void LogStreamingImpl::process_logging_data_acked(const mavlink_message_t& message)
×
247
{
248
    if (!_active) {
×
249
        if (_debugging) {
×
250
            LogDebug() << "Ignoring logging data acked because we're not active";
×
251
        }
252
        return;
×
253
    }
254

255
    mavlink_logging_data_acked_t logging_data_acked;
×
256
    mavlink_msg_logging_data_acked_decode(&message, &logging_data_acked);
×
257

258
    if (logging_data_acked.target_system != _system_impl->get_own_system_id() ||
×
259
        logging_data_acked.target_component != _system_impl->get_own_component_id()) {
×
260
        LogWarn() << "Logging data acked with wrong target "
×
261
                  << std::to_string(logging_data_acked.target_system) << '/'
×
262
                  << std::to_string(logging_data_acked.target_component) << " instead of "
×
263
                  << std::to_string(_system_impl->get_own_system_id()) << '/'
×
264
                  << std::to_string(_system_impl->get_own_component_id());
×
265
        return;
×
266
    }
267

268
    _system_impl->queue_message(
×
269
        [target_system = message.sysid,
×
270
         target_component = message.compid,
×
271
         sequence = logging_data_acked.sequence](MavlinkAddress mavlink_address, uint8_t channel) {
×
272
            mavlink_message_t response_message;
273
            mavlink_msg_logging_ack_pack_chan(
×
274
                mavlink_address.system_id,
×
275
                mavlink_address.component_id,
×
276
                channel,
277
                &response_message,
278
                target_system,
279
                target_component,
280
                sequence);
281
            return response_message;
×
282
        });
283

284
    if (_debugging) {
×
285
        LogInfo() << "Received logging data acked with len: "
×
286
                  << std::to_string(logging_data_acked.length)
×
287
                  << ", first message: " << std::to_string(logging_data_acked.first_message_offset)
×
288
                  << ", sequence: " << logging_data_acked.sequence;
×
289
    }
290

291
    if (logging_data_acked.length > sizeof(logging_data_acked.data)) {
×
292
        LogWarn() << "Invalid length";
×
293
        return;
×
294
    }
295

296
    if (logging_data_acked.first_message_offset != 255 &&
×
297
        logging_data_acked.first_message_offset > sizeof(logging_data_acked.data)) {
×
298
        LogWarn() << "Invalid first_message_offset";
×
299
        return;
×
300
    }
301

302
    std::lock_guard<std::mutex> lock(_mutex);
×
303
    if (is_duplicate(logging_data_acked.sequence)) {
×
304
        return;
×
305
    }
306
    check_drop_state(logging_data_acked.sequence, logging_data_acked.first_message_offset);
×
307

308
    switch (_drop_state) {
×
309
        case DropState::Ok:
×
310
            // All data can be used.
311
            if (logging_data_acked.first_message_offset == 255) {
×
312
                // This is just part that we add for now.
313
                _ulog_data.insert(
×
314
                    _ulog_data.end(),
×
315
                    logging_data_acked.data,
316
                    logging_data_acked.data + logging_data_acked.length);
×
317

318
            } else {
319
                // Finish the previous message.
320
                _ulog_data.insert(
×
321
                    _ulog_data.end(),
×
322
                    logging_data_acked.data,
323
                    logging_data_acked.data + logging_data_acked.first_message_offset);
×
324
                process_message();
×
325

326
                _ulog_data.clear();
×
327
                // Then start the next one.
328
                _ulog_data.insert(
×
329
                    _ulog_data.end(),
×
330
                    logging_data_acked.data + logging_data_acked.first_message_offset,
×
331
                    logging_data_acked.data + logging_data_acked.length);
×
332
            }
333
            break;
×
334

335
        case DropState::Dropped:
×
336
            // Nothing to do with the partial message.
337
            _ulog_data.clear();
×
338
            break;
×
339

340
        case DropState::RecoveringFromDropped:
×
341
            // Nothing to do with any partial message.
342
            _ulog_data.clear();
×
343
            // Now start fresh.
344
            _ulog_data.insert(
×
345
                _ulog_data.end(),
×
346
                logging_data_acked.data + logging_data_acked.first_message_offset,
×
347
                logging_data_acked.data + logging_data_acked.length);
×
348
            break;
×
349
        case DropState::Unknown:
×
350
            LogErr() << "Logical error";
×
351
            break;
×
352
    }
353
}
×
354

355
bool LogStreamingImpl::is_duplicate(uint16_t sequence) const
×
356
{
357
    // Assume we have lock
358
    return _drop_state != DropState::Unknown && sequence == _current_sequence;
×
359
}
360

361
void LogStreamingImpl::check_drop_state(uint16_t sequence, uint8_t first_message_offset)
×
362
{
363
    // Assume we have lock.
364

365
    switch (_drop_state) {
×
366
        case DropState::Dropped:
×
367
            if (first_message_offset != 255) {
×
368
                // This is the first time we use the sequence.
369
                _current_sequence = sequence;
×
370
                _drop_state = DropState::RecoveringFromDropped;
×
371
            } else {
372
                _drop_state = DropState::Dropped;
×
373
            }
374
            break;
×
375

376
        case DropState::Unknown:
×
377
            _drop_state = DropState::Ok;
×
378
            _current_sequence = sequence;
×
379
            break;
×
380
        case DropState::Ok:
×
381
        case DropState::RecoveringFromDropped:
382
            uint16_t drop;
×
383
            if (sequence > _current_sequence) {
×
384
                // No wrap around.
385
                drop = (sequence - 1 - _current_sequence);
×
386
                _drops += drop;
×
387
                if (drop > 0 && _debugging) {
×
388
                    LogDebug() << "Dropped: " << drop << " (no wrap around), overall: " << _drops;
×
389
                }
390

391
            } else {
392
                // Wrap around!
393
                drop = (sequence + std::numeric_limits<uint16_t>::max() - 1 - _current_sequence);
×
394
                _drops += drop;
×
395
                if (drop > 0 && _debugging) {
×
396
                    LogDebug() << "Dropped: " << drop << " (with wrap around), overall: " << _drops;
×
397
                }
398
            }
399

400
            _current_sequence = sequence;
×
401

402
            if (drop > 0) {
×
403
                if (first_message_offset == 255) {
×
404
                    _drop_state = DropState::Dropped;
×
405
                } else {
406
                    _drop_state = DropState::RecoveringFromDropped;
×
407
                }
408
            } else {
409
                _drop_state = DropState::Ok;
×
410
            }
411
            break;
×
412
    }
413
}
×
414

415
void LogStreamingImpl::process_message()
×
416
{
417
    // Assumes lock.
418

419
    if (_debugging) {
×
420
        LogDebug() << "Processing ulog message with size " << _ulog_data.size();
×
421
    }
422

423
    // We don't check the magic and version. That's up to the log viewer to parse.
424

425
    // Convert to base64
426
    LogStreaming::LogStreamingRaw part;
×
427
    part.data_base64 = base64_encode(_ulog_data);
×
428

429
    // Let's pass it to the user.
430
    if (!_subscription_callbacks.empty()) {
×
431
        _subscription_callbacks.queue(
×
432
            part, [this](const auto& func) { _system_impl->call_user_callback(func); });
×
433
    }
434
}
×
435

436
LogStreaming::Result
437
LogStreamingImpl::log_streaming_result_from_command_result(MavlinkCommandSender::Result result)
×
438
{
439
    switch (result) {
×
440
        case MavlinkCommandSender::Result::Success:
×
441
            return LogStreaming::Result::Success;
×
442
        case MavlinkCommandSender::Result::NoSystem:
×
443
            return LogStreaming::Result::NoSystem;
×
444
        case MavlinkCommandSender::Result::ConnectionError:
×
445
            return LogStreaming::Result::ConnectionError;
×
446
        case MavlinkCommandSender::Result::Busy:
×
447
            return LogStreaming::Result::Busy;
×
448
        case MavlinkCommandSender::Result::Denied:
×
449
            return LogStreaming::Result::CommandDenied;
×
450
        case MavlinkCommandSender::Result::Unsupported:
×
451
            return LogStreaming::Result::Unsupported;
×
452
        case MavlinkCommandSender::Result::Timeout:
×
453
            return LogStreaming::Result::Timeout;
×
454
        case MavlinkCommandSender::Result::InProgress:
×
455
        case MavlinkCommandSender::Result::TemporarilyRejected:
456
        case MavlinkCommandSender::Result::Failed:
457
        case MavlinkCommandSender::Result::Cancelled:
458
        case MavlinkCommandSender::Result::UnknownError:
459
        default:
460
            return LogStreaming::Result::Unknown;
×
461
    }
462
}
463

464
} // namespace mavsdk
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc