jbaldwin / libcoro, build 16697765559

02 Aug 2025 09:09PM UTC coverage: 87.996%. First build.

Pull Request #365: coro::shared_mutex use coro::mutex instead of std::mutex
Merge 761522df6 into 10ca7de0a

25 of 30 new or added lines in 2 files covered. (83.33%)
1642 of 1866 relevant lines covered (88.0%)
13453934.86 hits per line

Source file: /include/coro/shared_mutex.hpp (84.95% covered)
#pragma once

#include "coro/concepts/executor.hpp"
#include "coro/mutex.hpp"
#include "coro/sync_wait.hpp"
#include "coro/task.hpp"

#include <atomic>
#include <coroutine>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>

namespace coro
{
template<concepts::executor executor_type>
class shared_mutex;

/**
 * A scoped RAII lock holder for a coro::shared_mutex.  It will call the appropriate unlock() or
 * unlock_shared() based on how the coro::shared_mutex was originally acquired, either in shared or
 * exclusive mode.
 */
template<concepts::executor executor_type>
class shared_scoped_lock
{
public:
    shared_scoped_lock(shared_mutex<executor_type>& sm, bool exclusive) : m_shared_mutex(&sm), m_exclusive(exclusive) {}

    /**
     * Unlocks the mutex upon this shared scoped lock destructing.
     */
    ~shared_scoped_lock() { unlock(); }

    shared_scoped_lock(const shared_scoped_lock&) = delete;
    shared_scoped_lock(shared_scoped_lock&& other) noexcept
        : m_shared_mutex(std::exchange(other.m_shared_mutex, nullptr)),
          m_exclusive(other.m_exclusive)
    {
    }

    auto operator=(const shared_scoped_lock&) -> shared_scoped_lock& = delete;
    auto operator=(shared_scoped_lock&& other) noexcept -> shared_scoped_lock&
    {
        if (std::addressof(other) != this)
        {
            m_shared_mutex = std::exchange(other.m_shared_mutex, nullptr);
            m_exclusive    = other.m_exclusive;
        }
        return *this;
    }

    /**
     * Unlocks the shared mutex prior to this lock going out of scope.
     */
    auto unlock() -> void
    {
        if (m_shared_mutex != nullptr)
        {
            if (!m_shared_mutex->m_executor->spawn(make_unlock_task(m_shared_mutex, m_exclusive)))
            {
                // If for some reason the spawn fails, block here so this mutex doesn't become unusable.
                coro::sync_wait(make_unlock_task(m_shared_mutex, m_exclusive));
            }
        }
    }

private:
    shared_mutex<executor_type>* m_shared_mutex{nullptr};
    bool                         m_exclusive{false};

    static auto make_unlock_task(shared_mutex<executor_type>* shared_mutex, bool exclusive) -> coro::task<void>
    {
        // This task is spawned and detached from the lifetime of the unlocking scoped lock.
        if (shared_mutex != nullptr)
        {
            if (exclusive)
            {
                co_await shared_mutex->unlock();
            }
            else
            {
                co_await shared_mutex->unlock_shared();
            }
        }
    }
};
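
// Usage sketch, assuming the scoped_lock_shared()/scoped_lock() members further below are
// re-enabled; `sm` is a hypothetical coro::shared_mutex<executor_type> instance.  The scoped
// lock's destructor spawns the matching unlock task onto the mutex's executor:
//
//     {
//         auto lk = co_await sm.scoped_lock_shared();
//         // ... read the protected state concurrently with other shared holders ...
//     } // ~shared_scoped_lock() schedules unlock_shared().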

namespace detail
{
template<concepts::executor executor_type>
struct shared_lock_operation_base
{
    explicit shared_lock_operation_base(coro::shared_mutex<executor_type>& shared_mutex, bool exclusive)
        : m_shared_mutex(shared_mutex),
          m_exclusive(exclusive)
    {}
    virtual ~shared_lock_operation_base() = default;

    shared_lock_operation_base(const shared_lock_operation_base&)                    = delete;
    shared_lock_operation_base(shared_lock_operation_base&&)                         = delete;
    auto operator=(const shared_lock_operation_base&) -> shared_lock_operation_base& = delete;
    auto operator=(shared_lock_operation_base&&) -> shared_lock_operation_base&      = delete;

    auto await_ready() const noexcept -> bool
    {
        // If the requested mode can be acquired immediately, unlock the internal mutex and resume.
        if (m_exclusive)
        {
            if (m_shared_mutex.try_lock_locked())
            {
                m_shared_mutex.m_mutex.unlock();
                return true;
            }
        }
        else if (m_shared_mutex.try_lock_shared_locked())
        {
            m_shared_mutex.m_mutex.unlock();
            return true;
        }

        return false;
    }

    auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
    {
        // The lock is currently held in a manner that it cannot be acquired, so suspend ourselves
        // at the end of the waiter list.
        auto* tail_waiter = m_shared_mutex.m_tail_waiter.load(std::memory_order::acquire);

        if (tail_waiter == nullptr)
        {
            m_shared_mutex.m_head_waiter = this;
            m_shared_mutex.m_tail_waiter = this;
        }
        else
        {
            tail_waiter->m_next          = this;
            m_shared_mutex.m_tail_waiter = this;
        }

        // If this is an exclusive lock acquire then mark it as such so that shared lock attempts
        // arriving after this exclusive one also suspend and this exclusive lock isn't starved.
        if (m_exclusive)
        {
            ++m_shared_mutex.m_exclusive_waiters;
        }

        m_awaiting_coroutine = awaiting_coroutine;
        m_shared_mutex.m_mutex.unlock();
        return true;
    }

protected:
    friend class coro::shared_mutex<executor_type>;

    std::coroutine_handle<>            m_awaiting_coroutine;
    shared_lock_operation_base*        m_next{nullptr};
    coro::shared_mutex<executor_type>& m_shared_mutex;
    bool                               m_exclusive{false};
};

template<typename return_type, concepts::executor executor_type>
struct shared_lock_operation : public shared_lock_operation_base<executor_type>
{
    explicit shared_lock_operation(coro::shared_mutex<executor_type>& shared_mutex, bool exclusive)
        : shared_lock_operation_base<executor_type>(shared_mutex, exclusive)
    {}
    ~shared_lock_operation() override = default;

    shared_lock_operation(const shared_lock_operation&)                    = delete;
    shared_lock_operation(shared_lock_operation&&)                         = delete;
    auto operator=(const shared_lock_operation&) -> shared_lock_operation& = delete;
    auto operator=(shared_lock_operation&&) -> shared_lock_operation&      = delete;

    auto await_resume() noexcept -> return_type
    {
        if constexpr (std::is_same_v<shared_scoped_lock<executor_type>, return_type>)
        {
            return shared_scoped_lock<executor_type>{this->m_shared_mutex, this->m_exclusive};
        }
        else
        {
            return;
        }
    }
};
} // namespace detail

template<concepts::executor executor_type>
class shared_mutex
{
public:
    /**
     * @param e The executor used when multiple shared waiters can be woken up at the same time;
     *          each shared waiter will be scheduled to run immediately on this executor in
     *          parallel.
     */
    explicit shared_mutex(std::shared_ptr<executor_type> e) : m_executor(std::move(e))
    {
        if (m_executor == nullptr)
        {
            throw std::runtime_error{"coro::shared_mutex cannot have a nullptr executor"};
        }
    }
    ~shared_mutex() = default;

    shared_mutex(const shared_mutex&)                    = delete;
    shared_mutex(shared_mutex&&)                         = delete;
    auto operator=(const shared_mutex&) -> shared_mutex& = delete;
    auto operator=(shared_mutex&&) -> shared_mutex&      = delete;

    /**
     * Locks the mutex in a shared state.  If there are any exclusive waiters then the shared waiters
     * will also wait so the exclusive waiters are not starved.
     */
    // [[nodiscard]] auto scoped_lock_shared() -> coro::task<detail::shared_lock_operation<shared_scoped_lock<executor_type>, executor_type>>
    // {
    //     co_await m_mutex.lock();
    //     co_return detail::shared_lock_operation<shared_scoped_lock<executor_type>, executor_type>{*this, false};
    // }
    //
    // /**
    //  * Locks the mutex in an exclusive state.
    //  */
    // [[nodiscard]] auto scoped_lock() -> coro::task<detail::shared_lock_operation<shared_scoped_lock<executor_type>, executor_type>>
    // {
    //     co_await m_mutex.lock();
    //     co_return detail::shared_lock_operation<shared_scoped_lock<executor_type>, executor_type>{*this, true};
    // }

    [[nodiscard]] auto lock_shared() -> coro::task<void>
    {
        co_await m_mutex.lock();
        co_await detail::shared_lock_operation<void, executor_type>{*this, false};
        co_return;
    }

    [[nodiscard]] auto lock() -> coro::task<void>
    {
        co_await m_mutex.lock();
        co_await detail::shared_lock_operation<void, executor_type>{*this, true};
        co_return;
    }
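
    // Usage sketch: unlike std::shared_mutex, acquiring and releasing are both awaitable here;
    // `sm` is a hypothetical coro::shared_mutex<executor_type>, and unlock()/unlock_shared()
    // below return tasks that must themselves be co_awaited:
    //
    //     co_await sm.lock_shared();
    //     // ... concurrent reads ...
    //     co_await sm.unlock_shared();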

    /**
     * @return True if the lock could immediately be acquired in a shared state.
     */
    [[nodiscard]] auto try_lock_shared() -> bool
    {
        // To acquire the shared lock the mutex must be in one of two states:
        //   1) unlocked
        //   2) shared locked with zero exclusive waiters
        //          Zero exclusive waiters prevents exclusive starvation when shared locks are
        //          continuously being acquired.
        if (m_mutex.try_lock())
        {
            coro::scoped_lock lk{m_mutex};
            return try_lock_shared_locked();
        }
        return false;
    }

    /**
     * @return True if the lock could immediately be acquired in an exclusive state.
     */
    [[nodiscard]] auto try_lock() -> bool
    {
        // To acquire the exclusive lock the state must be unlocked.
        if (m_mutex.try_lock())
        {
            coro::scoped_lock lk{m_mutex};
            return try_lock_locked();
        }
        return false;
    }
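
    // Usage sketch: the try_* variants never suspend, so a caller can attempt an opportunistic
    // fast path and fall back to awaiting the lock; `sm` is a hypothetical
    // coro::shared_mutex<executor_type>:
    //
    //     if (!sm.try_lock())
    //     {
    //         co_await sm.lock(); // Suspends until exclusive access is granted.
    //     }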

    /**
     * Unlocks a single shared state user.  *REQUIRES* that the lock was first acquired exactly once
     * via `lock_shared()` or a successful `try_lock_shared()` before being called; otherwise the
     * behavior is undefined.
     *
     * If the shared user count drops to zero and this lock has an exclusive waiter then the exclusive
     * waiter acquires the lock.
     */
    [[nodiscard]] auto unlock_shared() -> coro::task<void>
    {
        auto lk = co_await m_mutex.scoped_lock();
        --m_shared_users;

        // Only wake waiters from the shared state if all shared users have completed.
        if (m_shared_users == 0)
        {
            if (m_head_waiter != nullptr)
            {
                wake_waiters(lk);
            }
            else
            {
                m_state = state::unlocked;
            }
        }

        co_return;
    }

    /**
     * Unlocks the mutex from its exclusive state.  If there is a following exclusive waiter then
     * that exclusive waiter acquires the lock.  If there are one or more shared waiters then all the
     * shared waiters acquire the lock in a shared state in parallel and are resumed on the original
     * executor this shared mutex was created with.
     */
    [[nodiscard]] auto unlock() -> coro::task<void>
    {
        auto lk = co_await m_mutex.scoped_lock();
        if (m_head_waiter != nullptr)
        {
            wake_waiters(lk);
        }
        else
        {
            m_state = state::unlocked;
        }

        co_return;
    }

    /**
     * @brief Gets the executor that drives the shared mutex.
     *
     * @return std::shared_ptr<executor_type>
     */
    [[nodiscard]] auto executor() -> std::shared_ptr<executor_type>
    {
        return m_executor;
    }

private:
    friend struct detail::shared_lock_operation_base<executor_type>;
    friend class shared_scoped_lock<executor_type>;

    enum class state
    {
        /// @brief The shared mutex is unlocked.
        unlocked,
        /// @brief The shared mutex is locked in shared mode.
        locked_shared,
        /// @brief The shared mutex is locked in exclusive mode.
        locked_exclusive
    };

    /// @brief This executor is for resuming multiple shared waiters.
    std::shared_ptr<executor_type> m_executor{nullptr};
    /// @brief Exclusive access for mutating the shared mutex's state.
    coro::mutex m_mutex;
    /// @brief The current state of the shared mutex.
    std::atomic<state> m_state{state::unlocked};

    /// @brief The current number of shared users that have acquired the lock.
    std::atomic<uint64_t> m_shared_users{0};
    /// @brief The current number of exclusive waiters waiting to acquire the lock.  This is used to block
    ///        new incoming shared lock attempts so the exclusive waiter is not starved.
    std::atomic<uint64_t> m_exclusive_waiters{0};

    std::atomic<detail::shared_lock_operation_base<executor_type>*> m_head_waiter{nullptr};
    std::atomic<detail::shared_lock_operation_base<executor_type>*> m_tail_waiter{nullptr};

    auto try_lock_shared_locked() -> bool
    {
        if (m_state == state::unlocked)
        {
            // If the shared mutex is unlocked, put it into shared mode and add ourselves as a user of the lock.
            m_state = state::locked_shared;
            ++m_shared_users;
            return true;
        }
        else if (m_state == state::locked_shared && m_exclusive_waiters == 0)
        {
            // If the shared mutex is in a shared locked state and there are no exclusive waiters
            // then add ourselves as a user of the lock.
            ++m_shared_users;
            return true;
        }

        // If the lock is in shared mode but there are exclusive waiters then we will also wait so
        // the writers are not starved.

        // If the lock is in exclusive mode already then we need to wait.

        return false;
    }

    auto try_lock_locked() -> bool
    {
        if (m_state == state::unlocked)
        {
            m_state = state::locked_exclusive;
            return true;
        }
        return false;
    }

    auto wake_waiters(coro::scoped_lock& lk) -> void
    {
        // First determine what the next lock state will be based on the first waiter.
        if (m_head_waiter.load()->m_exclusive)
        {
            // If it's exclusive then only this waiter can be woken up.
            m_state = state::locked_exclusive;
            detail::shared_lock_operation_base<executor_type>* to_resume = m_head_waiter.load();
            m_head_waiter = m_head_waiter.load()->m_next;
            --m_exclusive_waiters;
            if (m_head_waiter == nullptr)
            {
                m_tail_waiter = nullptr;
            }

            // Since this is a single exclusive waiter we can resume it directly.
            lk.unlock();
            to_resume->m_awaiting_coroutine.resume();
        }
        else
        {
            // If it's shared then scan forward and wake all contiguous shared waiters onto the
            // given executor so they can run in parallel.
            m_state = state::locked_shared;
            do
            {
                detail::shared_lock_operation_base<executor_type>* to_resume = m_head_waiter.load();
                m_head_waiter = m_head_waiter.load()->m_next;
                if (m_head_waiter == nullptr)
                {
                    m_tail_waiter = nullptr;
                }
                ++m_shared_users;

                m_executor->resume(to_resume->m_awaiting_coroutine);
            } while (m_head_waiter != nullptr && !m_head_waiter.load()->m_exclusive);

            // Cannot unlock until the entire set of shared waiters has been traversed.  This makes
            // more sense than allocating space for all the shared waiters, unlocking, and then
            // resuming them in a batch.
            lk.unlock();
        }
    }
};

} // namespace coro
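
A minimal end-to-end sketch of how this header is used, based on the signatures above. It assumes
libcoro's umbrella header coro/coro.hpp and its coro::thread_pool, coro::when_all, and
coro::sync_wait utilities; the `reader`/`writer` task names are illustrative only.

#include <coro/coro.hpp>

#include <cstdint>
#include <iostream>

int main()
{
    // The shared mutex requires a non-null shared_ptr executor so that multiple shared
    // waiters can be resumed in parallel.
    auto tp = std::make_shared<coro::thread_pool>(coro::thread_pool::options{.thread_count = 4});
    coro::shared_mutex<coro::thread_pool> sm{tp};
    uint64_t value{0};

    auto reader = [&]() -> coro::task<void>
    {
        co_await tp->schedule();
        co_await sm.lock_shared(); // Many readers may hold the lock simultaneously.
        std::cout << "read " << value << "\n";
        co_await sm.unlock_shared(); // unlock_shared() is a task and must be co_awaited.
    };

    auto writer = [&]() -> coro::task<void>
    {
        co_await tp->schedule();
        co_await sm.lock(); // Exclusive; later shared lockers queue behind this waiter.
        ++value;
        co_await sm.unlock();
    };

    coro::sync_wait(coro::when_all(reader(), writer(), reader()));
    return 0;
}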