• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mavlink / MAVSDK / 11805809015

12 Nov 2024 09:13PM UTC coverage: 38.611% (+0.003%) from 38.608%
11805809015

push

github

web-flow
Merge pull request #2439 from mavlink/pr-fix-command-sender-locking

core: add missing locks in command queue

3 of 14 new or added lines in 1 file covered. (21.43%)

1 existing line in 1 file now uncovered.

12032 of 31162 relevant lines covered (38.61%)

246.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.81
/src/mavsdk/core/mavlink_command_sender.cpp
1
#include "mavlink_command_sender.h"
2
#include "mavlink_address.h"
3
#include "system_impl.h"
4
#include "unused.h"
5
#include <cmath>
6
#include <future>
7
#include <memory>
8

9
namespace mavsdk {
10

11
MavlinkCommandSender::MavlinkCommandSender(SystemImpl& system_impl) : _system_impl(system_impl)
69✔
12
{
13
    if (const char* env_p = std::getenv("MAVSDK_COMMAND_DEBUGGING")) {
69✔
14
        if (std::string(env_p) == "1") {
×
15
            LogDebug() << "Command debugging is on.";
×
16
            _command_debugging = true;
×
17
        }
18
    }
19

20
    _system_impl.register_mavlink_message_handler(
69✔
21
        MAVLINK_MSG_ID_COMMAND_ACK,
22
        [this](const mavlink_message_t& message) { receive_command_ack(message); },
40✔
23
        this);
24
}
69✔
25

26
MavlinkCommandSender::~MavlinkCommandSender()
69✔
27
{
28
    if (_command_debugging) {
69✔
29
        LogDebug() << "CommandSender destroyed";
×
30
    }
31
    _system_impl.unregister_all_mavlink_message_handlers(this);
69✔
32

33
    LockedQueue<Work>::Guard work_queue_guard(_work_queue);
69✔
34
    for (const auto& work : _work_queue) {
71✔
35
        _system_impl.unregister_timeout_handler(work->timeout_cookie);
2✔
36
    }
37
}
138✔
38

39
MavlinkCommandSender::Result
40
MavlinkCommandSender::send_command(const MavlinkCommandSender::CommandInt& command)
×
41
{
42
    // We wrap the async call with a promise and future.
43
    auto prom = std::make_shared<std::promise<Result>>();
×
44
    auto res = prom->get_future();
×
45

46
    queue_command_async(command, [prom](Result result, float progress) {
×
47
        UNUSED(progress);
×
48
        // We can only fulfill the promise once in C++11.
49
        // Therefore, we have to ignore the IN_PROGRESS state and wait
50
        // for the final result.
51
        if (result != Result::InProgress) {
×
52
            prom->set_value(result);
×
53
        }
54
    });
×
55

56
    // Block now to wait for result.
57
    return res.get();
×
58
}
×
59

60
MavlinkCommandSender::Result
61
MavlinkCommandSender::send_command(const MavlinkCommandSender::CommandLong& command)
×
62
{
63
    // We wrap the async call with a promise and future.
64
    auto prom = std::make_shared<std::promise<Result>>();
×
65
    auto res = prom->get_future();
×
66

67
    queue_command_async(command, [prom](Result result, float progress) {
×
68
        UNUSED(progress);
×
69
        // We can only fulfill the promise once in C++11.
70
        // Therefore, we have to ignore the IN_PROGRESS state and wait
71
        // for the final result.
72
        if (result != Result::InProgress) {
×
73
            prom->set_value(result);
×
74
        }
75
    });
×
76

77
    return res.get();
×
78
}
×
79

80
void MavlinkCommandSender::queue_command_async(
×
81
    const CommandInt& command, const CommandResultCallback& callback)
82
{
83
    if (_command_debugging) {
×
84
        LogDebug() << "COMMAND_INT " << static_cast<int>(command.command) << " to send to "
×
85
                   << static_cast<int>(command.target_system_id) << ", "
×
86
                   << static_cast<int>(command.target_component_id);
×
87
    }
88

89
    CommandIdentification identification = identification_from_command(command);
×
90

91
    {
NEW
92
        LockedQueue<Work>::Guard work_queue_guard(_work_queue);
×
NEW
93
        for (const auto& work : _work_queue) {
×
NEW
94
            if (work->identification == identification && callback == nullptr) {
×
NEW
95
                if (_command_debugging) {
×
NEW
96
                    LogDebug() << "Dropping command " << static_cast<int>(identification.command)
×
NEW
97
                               << " that is already being sent";
×
98
                }
NEW
99
                return;
×
100
            }
101
        }
UNCOV
102
    }
×
103

104
    auto new_work = std::make_shared<Work>();
×
105
    new_work->timeout_s = _system_impl.timeout_s();
×
106
    new_work->command = command;
×
107
    new_work->identification = identification;
×
108
    new_work->callback = callback;
×
109
    _work_queue.push_back(new_work);
×
110
}
×
111

112
void MavlinkCommandSender::queue_command_async(
42✔
113
    const CommandLong& command, const CommandResultCallback& callback)
114
{
115
    if (_command_debugging) {
42✔
116
        LogDebug() << "COMMAND_LONG " << static_cast<int>(command.command) << " to send to "
×
117
                   << static_cast<int>(command.target_system_id) << ", "
×
118
                   << static_cast<int>(command.target_component_id);
×
119
    }
120

121
    CommandIdentification identification = identification_from_command(command);
42✔
122

123
    {
124
        LockedQueue<Work>::Guard work_queue_guard(_work_queue);
42✔
125
        for (const auto& work : _work_queue) {
49✔
126
            if (work->identification == identification && callback == nullptr) {
7✔
NEW
127
                if (_command_debugging) {
×
NEW
128
                    LogDebug() << "Dropping command " << static_cast<int>(identification.command)
×
NEW
129
                               << " that is already being sent";
×
130
                }
NEW
131
                return;
×
132
            }
133
        }
134
    }
42✔
135

136
    auto new_work = std::make_shared<Work>();
42✔
137
    new_work->timeout_s = _system_impl.timeout_s();
42✔
138
    new_work->command = command;
42✔
139
    new_work->identification = identification;
42✔
140
    new_work->callback = callback;
42✔
141
    new_work->time_started = _system_impl.get_time().steady_time();
42✔
142
    _work_queue.push_back(new_work);
42✔
143
}
42✔
144

145
void MavlinkCommandSender::receive_command_ack(const mavlink_message_t& message)
40✔
146
{
147
    mavlink_command_ack_t command_ack;
40✔
148
    mavlink_msg_command_ack_decode(&message, &command_ack);
40✔
149

150
    if ((command_ack.target_system &&
40✔
151
         command_ack.target_system != _system_impl.get_own_system_id()) ||
80✔
152
        (command_ack.target_component &&
80✔
153
         command_ack.target_component != _system_impl.get_own_component_id())) {
40✔
154
        if (_command_debugging) {
×
155
            LogDebug() << "Ignoring command ack for command "
×
156
                       << static_cast<int>(command_ack.command) << " from "
×
157
                       << static_cast<int>(message.sysid) << '/' << static_cast<int>(message.compid)
×
158
                       << " to " << static_cast<int>(command_ack.target_system) << '/'
×
159
                       << static_cast<int>(command_ack.target_component);
×
160
        }
161
        return;
×
162
    }
163

164
    LockedQueue<Work>::Guard work_queue_guard(_work_queue);
40✔
165

166
    for (auto it = _work_queue.begin(); it != _work_queue.end(); ++it) {
44✔
167
        const auto& work = *it;
44✔
168

169
        if (!work) {
44✔
170
            LogErr() << "No work available! (should not happen #1)";
×
171
            return;
×
172
        }
173

174
        if (work->identification.command != command_ack.command ||
44✔
175
            (work->identification.target_system_id != 0 &&
40✔
176
             work->identification.target_system_id != message.sysid) ||
124✔
177
            (work->identification.target_component_id != 0 &&
40✔
178
             work->identification.target_component_id != message.compid)) {
40✔
179
            if (_command_debugging) {
4✔
180
                LogDebug() << "Command ack for " << command_ack.command
×
181
                           << " (from: " << std::to_string(message.sysid) << "/"
×
182
                           << std::to_string(message.compid) << ")" << " does not match command "
×
183
                           << work->identification.command
×
184
                           << " (to: " << std::to_string(work->identification.target_system_id)
×
185
                           << "/" << std::to_string(work->identification.target_component_id) << ")"
×
186
                           << " after "
×
187
                           << _system_impl.get_time().elapsed_since_s(work->time_started) << " s";
×
188
            }
189
            continue;
4✔
190
        }
191

192
        if (_command_debugging) {
40✔
193
            LogDebug() << "Received command ack for " << command_ack.command << " with result "
×
194
                       << static_cast<int>(command_ack.result) << " after "
×
195
                       << _system_impl.get_time().elapsed_since_s(work->time_started) << " s";
×
196
        }
197

198
        CommandResultCallback temp_callback = work->callback;
40✔
199
        std::pair<Result, float> temp_result{Result::UnknownError, NAN};
40✔
200

201
        switch (command_ack.result) {
40✔
202
            case MAV_RESULT_ACCEPTED:
38✔
203
                _system_impl.unregister_timeout_handler(work->timeout_cookie);
38✔
204
                temp_result = {Result::Success, 1.0f};
38✔
205
                _work_queue.erase(it);
38✔
206
                break;
38✔
207

208
            case MAV_RESULT_DENIED:
×
209
                if (_command_debugging) {
×
210
                    LogDebug() << "command denied (" << work->identification.command << ").";
×
211
                    if (work->identification.command == 512) {
×
212
                        LogDebug() << "(message " << work->identification.maybe_param1 << ")";
×
213
                    }
214
                }
215
                _system_impl.unregister_timeout_handler(work->timeout_cookie);
×
216
                temp_result = {Result::Denied, NAN};
×
217
                _work_queue.erase(it);
×
218
                break;
×
219

220
            case MAV_RESULT_UNSUPPORTED:
×
221
                if (_command_debugging) {
×
222
                    LogDebug() << "command unsupported (" << work->identification.command << ").";
×
223
                }
224
                _system_impl.unregister_timeout_handler(work->timeout_cookie);
×
225
                temp_result = {Result::Unsupported, NAN};
×
226
                _work_queue.erase(it);
×
227
                break;
×
228

229
            case MAV_RESULT_TEMPORARILY_REJECTED:
2✔
230
                if (_command_debugging) {
2✔
231
                    LogDebug() << "command temporarily rejected (" << work->identification.command
×
232
                               << ").";
×
233
                }
234
                _system_impl.unregister_timeout_handler(work->timeout_cookie);
2✔
235
                temp_result = {Result::TemporarilyRejected, NAN};
2✔
236
                _work_queue.erase(it);
2✔
237
                break;
2✔
238

239
            case MAV_RESULT_FAILED:
×
240
                if (_command_debugging) {
×
241
                    LogDebug() << "command failed (" << work->identification.command << ").";
×
242
                }
243
                _system_impl.unregister_timeout_handler(work->timeout_cookie);
×
244
                temp_result = {Result::Failed, NAN};
×
245
                _work_queue.erase(it);
×
246
                break;
×
247

248
            case MAV_RESULT_IN_PROGRESS:
×
249
                if (_command_debugging) {
×
250
                    if (static_cast<int>(command_ack.progress) != 255) {
×
251
                        LogDebug() << "progress: " << static_cast<int>(command_ack.progress)
×
252
                                   << " % (" << work->identification.command << ").";
×
253
                    }
254
                }
255
                // If we get a progress update, we can raise the timeout
256
                // to something higher because we know the initial command
257
                // has arrived.
258
                _system_impl.unregister_timeout_handler(work->timeout_cookie);
×
259
                work->timeout_cookie = _system_impl.register_timeout_handler(
×
260
                    [this, identification = work->identification] {
×
261
                        receive_timeout(identification);
×
262
                    },
×
263
                    3.0);
264

265
                temp_result = {
×
266
                    Result::InProgress, static_cast<float>(command_ack.progress) / 100.0f};
×
267
                break;
×
268

269
            case MAV_RESULT_CANCELLED:
×
270
                if (_command_debugging) {
×
271
                    LogDebug() << "command cancelled (" << work->identification.command << ").";
×
272
                }
273
                _system_impl.unregister_timeout_handler(work->timeout_cookie);
×
274
                temp_result = {Result::Cancelled, NAN};
×
275
                _work_queue.erase(it);
×
276
                break;
×
277

278
            default:
×
279
                LogWarn() << "Received unknown ack.";
×
280
                break;
×
281
        }
282

283
        if (temp_callback != nullptr) {
40✔
284
            call_callback(temp_callback, temp_result.first, temp_result.second);
40✔
285
        }
286

287
        return;
40✔
288
    }
40✔
289

290
    if (_command_debugging) {
×
291
        LogDebug() << "Received ack from " << static_cast<int>(message.sysid) << '/'
×
292
                   << static_cast<int>(message.compid)
×
293
                   << " for not-existing command: " << static_cast<int>(command_ack.command)
×
294
                   << "! Ignoring...";
×
295
    } else {
296
        LogWarn() << "Received ack for not-existing command: "
×
297
                  << static_cast<int>(command_ack.command) << "! Ignoring...";
×
298
    }
299
}
40✔
300

301
void MavlinkCommandSender::receive_timeout(const CommandIdentification& identification)
1✔
302
{
303
    if (_command_debugging) {
1✔
304
        LogDebug() << "Got timeout!";
×
305
    }
306
    bool found_command = false;
1✔
307
    CommandResultCallback temp_callback = nullptr;
1✔
308
    std::pair<Result, float> temp_result{Result::UnknownError, NAN};
1✔
309

310
    LockedQueue<Work>::Guard work_queue_guard(_work_queue);
1✔
311

312
    for (auto it = _work_queue.begin(); it != _work_queue.end(); ++it) {
2✔
313
        const auto& work = *it;
1✔
314

315
        if (!work) {
1✔
316
            LogErr() << "No work available! (should not happen #2)";
×
317
            return;
×
318
        }
319

320
        if (work->identification != identification) {
1✔
321
            continue;
×
322
        }
323

324
        found_command = true;
1✔
325

326
        if (work->retries_to_do > 0) {
1✔
327
            // We're not sure the command arrived, let's retransmit.
328
            LogWarn() << "sending again after "
2✔
329
                      << _system_impl.get_time().elapsed_since_s(work->time_started)
2✔
330
                      << " s, retries to do: " << work->retries_to_do << "  ("
1✔
331
                      << work->identification.command << ").";
3✔
332

333
            if (work->identification.command == MAV_CMD_REQUEST_MESSAGE) {
1✔
334
                LogWarn() << "Request was for msg ID: " << work->identification.maybe_param1;
1✔
335
            }
336

337
            if (!send_mavlink_message(work->command)) {
1✔
338
                LogErr() << "connection send error in retransmit (" << work->identification.command
×
339
                         << ").";
×
340
                temp_callback = work->callback;
×
341
                temp_result = {Result::ConnectionError, NAN};
×
342
                _work_queue.erase(it);
×
343
                break;
×
344
            } else {
345
                --work->retries_to_do;
1✔
346
                work->timeout_cookie = _system_impl.register_timeout_handler(
2✔
347
                    [this, identification = work->identification] {
1✔
348
                        receive_timeout(identification);
×
349
                    },
×
350
                    work->timeout_s);
1✔
351
            }
352
        } else {
353
            // We have tried retransmitting, giving up now.
354
            if (work->identification.command == 512) {
×
355
                uint8_t target_sysid;
356
                uint8_t target_compid;
357
                if (auto command_int = std::get_if<CommandInt>(&work->command)) {
×
358
                    target_sysid = command_int->target_system_id;
×
359
                    target_compid = command_int->target_component_id;
×
360
                } else if (auto command_long = std::get_if<CommandLong>(&work->command)) {
×
361
                    target_sysid = command_long->target_system_id;
×
362
                    target_compid = command_long->target_component_id;
×
363
                } else {
364
                    LogErr() << "No command, that's awkward";
×
365
                    continue;
×
366
                }
367
                LogErr() << "Retrying failed for REQUEST_MESSAGE command for message: "
×
368
                         << work->identification.maybe_param1 << ", to ("
×
369
                         << std::to_string(target_sysid) << "/" << std::to_string(target_compid)
×
370
                         << ")";
×
371
            } else {
372
                LogErr() << "Retrying failed for command: " << work->identification.command << ")";
×
373
            }
374

375
            temp_callback = work->callback;
×
376
            temp_result = {Result::Timeout, NAN};
×
377
            _work_queue.erase(it);
×
378
            break;
×
379
        }
380
    }
381

382
    if (temp_callback != nullptr) {
1✔
383
        call_callback(temp_callback, temp_result.first, temp_result.second);
×
384
    }
385

386
    if (!found_command) {
1✔
387
        LogWarn() << "Timeout for not-existing command: "
×
388
                  << static_cast<int>(identification.command) << "! Ignoring...";
×
389
    }
390
}
1✔
391

392
void MavlinkCommandSender::do_work()
3,054✔
393
{
394
    LockedQueue<Work>::Guard work_queue_guard(_work_queue);
3,054✔
395

396
    for (const auto& work : _work_queue) {
3,118✔
397
        if (work->already_sent) {
70✔
398
            continue;
27✔
399
        }
400

401
        bool already_being_sent = false;
43✔
402
        for (const auto& other_work : _work_queue) {
95✔
403
            // Ignore itself:
404
            if (other_work == work) {
53✔
405
                continue;
42✔
406
            }
407

408
            // Check if command with same command ID is already being sent.
409
            if (other_work->already_sent &&
18✔
410
                other_work->identification.command == work->identification.command) {
7✔
411
                if (_command_debugging) {
1✔
412
                    LogDebug() << "Command " << static_cast<int>(work->identification.command)
×
413
                               << " is already being sent, waiting...";
×
414
                }
415
                already_being_sent = true;
1✔
416
                break;
1✔
417
            }
418
        }
419

420
        if (already_being_sent) {
43✔
421
            continue;
1✔
422
        }
423

424
        // LogDebug() << "sending it the first time (" << work->mavlink_command << ")";
425
        work->time_started = _system_impl.get_time().steady_time();
42✔
426

427
        {
428
            if (!send_mavlink_message(work->command)) {
42✔
429
                LogErr() << "connection send error (" << work->identification.command << ")";
×
430
                // In this case we try again after the timeout. Chances are slim it will work next
431
                // time though.
432
            } else {
433
                if (_command_debugging) {
42✔
434
                    LogDebug() << "Sent command " << static_cast<int>(work->identification.command);
×
435
                }
436
            }
437
        }
438

439
        work->already_sent = true;
42✔
440

441
        work->timeout_cookie = _system_impl.register_timeout_handler(
84✔
442
            [this, identification = work->identification] { receive_timeout(identification); },
43✔
443
            work->timeout_s);
42✔
444
    }
445
}
3,069✔
446

447
void MavlinkCommandSender::call_callback(
40✔
448
    const CommandResultCallback& callback, Result result, float progress) const
449
{
450
    if (!callback) {
40✔
451
        return;
×
452
    }
453

454
    // It seems that we need to queue the callback on the thread pool otherwise
455
    // we lock ourselves out when we send a command in the callback receiving a command result.
456
    auto temp_callback = callback;
40✔
457
    _system_impl.call_user_callback(
80✔
458
        [temp_callback, result, progress]() { temp_callback(result, progress); });
459
}
40✔
460

461
bool MavlinkCommandSender::send_mavlink_message(const Command& command) const
43✔
462
{
463
    if (auto command_int = std::get_if<CommandInt>(&command)) {
43✔
464
        return _system_impl.queue_message([&](MavlinkAddress mavlink_address, uint8_t channel) {
×
465
            mavlink_message_t message;
466
            mavlink_msg_command_int_pack_chan(
×
467
                mavlink_address.system_id,
×
468
                mavlink_address.component_id,
×
469
                channel,
470
                &message,
471
                command_int->target_system_id,
×
472
                command_int->target_component_id,
×
473
                command_int->frame,
×
474
                command_int->command,
×
475
                command_int->current,
×
476
                command_int->autocontinue,
×
477
                maybe_reserved(command_int->params.maybe_param1),
×
478
                maybe_reserved(command_int->params.maybe_param2),
×
479
                maybe_reserved(command_int->params.maybe_param3),
×
480
                maybe_reserved(command_int->params.maybe_param4),
×
481
                command_int->params.x,
×
482
                command_int->params.y,
×
483
                maybe_reserved(command_int->params.maybe_z));
×
484
            return message;
×
485
        });
×
486

487
    } else if (auto command_long = std::get_if<CommandLong>(&command)) {
43✔
488
        return _system_impl.queue_message([&](MavlinkAddress mavlink_address, uint8_t channel) {
86✔
489
            mavlink_message_t message;
490
            mavlink_msg_command_long_pack_chan(
344✔
491
                mavlink_address.system_id,
43✔
492
                mavlink_address.component_id,
43✔
493
                channel,
494
                &message,
495
                command_long->target_system_id,
344✔
496
                command_long->target_component_id,
43✔
497
                command_long->command,
43✔
498
                command_long->confirmation,
43✔
499
                maybe_reserved(command_long->params.maybe_param1),
43✔
500
                maybe_reserved(command_long->params.maybe_param2),
43✔
501
                maybe_reserved(command_long->params.maybe_param3),
43✔
502
                maybe_reserved(command_long->params.maybe_param4),
43✔
503
                maybe_reserved(command_long->params.maybe_param5),
43✔
504
                maybe_reserved(command_long->params.maybe_param6),
43✔
505
                maybe_reserved(command_long->params.maybe_param7));
43✔
506
            return message;
43✔
507
        });
43✔
508
    } else {
509
        return false;
×
510
    }
511
}
512

513
float MavlinkCommandSender::maybe_reserved(const std::optional<float>& maybe_param) const
301✔
514
{
515
    if (maybe_param) {
301✔
516
        return maybe_param.value();
44✔
517

518
    } else {
519
        if (_system_impl.autopilot() == Autopilot::ArduPilot) {
257✔
520
            return 0.0f;
×
521
        } else {
522
            return NAN;
257✔
523
        }
524
    }
525
}
526

527
} // namespace mavsdk
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc