• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mavlink / MAVSDK / 20604212380

30 Dec 2025 07:24PM UTC coverage: 48.017% (-0.06%) from 48.078%
20604212380

Pull #2746

github

web-flow
Merge 5269e5f1e into 5d2947b34
Pull Request #2746: Configuration and component cleanup

47 of 158 new or added lines in 19 files covered. (29.75%)

21 existing lines in 9 files now uncovered.

17769 of 37006 relevant lines covered (48.02%)

483.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/mavsdk/plugins/log_streaming/log_streaming_impl.cpp
1
#include <future>
2

3
#include "log_streaming_impl.h"
4
#include "log_streaming_backend_px4.h"
5
#include "log_streaming_backend_ardupilot.h"
6
#include "plugins/log_streaming/log_streaming.h"
7
#include "callback_list.tpp"
8
#include "base64.h"
9

10
namespace mavsdk {
11

12
template class CallbackList<LogStreaming::LogStreamingRaw>;
13

14
LogStreamingImpl::LogStreamingImpl(System& system) : PluginImplBase(system)
×
15
{
16
    _system_impl->register_plugin(this);
×
17
}
×
18

19
LogStreamingImpl::LogStreamingImpl(std::shared_ptr<System> system) :
×
20
    PluginImplBase(std::move(system))
×
21
{
22
    _system_impl->register_plugin(this);
×
23
}
×
24

25
LogStreamingImpl::~LogStreamingImpl()
×
26
{
27
    _system_impl->unregister_plugin(this);
×
28
}
×
29

30
void LogStreamingImpl::init()
×
31
{
32
    if (const char* env_p = std::getenv("MAVSDK_LOG_STREAMING_DEBUGGING")) {
×
33
        if (std::string(env_p) == "1") {
×
34
            LogDebug() << "Log streaming debugging is on.";
×
35
            _debugging = true;
×
36
        }
37
    }
38
}
×
39

40
void LogStreamingImpl::deinit()
×
41
{
42
    std::lock_guard<std::mutex> lock(_mutex);
×
43

44
    // Cancel any pending autopilot type polling
45
    if (_check_autopilot_cookie) {
×
46
        _system_impl->remove_call_every(_check_autopilot_cookie);
×
47
        _check_autopilot_cookie = {};
×
48
    }
49
    _start_callback = nullptr;
×
50

51
    if (_backend) {
×
52
        _backend->deinit();
×
53
        _backend.reset();
×
54
    }
55
}
×
56

57
void LogStreamingImpl::enable() {}
×
58

59
void LogStreamingImpl::disable()
×
60
{
61
    std::lock_guard<std::mutex> lock(_mutex);
×
62
    if (_backend) {
×
63
        _backend->deinit();
×
64
        _backend.reset();
×
65
    }
66
}
×
67

68
bool LogStreamingImpl::maybe_create_backend()
×
69
{
70
    // Already have a backend
71
    if (_backend) {
×
72
        return true;
×
73
    }
74

NEW
75
    auto autopilot = _system_impl->effective_autopilot();
×
76

77
    // Don't create backend yet if autopilot type is unknown
78
    if (autopilot == Autopilot::Unknown) {
×
79
        if (_debugging) {
×
80
            LogDebug() << "Autopilot type unknown, cannot create backend yet";
×
81
        }
82
        return false;
×
83
    }
84

85
    if (autopilot == Autopilot::ArduPilot) {
×
86
        if (_debugging) {
×
87
            LogDebug() << "Creating ArduPilot log streaming backend";
×
88
        }
89
        _backend = std::make_unique<LogStreamingBackendArdupilot>();
×
90
    } else {
91
        if (_debugging) {
×
92
            LogDebug() << "Creating PX4 log streaming backend";
×
93
        }
94
        _backend = std::make_unique<LogStreamingBackendPx4>();
×
95
    }
96

97
    _backend->set_debugging(_debugging);
×
98
    _backend->init(_system_impl.get());
×
99
    _backend->set_data_callback([this](const std::vector<uint8_t>& data) { process_data(data); });
×
100
    return true;
×
101
}
102

103
void LogStreamingImpl::process_data(const std::vector<uint8_t>& data)
×
104
{
105
    std::lock_guard<std::mutex> lock(_mutex);
×
106

107
    if (_debugging) {
×
108
        LogDebug() << "Processing log data with size " << data.size();
×
109
    }
110

111
    // Convert to base64
112
    LogStreaming::LogStreamingRaw part;
×
113
    std::vector<uint8_t> data_copy = data;
×
114
    part.data_base64 = base64_encode(data_copy);
×
115

116
    // Let's pass it to the user.
117
    if (!_subscription_callbacks.empty()) {
×
118
        _subscription_callbacks.queue(
×
119
            part, [this](const auto& func) { _system_impl->call_user_callback(func); });
×
120
    }
121
}
×
122

123
void LogStreamingImpl::start_log_streaming_async(const LogStreaming::ResultCallback& callback)
×
124
{
125
    std::lock_guard<std::mutex> lock(_mutex);
×
126

127
    // Try to create backend if not yet created
128
    if (maybe_create_backend()) {
×
129
        _backend->start_log_streaming_async(callback);
×
130
        return;
×
131
    }
132

133
    // Autopilot type unknown - wait for it with polling.
134
    // Use timeout_s() for the total wait time, polling every 0.1s.
135
    const float poll_interval_s = 0.1f;
×
136
    const unsigned max_polls = static_cast<unsigned>(_system_impl->timeout_s() / poll_interval_s);
×
137

138
    _start_callback = callback;
×
139
    _autopilot_poll_count = 0;
×
140

141
    _check_autopilot_cookie = _system_impl->add_call_every(
×
142
        [this, max_polls]() {
×
143
            std::lock_guard<std::mutex> lock2(_mutex);
×
144

145
            if (maybe_create_backend()) {
×
146
                // Success - stop polling and start streaming
147
                _system_impl->remove_call_every(_check_autopilot_cookie);
×
148
                _backend->start_log_streaming_async(_start_callback);
×
149
                _start_callback = nullptr;
×
150
                return;
×
151
            }
152

153
            _autopilot_poll_count++;
×
154
            if (_autopilot_poll_count >= max_polls) {
×
155
                // Timeout - stop polling and report failure
156
                _system_impl->remove_call_every(_check_autopilot_cookie);
×
157
                if (_start_callback) {
×
158
                    auto cb = _start_callback;
×
159
                    _start_callback = nullptr;
×
160
                    _system_impl->call_user_callback(
×
161
                        [cb]() { cb(LogStreaming::Result::NoSystem); });
162
                }
×
163
            }
164
        },
×
165
        poll_interval_s);
166
}
×
167

168
LogStreaming::Result LogStreamingImpl::start_log_streaming()
×
169
{
170
    auto prom = std::promise<LogStreaming::Result>{};
×
171
    auto fut = prom.get_future();
×
172

173
    start_log_streaming_async([&](LogStreaming::Result result) { prom.set_value(result); });
×
174

175
    return fut.get();
×
176
}
×
177

178
void LogStreamingImpl::stop_log_streaming_async(const LogStreaming::ResultCallback& callback)
×
179
{
180
    std::lock_guard<std::mutex> lock(_mutex);
×
181

182
    if (!_backend) {
×
183
        if (callback) {
×
184
            _system_impl->call_user_callback(
×
185
                [callback]() { callback(LogStreaming::Result::NoSystem); });
186
        }
187
        return;
×
188
    }
189

190
    _backend->stop_log_streaming_async(callback);
×
191
}
×
192

193
LogStreaming::Result LogStreamingImpl::stop_log_streaming()
×
194
{
195
    auto prom = std::promise<LogStreaming::Result>{};
×
196
    auto fut = prom.get_future();
×
197

198
    stop_log_streaming_async([&](LogStreaming::Result result) { prom.set_value(result); });
×
199

200
    return fut.get();
×
201
}
×
202

203
LogStreaming::LogStreamingRawHandle
204
LogStreamingImpl::subscribe_log_streaming_raw(const LogStreaming::LogStreamingRawCallback& callback)
×
205
{
206
    std::lock_guard<std::mutex> lock(_mutex);
×
207
    auto handle = _subscription_callbacks.subscribe(callback);
×
208

209
    return handle;
×
210
}
×
211

212
void LogStreamingImpl::unsubscribe_log_streaming_raw(LogStreaming::LogStreamingRawHandle handle)
×
213
{
214
    std::lock_guard<std::mutex> lock(_mutex);
×
215
    _subscription_callbacks.unsubscribe(handle);
×
216
}
×
217

218
} // namespace mavsdk
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc