• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mavlink / MAVSDK / 15060565035

16 May 2025 04:20AM UTC coverage: 44.332% (+0.06%) from 44.27%
15060565035

push

github

web-flow
Merge pull request #2573 from mavlink/pr-threading

Add incoming and outgoing message queue, fixing threading/locking issues

159 of 185 new or added lines in 7 files covered. (85.95%)

60 existing lines in 8 files now uncovered.

14901 of 33612 relevant lines covered (44.33%)

285.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.5
/src/mavsdk/core/mavlink_request_message.cpp
1

2
#include "log.h"
3
#include "mavlink_request_message.h"
4
#include "system_impl.h"
5
#include <cstdint>
6
#include <string>
7

8
namespace mavsdk {
9

10
MavlinkRequestMessage::MavlinkRequestMessage(
86✔
11
    SystemImpl& system_impl,
12
    MavlinkCommandSender& command_sender,
13
    MavlinkMessageHandler& message_handler,
14
    TimeoutHandler& timeout_handler) :
86✔
15
    _system_impl(system_impl),
86✔
16
    _command_sender(command_sender),
86✔
17
    _message_handler(message_handler),
86✔
18
    _timeout_handler(timeout_handler)
86✔
19
{
20
    if (const char* env_p = std::getenv("MAVSDK_COMMAND_DEBUGGING")) {
86✔
21
        if (std::string(env_p) == "1") {
×
22
            LogDebug() << "Command debugging is on.";
×
23
            _debugging = true;
×
24
        }
25
    }
26
}
86✔
27

28
void MavlinkRequestMessage::request(
88✔
29
    uint32_t message_id,
30
    uint8_t target_component,
31
    MavlinkRequestMessageCallback callback,
32
    uint32_t param2)
33
{
34
    std::unique_lock<std::mutex> lock(_mutex);
88✔
35

36
    // Cleanup previous requests.
37
    for (const auto id : _deferred_message_cleanup) {
92✔
38
        _message_handler.unregister_one(id, this);
4✔
39
    }
40
    _deferred_message_cleanup.clear();
88✔
41

42
    // Respond with 'Busy' if already in progress.
43
    for (auto& item : _work_items) {
215✔
44
        if (item.message_id == message_id && item.param2 == param2) {
127✔
45
            lock.unlock();
×
46
            if (callback) {
×
47
                callback(MavlinkCommandSender::Result::Busy, {});
×
48
            }
49
            return;
×
50
        }
51
    }
52

53
    // Otherwise, schedule it.
54
    _work_items.emplace_back(WorkItem{message_id, target_component, callback, param2});
88✔
55

56
    // Register for message
57
    _message_handler.register_one(
88✔
58
        message_id,
59
        [this](const mavlink_message_t& message) { handle_any_message(message); },
63✔
60
        this);
61

62
    // And send off command
63
    send_request(_work_items.back());
88✔
64
}
88✔
65

66
void MavlinkRequestMessage::send_request(WorkItem& item)
193✔
67
{
68
    // Every second time, starting with first retry, we try the old request commands.
69
    // If no old commands exist, we of course just send the new one again.
70
    if (item.retries % 2 != 0) {
193✔
71
        if (!try_sending_request_using_old_command(item)) {
59✔
72
            send_request_using_new_command(item);
2✔
73
        }
74
    } else {
75
        send_request_using_new_command(item);
134✔
76
    }
77
}
193✔
78

79
void MavlinkRequestMessage::send_request_using_new_command(WorkItem& item)
136✔
80
{
81
    if (_debugging) {
136✔
82
        if (item.retries > 0) {
×
83
            LogDebug() << "Request message " << item.message_id
×
84
                       << " again using REQUEST_MESSAGE (retries: " << item.retries << ")";
×
85
        } else {
86
            LogDebug() << "Request message " << item.message_id << " using REQUEST_MESSAGE";
×
87
        }
88
    }
89

90
    MavlinkCommandSender::CommandLong command_request_message{};
136✔
91
    command_request_message.command = MAV_CMD_REQUEST_MESSAGE;
136✔
92
    command_request_message.target_system_id = _system_impl.get_system_id();
136✔
93
    command_request_message.target_component_id = item.target_component;
136✔
94
    command_request_message.params.maybe_param1 = {static_cast<float>(item.message_id)};
136✔
95
    _command_sender.queue_command_async(
272✔
96
        command_request_message,
97
        [this, message_id = item.message_id](MavlinkCommandSender::Result result, float) {
136✔
98
            if (result != MavlinkCommandSender::Result::InProgress) {
136✔
99
                handle_command_result(message_id, result);
136✔
100
            }
101
        },
136✔
102
        1);
103
}
136✔
104

105
bool MavlinkRequestMessage::try_sending_request_using_old_command(WorkItem& item)
59✔
106
{
107
    MavlinkCommandSender::CommandLong command_request_message{};
59✔
108
    command_request_message.target_system_id = _system_impl.get_system_id();
59✔
109
    command_request_message.target_component_id = item.target_component;
59✔
110

111
    std::string command_name;
59✔
112

113
    switch (item.message_id) {
59✔
UNCOV
114
        case MAVLINK_MSG_ID_AUTOPILOT_VERSION:
×
UNCOV
115
            command_request_message.command = MAV_CMD_REQUEST_AUTOPILOT_CAPABILITIES;
×
UNCOV
116
            command_request_message.params.maybe_param1 = {1.f};
×
UNCOV
117
            command_name = "REQUEST_AUTOPILOT_CAPABILITIES";
×
UNCOV
118
            break;
×
119

120
        case MAVLINK_MSG_ID_CAMERA_INFORMATION:
×
121
            command_request_message.command = MAV_CMD_REQUEST_CAMERA_INFORMATION;
×
122
            command_request_message.params.maybe_param1 = {1.0f};
×
123
            command_name = "REQUEST_CAMERA_INFORMATION";
×
124
            break;
×
125

126
        case MAVLINK_MSG_ID_CAMERA_SETTINGS:
9✔
127
            command_request_message.command = MAV_CMD_REQUEST_CAMERA_SETTINGS;
9✔
128
            command_request_message.params.maybe_param1 = {1.0f};
9✔
129
            command_name = "REQUEST_CAMERA_SETTINGS";
9✔
130

131
            break;
9✔
132

133
        case MAVLINK_MSG_ID_STORAGE_INFORMATION:
16✔
134
            command_request_message.command = MAV_CMD_REQUEST_STORAGE_INFORMATION;
16✔
135
            command_request_message.params.maybe_param1 = {
32✔
136
                static_cast<float>(item.param2)}; // storage ID
32✔
137
            command_request_message.params.maybe_param2 = {1.0f};
16✔
138
            command_name = "REQUEST_STORAGE_INFORMATION";
16✔
139
            break;
16✔
140

141
        case MAVLINK_MSG_ID_CAMERA_CAPTURE_STATUS:
×
142
            command_request_message.command = MAV_CMD_REQUEST_CAMERA_CAPTURE_STATUS;
×
143
            command_request_message.params.maybe_param1 = {1.0f};
×
144
            command_name = "REQUEST_CAMERA_CAPTURE_STATUS";
×
145
            break;
×
146

147
        case MAVLINK_MSG_ID_FLIGHT_INFORMATION:
×
148
            command_request_message.command = MAV_CMD_REQUEST_FLIGHT_INFORMATION;
×
149
            command_request_message.params.maybe_param1 = {1.0f};
×
150
            command_name = "REQUEST_FLIGHT_INFORMATION";
×
151
            break;
×
152

153
        case MAVLINK_MSG_ID_VIDEO_STREAM_STATUS:
17✔
154
            command_request_message.command = MAV_CMD_REQUEST_VIDEO_STREAM_STATUS;
17✔
155
            command_request_message.params.maybe_param1 = {
34✔
156
                static_cast<float>(item.param2)}; // stream ID
34✔
157
            command_name = "REQUEST_VIDEO_STREAM_STATUS";
17✔
158
            break;
17✔
159

160
        case MAVLINK_MSG_ID_VIDEO_STREAM_INFORMATION:
15✔
161
            command_request_message.command = MAV_CMD_REQUEST_VIDEO_STREAM_INFORMATION;
15✔
162
            command_request_message.params.maybe_param1 = {
30✔
163
                static_cast<float>(item.param2)}; // stream ID
30✔
164
            command_name = "REQUEST_VIDEO_STREAM_INFORMATION";
15✔
165
            break;
15✔
166

167
        case MAVLINK_MSG_ID_CAMERA_IMAGE_CAPTURED:
×
168
            command_request_message.command = MAV_CMD_REQUEST_CAMERA_IMAGE_CAPTURE;
×
169
            command_request_message.params.maybe_param1 = {
×
170
                static_cast<float>(item.param2)}; // sequence number
×
171
            command_name = "REQUEST_CAMERA_IMAGE_CAPTURE";
×
172
            break;
×
173

174
        default:
2✔
175
            // No alternative command for this message.
176
            return false;
2✔
177
    }
178

179
    if (_debugging) {
57✔
180
        LogDebug() << "Request message " << item.message_id << " again using " << command_name
×
181
                   << " (retries: " << item.retries << ")";
×
182
    }
183

184
    _command_sender.queue_command_async(
114✔
185
        command_request_message,
186
        [this, message_id = item.message_id](MavlinkCommandSender::Result result, float) {
57✔
187
            if (result != MavlinkCommandSender::Result::InProgress) {
57✔
188
                handle_command_result(message_id, result);
57✔
189
            }
190
        },
57✔
191
        1);
192

193
    return true;
57✔
194
}
59✔
195

196
void MavlinkRequestMessage::handle_any_message(const mavlink_message_t& message)
63✔
197
{
198
    std::unique_lock<std::mutex> lock(_mutex);
63✔
199

200
    for (auto it = _work_items.begin(); it != _work_items.end(); ++it) {
104✔
201
        // Check if we're waiting for this message.
202
        // TODO: check if params are correct.
203
        if (it->message_id != message.msgid && it->target_component == message.compid) {
104✔
204
            continue;
41✔
205
        }
206

207
        _timeout_handler.remove(it->timeout_cookie);
63✔
208
        // We have at least the message which is what we care about most, so we
209
        // tell the user about it and call it a day. If we receive the result
210
        // of the command later, we ignore it, and we fake the result to be successful
211
        // anyway.
212
        auto temp_callback = it->callback;
63✔
213
        // We can now get rid of the entry now.
214
        _work_items.erase(it);
63✔
215
        // We have to clean up later outside this callback, otherwise we lock ourselves out.
216
        _deferred_message_cleanup.push_back(message.msgid);
63✔
217
        lock.unlock();
63✔
218

219
        if (temp_callback) {
63✔
220
            temp_callback(MavlinkCommandSender::Result::Success, message);
1✔
221
        }
222
        return;
63✔
223
    }
63✔
224
}
63✔
225

226
void MavlinkRequestMessage::handle_command_result(
193✔
227
    uint32_t message_id, MavlinkCommandSender::Result result)
228
{
229
    std::unique_lock<std::mutex> lock(_mutex);
193✔
230

231
    for (auto it = _work_items.begin(); it != _work_items.end(); ++it) {
351✔
232
        // Check if we're waiting for this result
233
        if (it->message_id != message_id) {
306✔
234
            continue;
158✔
235
        }
236

237
        switch (result) {
148✔
238
            case MavlinkCommandSender::Result::Success:
20✔
239
                // This is promising, let's hope the message will actually arrive.
240
                // We'll set a timeout in case we need to retry.
241
                it->timeout_cookie = _timeout_handler.add(
40✔
242
                    [this, message_id, target_component = it->target_component]() {
20✔
UNCOV
243
                        handle_timeout(message_id, target_component);
×
UNCOV
244
                    },
×
245
                    1.0);
246
                return;
20✔
247

248
            case MavlinkCommandSender::Result::Busy:
128✔
249
                // FALLTHROUGH
250
            case MavlinkCommandSender::Result::Denied:
251
                // FALLTHROUGH
252
            case MavlinkCommandSender::Result::Unsupported:
253
                // FALLTHROUGH
254
            case MavlinkCommandSender::Result::Timeout:
255
                // FALLTHROUGH
256
            case MavlinkCommandSender::Result::TemporarilyRejected:
257
                // It looks like we should try again
258
                if (++it->retries > RETRIES) {
128✔
259
                    // We have already retried, let's give up.
260
                    auto temp_callback = it->callback;
23✔
261
                    _message_handler.unregister_one(it->message_id, this);
23✔
262
                    _work_items.erase(it);
23✔
263
                    lock.unlock();
23✔
264
                    if (temp_callback) {
23✔
265
                        temp_callback(MavlinkCommandSender::Result::Timeout, {});
×
266
                    }
267
                } else {
23✔
268
                    send_request(*it);
105✔
269
                }
270
                return;
128✔
271

272
            case MavlinkCommandSender::Result::NoSystem:
×
273
                // FALLTHROUGH
274
            case MavlinkCommandSender::Result::ConnectionError:
275
                // FALLTHROUGH
276
            case MavlinkCommandSender::Result::Failed:
277
                // FALLTHROUGH
278
            case MavlinkCommandSender::Result::Cancelled:
279
                // FALLTHROUGH
280
            case MavlinkCommandSender::Result::UnknownError: {
281
                // It looks like this did not work, and we can report the error
282
                // No need to try again.
283
                auto temp_callback = it->callback;
×
284
                _message_handler.unregister_one(it->message_id, this);
×
285
                _work_items.erase(it);
×
286
                lock.unlock();
×
287
                if (temp_callback) {
×
288
                    temp_callback(result, {});
×
289
                }
290
                return;
×
291
            }
×
292

293
            case MavlinkCommandSender::Result::InProgress:
×
294
                // Should not happen, as it is already filtered out.
295
                return;
×
296
        }
297
    }
298
}
193✔
299

UNCOV
300
void MavlinkRequestMessage::handle_timeout(uint32_t message_id, uint8_t target_component)
×
301
{
UNCOV
302
    std::unique_lock<std::mutex> lock(_mutex);
×
303

UNCOV
304
    for (auto it = _work_items.begin(); it != _work_items.end(); ++it) {
×
305
        // Check if we're waiting for this result
UNCOV
306
        if (it->message_id != message_id || it->target_component != target_component) {
×
307
            continue;
×
308
        }
309

UNCOV
310
        if (++it->retries > RETRIES) {
×
311
            // We have already retried, let's give up.
312
            auto temp_callback = it->callback;
×
313
            _message_handler.unregister_one(it->message_id, this);
×
314
            _work_items.erase(it);
×
315
            lock.unlock();
×
316
            if (temp_callback) {
×
317
                temp_callback(MavlinkCommandSender::Result::Timeout, {});
×
318
            }
319
            return;
×
320
        }
×
321

UNCOV
322
        send_request(*it);
×
323
    }
UNCOV
324
}
×
325

326
} // namespace mavsdk
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc