• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 16697817643

02 Aug 2025 09:15PM UTC coverage: 88.156%. First build
16697817643

Pull #365

github

web-flow
Merge 8435b6aed into 10ca7de0a
Pull Request #365: coro::shared_mutex use coro::mutex instead of std::mutex

33 of 37 new or added lines in 2 files covered. (89.19%)

1645 of 1866 relevant lines covered (88.16%)

13666902.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.95
/include/coro/shared_mutex.hpp
1
#pragma once

#include "coro/concepts/executor.hpp"
#include "coro/mutex.hpp"
#include "coro/task.hpp"

#include <atomic>
#include <coroutine>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <type_traits>
#include <utility>
9

10
namespace coro
11
{
12
template<concepts::executor executor_type>
13
class shared_mutex;
14

15
/**
16
 * A scoped RAII lock holder for a coro::shared_mutex.  It will call the appropriate unlock() or
17
 * unlock_shared() based on how the coro::shared_mutex was originally acquired, either shared or
18
 * exclusive modes.
19
 */
20
template<concepts::executor executor_type>
21
class shared_scoped_lock
22
{
23
public:
24
    shared_scoped_lock(shared_mutex<executor_type>& sm, bool exclusive) : m_shared_mutex(&sm), m_exclusive(exclusive) {}
25

26
    /**
27
     * Unlocks the mutex upon this shared scoped lock destructing.
28
     */
29
    ~shared_scoped_lock() { unlock(); }
30

31
    shared_scoped_lock(const shared_scoped_lock&) = delete;
32
    shared_scoped_lock(shared_scoped_lock&& other)
33
        : m_shared_mutex(std::exchange(other.m_shared_mutex, nullptr)),
34
          m_exclusive(other.m_exclusive)
35
    {
36
    }
37

38
    auto operator=(const shared_scoped_lock&) -> shared_scoped_lock& = delete;
39
    auto operator=(shared_scoped_lock&& other) noexcept -> shared_scoped_lock&
40
    {
41
        if (std::addressof(other) != this)
42
        {
43
            m_shared_mutex = std::exchange(other.m_shared_mutex, nullptr);
44
            m_exclusive    = other.m_exclusive;
45
        }
46
        return *this;
47
    }
48

49
    /**
50
     * Unlocks the shared mutex prior to this lock going out of scope.
51
     */
52
    auto unlock() -> void
53
    {
54
        if (m_shared_mutex != nullptr)
55
        {
56
            if (!m_shared_mutex->m_executor->spawn(make_unlock_task(m_shared_mutex, m_exclusive)))
57
            {
58
                // If for some reason it fails to spawn block so this mutex isn't unusable.
59
                coro::sync_wait(make_unlock_task(m_shared_mutex, m_exclusive));
60
            }
61
        }
62
    }
63

64
private:
65
    shared_mutex<executor_type>* m_shared_mutex{nullptr};
66
    bool                         m_exclusive{false};
67

68
    static auto make_unlock_task(shared_mutex<executor_type>* shared_mutex, bool exclusive) -> coro::task<void>
69
    {
70
        // This function is spawned and detached from the lifetime of the unlocking scoped lock.
71

72
        if (shared_mutex != nullptr)
73
        {
74
            if (exclusive)
75
            {
76
                co_await shared_mutex->unlock();
77
            }
78
            else
79
            {
80
                co_await shared_mutex->unlock_shared();
81
            }
82

83
            shared_mutex = nullptr;
84
        }
85
    }
86
};
87

88
namespace detail
89
{
90
/**
 * Awaiter base shared by the lock() / lock_shared() operations of coro::shared_mutex.
 *
 * Precondition (established by shared_mutex::lock()/lock_shared(), which co_await
 * m_mutex.lock() first): the shared mutex's internal coro::mutex is HELD when
 * await_ready()/await_suspend() run; both paths are responsible for releasing it.
 */
template<concepts::executor executor_type>
struct shared_lock_operation_base
{
    /**
     * @param shared_mutex The shared mutex being acquired.
     * @param exclusive True to acquire in exclusive mode, false for shared mode.
     */
    explicit shared_lock_operation_base(coro::shared_mutex<executor_type>& shared_mutex, bool exclusive)
        : m_shared_mutex(shared_mutex),
          m_exclusive(exclusive)
    {}
    virtual ~shared_lock_operation_base() = default;

    // Awaiters are linked into an intrusive waiter list by address; they must not move or copy.
    shared_lock_operation_base(const shared_lock_operation_base&) = delete;
    shared_lock_operation_base(shared_lock_operation_base&&) = delete;
    auto operator=(const shared_lock_operation_base&) -> shared_lock_operation_base& = delete;
    auto operator=(shared_lock_operation_base&&) -> shared_lock_operation_base& = delete;

    /**
     * @return True (no suspension) if the lock was acquired immediately in the requested mode.
     */
    auto await_ready() const noexcept -> bool
    {
        // If either mode can be acquired, unlock the internal mutex and resume.

        if (m_exclusive)
        {
            if (m_shared_mutex.try_lock_locked())
            {
                m_shared_mutex.m_mutex.unlock();
                return true;
            }
        }
        else if (m_shared_mutex.try_lock_shared_locked())
        {
            m_shared_mutex.m_mutex.unlock();
            return true;
        }

        // Could not acquire; keep m_mutex held so await_suspend() can enqueue safely.
        return false;
    }

    /**
     * Appends this awaiter to the tail of the waiter list, then releases the internal mutex.
     * @return Always true — the awaiting coroutine stays suspended until wake_waiters() resumes it.
     */
    auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
    {
        // For sure the lock is currently held in a manner that it cannot be acquired, suspend ourself
        // at the end of the waiter list.

        auto* tail_waiter = m_shared_mutex.m_tail_waiter.load(std::memory_order::acquire);

        if (tail_waiter == nullptr)
        {
            // Empty list: this awaiter becomes both head and tail.
            m_shared_mutex.m_head_waiter = this;
            m_shared_mutex.m_tail_waiter = this;
        }
        else
        {
            tail_waiter->m_next = this;
            m_shared_mutex.m_tail_waiter         = this;
        }

        // If this is an exclusive lock acquire then mark it as so so that shared locks after this
        // exclusive one will also suspend so this exclusive lock doesn't get starved.
        if (m_exclusive)
        {
            ++m_shared_mutex.m_exclusive_waiters;
        }

        // Store the handle before unlocking: once m_mutex is released another coroutine may
        // call wake_waiters() and resume this entry.
        m_awaiting_coroutine = awaiting_coroutine;
        m_shared_mutex.m_mutex.unlock();
        return true;
    }

protected:
    friend class coro::shared_mutex<executor_type>;

    /// Handle resumed by shared_mutex::wake_waiters() when the lock is granted.
    std::coroutine_handle<> m_awaiting_coroutine;
    /// Next awaiter in the intrusive FIFO waiter list (nullptr if this is the tail).
    shared_lock_operation_base* m_next{nullptr};
    /// The shared mutex being acquired.
    coro::shared_mutex<executor_type>& m_shared_mutex;
    /// True when acquiring in exclusive mode.
    bool m_exclusive{false};
};
163

164
template<typename return_type, concepts::executor executor_type>
165
struct shared_lock_operation : public shared_lock_operation_base<executor_type>
166
{
167
    explicit shared_lock_operation(coro::shared_mutex<executor_type>& shared_mutex, bool exclusive)
83✔
168
        : shared_lock_operation_base<executor_type>(shared_mutex, exclusive)
83✔
169
    {}
83✔
170
    ~shared_lock_operation() override = default;
83✔
171

172
    shared_lock_operation(const shared_lock_operation&) = delete;
173
    shared_lock_operation(shared_lock_operation&&) = delete;
174
    auto operator=(const shared_lock_operation&) -> shared_lock_operation& = delete;
175
    auto operator=(shared_lock_operation&&) -> shared_lock_operation& = delete;
176

177
    auto await_resume() noexcept -> return_type
83✔
178
    {
179
        if constexpr (std::is_same_v<shared_scoped_lock<executor_type>, return_type>)
180
        {
181
            return shared_scoped_lock<executor_type>{this->m_shared_mutex, this->m_exclusive};
182
        }
183
        else
184
        {
185
            return;
83✔
186
        }
187
    }
188
};
189
} // namespace detail
190

191
/**
 * An async reader/writer mutex.  Acquisition in either mode suspends the awaiting coroutine
 * instead of blocking a thread; state mutations are serialized through an internal coro::mutex.
 * Exclusive waiters block later shared acquisitions so writers are not starved.
 */
template<concepts::executor executor_type>
class shared_mutex
{
public:
    /**
     * @param e The executor for when multiple shared waiters can be woken up at the same time,
     *          each shared waiter will be scheduled to immediately run on this executor in
     *          parallel.
     * @throws std::runtime_error If `e` is nullptr.
     */
    explicit shared_mutex(std::shared_ptr<executor_type> e) : m_executor(std::move(e))
    {
        if (m_executor == nullptr)
        {
            throw std::runtime_error{"coro::shared_mutex cannot have a nullptr executor"};
        }
    }
    ~shared_mutex() = default;

    shared_mutex(const shared_mutex&)                    = delete;
    shared_mutex(shared_mutex&&)                         = delete;
    auto operator=(const shared_mutex&) -> shared_mutex& = delete;
    auto operator=(shared_mutex&&) -> shared_mutex&      = delete;

    /**
     * Locks the mutex in a shared state.  If there are any exclusive waiters then the shared waiters
     * will also wait so the exclusive waiters are not starved.
     */
    // [[nodiscard]] auto scoped_lock_shared() -> coro::task<detail::shared_lock_operation<shared_scoped_lock<executor_type>, executor_type>>
    // {
    //     co_await m_mutex.lock();
    //     co_return detail::shared_lock_operation<shared_scoped_lock<executor_type>, executor_type>{*this, false};
    // }
    //
    // /**
    //  * Locks the mutex in an exclusive state.
    //  */
    // [[nodiscard]] auto scoped_lock() -> coro::task<detail::shared_lock_operation<shared_scoped_lock<executor_type>, executor_type>>
    // {
    //     co_await m_mutex.lock();
    //     co_return detail::shared_lock_operation<shared_scoped_lock<executor_type>, executor_type>{*this, true};
    // }

    /**
     * Acquires the lock in shared mode, suspending if there are exclusive holders or waiters.
     * The internal mutex is acquired first; the awaiter releases it on both fast and slow paths.
     */
    [[nodiscard]] auto lock_shared() -> coro::task<void>
    {
        co_await m_mutex.lock();
        co_await detail::shared_lock_operation<void, executor_type>{*this, false};
        co_return;

    }

    /**
     * Acquires the lock in exclusive mode, suspending until all current holders release.
     * The internal mutex is acquired first; the awaiter releases it on both fast and slow paths.
     */
    [[nodiscard]] auto lock() -> coro::task<void>
    {
        co_await m_mutex.lock();
        co_await detail::shared_lock_operation<void, executor_type>{*this, true};
        co_return;
    }

    /**
     * @return True if the lock could immediately be acquired in a shared state.
     */
    [[nodiscard]] auto try_lock_shared() -> bool
    {
        // To acquire the shared lock the state must be one of two states:
        //   1) unlocked
        //   2) shared locked with zero exclusive waiters
        //          Zero exclusive waiters prevents exclusive starvation if shared locks are
        //          always continuously happening.

        if (m_mutex.try_lock())
        {
            // NOTE(review): assumes coro::scoped_lock adopts the already-held m_mutex and
            // releases it at scope exit — confirm against coro/mutex.hpp.
            coro::scoped_lock lk{m_mutex};
            return try_lock_shared_locked();
        }
        return false;
    }

    /**
     * @return True if the lock could immediately be acquired in an exclusive state.
     */
    [[nodiscard]] auto try_lock() -> bool
    {
        // To acquire the exclusive lock the state must be unlocked.
        if (m_mutex.try_lock())
        {
            coro::scoped_lock lk{m_mutex};
            return try_lock_locked();
        }
        // Internal mutex contended; report failure rather than waiting.
        return false;
    }

    /**
     * Unlocks a single shared state user.  *REQUIRES* that the lock was first acquired exactly once
     * via `lock_shared()` or `try_lock_shared() -> True` before being called, otherwise undefined
     * behavior.
     *
     * If the shared user count drops to zero and this lock has an exclusive waiter then the exclusive
     * waiter acquires the lock.
     */
    [[nodiscard]] auto unlock_shared() -> coro::task<void>
    {
        auto lk = co_await m_mutex.scoped_lock();
        --m_shared_users;

        // Only wake waiters from shared state if all shared users have completed.
        if (m_shared_users == 0)
        {
            if (m_head_waiter != nullptr)
            {
                wake_waiters(lk);
            }
            else
            {
                m_state = state::unlocked;
            }
        }

        co_return;
    }

    /**
     * Unlocks the mutex from its exclusive state.  If there is a following exclusive waiter then
     * that exclusive waiter acquires the lock.  If there are 1 or more shared waiters then all the
     * shared waiters acquire the lock in a shared state in parallel and are resumed on the original
     * executor this shared mutex was created with.
     */
    [[nodiscard]] auto unlock() -> coro::task<void>
    {
        auto lk = co_await m_mutex.scoped_lock();
        if (m_head_waiter != nullptr)
        {
            wake_waiters(lk);
        }
        else
        {
            m_state = state::unlocked;
        }

        co_return;
    }

    /**
     * @brief Gets the executor that drives the shared mutex.
     *
     * @return std::shared_ptr<executor_type>
     */
    [[nodiscard]] auto executor() -> std::shared_ptr<executor_type>
    {
        return m_executor;
    }

private:
    friend struct detail::shared_lock_operation_base<executor_type>;
    friend class shared_scoped_lock<executor_type>;

    enum class state
    {
        /// @brief The shared mutex is unlocked.
        unlocked,
        /// @brief The shared mutex is locked in shared mode.
        locked_shared,
        /// @brief The shared mutex is locked in exclusive mode.
        locked_exclusive
    };

    /// @brief This executor is for resuming multiple shared waiters.
    std::shared_ptr<executor_type> m_executor{nullptr};
    /// @brief Exclusive access for mutating the shared mutex's state.
    coro::mutex m_mutex;
    /// @brief The current state of the shared mutex.
    std::atomic<state> m_state{state::unlocked};

    /// @brief The current number of shared users that have acquired the lock.
    std::atomic<uint64_t> m_shared_users{0};
    /// @brief The current number of exclusive waiters waiting to acquire the lock.  This is used to block
    ///        new incoming shared lock attempts so the exclusive waiter is not starved.
    std::atomic<uint64_t> m_exclusive_waiters{0};

    /// @brief Intrusive FIFO list of suspended acquirers; mutated only while m_mutex is held.
    std::atomic<detail::shared_lock_operation_base<executor_type>*> m_head_waiter{nullptr};
    std::atomic<detail::shared_lock_operation_base<executor_type>*> m_tail_waiter{nullptr};

    /// @brief Attempts a shared acquire.  Pre-condition: m_mutex is held by the caller.
    auto try_lock_shared_locked() -> bool
    {
        if (m_state == state::unlocked)
        {
            // If the shared mutex is unlocked put it into shared mode and add ourself as using the lock.
            m_state = state::locked_shared;
            ++m_shared_users;
            return true;
        }
        else if (m_state == state::locked_shared && m_exclusive_waiters == 0)
        {
            // If the shared mutex is in a shared locked state and there are no exclusive waiters
            // then add ourself as using the lock.
            ++m_shared_users;
            return true;
        }

        // If the lock is in shared mode but there are exclusive waiters then we will also wait so
        // the writers are not starved.

        // If the lock is in exclusive mode already then we need to wait.

        return false;
    }

    /// @brief Attempts an exclusive acquire.  Pre-condition: m_mutex is held by the caller.
    auto try_lock_locked() -> bool
    {
        if (m_state == state::unlocked)
        {
            m_state = state::locked_exclusive;
            return true;
        }
        return false;
    }

    /// @brief Resumes the next waiter(s).  Pre-conditions: m_mutex is held via `lk` and
    ///        m_head_waiter is non-null.  Always releases `lk` before returning.
    auto wake_waiters(coro::scoped_lock& lk) -> void
    {
        // First determine what the next lock state will be based on the first waiter.
        if (m_head_waiter.load()->m_exclusive)
        {
            // If its exclusive then only this waiter can be woken up.
            m_state                   = state::locked_exclusive;
            detail::shared_lock_operation_base<executor_type>* to_resume = m_head_waiter.load();
            m_head_waiter             = m_head_waiter.load()->m_next;
            --m_exclusive_waiters;
            if (m_head_waiter == nullptr)
            {
                m_tail_waiter = nullptr;
            }

            // Since this is an exclusive lock waiting we can resume it directly.
            lk.unlock();
            to_resume->m_awaiting_coroutine.resume();
        }
        else
        {
            // If its shared then we will scan forward and awake all shared waiters onto the given
            // thread pool so they can run in parallel.
            m_state = state::locked_shared;
            do
            {
                // Pop the head before resuming it; its node memory lives in the waiter's
                // coroutine frame and becomes invalid once that coroutine runs.
                detail::shared_lock_operation_base<executor_type>* to_resume = m_head_waiter.load();
                m_head_waiter             = m_head_waiter.load()->m_next;
                if (m_head_waiter == nullptr)
                {
                    m_tail_waiter = nullptr;
                }
                ++m_shared_users;

                m_executor->resume(to_resume->m_awaiting_coroutine);
            } while (m_head_waiter != nullptr && !m_head_waiter.load()->m_exclusive);

            // Cannot unlock until the entire set of shared waiters has been traversed.  I think this
            // makes more sense than allocating space for all the shared waiters, unlocking, and then
            // resuming in a batch?
            lk.unlock();
        }
    }
};
450

451
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc