jbaldwin / libcoro / build 16354301719

17 Jul 2025 07:35PM UTC. Coverage: 88.169% (first build).

Pull Request #365: coro::shared_mutex use coro::mutex instead of std::mutex
Merge 10a21a572 into f45ca948b

40 of 46 new or added lines in 1 file covered (86.96%)
1647 of 1868 relevant lines covered (88.17%)
13239744.96 hits per line

Source File: /include/coro/shared_mutex.hpp (86.6% covered)
#pragma once

#include "coro/concepts/executor.hpp"
#include "coro/mutex.hpp"
#include "coro/sync_wait.hpp"
#include "coro/task.hpp"

#include <atomic>
#include <coroutine>
#include <cstdint>
#include <memory>
#include <stdexcept>
#include <utility>

namespace coro
{
template<concepts::executor executor_type>
class shared_mutex;

/**
 * A scoped RAII lock holder for a coro::shared_mutex.  It calls the appropriate unlock() or
 * unlock_shared() based on how the coro::shared_mutex was originally acquired, either in shared or
 * exclusive mode.
 */
template<concepts::executor executor_type>
class shared_scoped_lock
{
public:
    shared_scoped_lock(shared_mutex<executor_type>& sm, bool exclusive) : m_shared_mutex(&sm), m_exclusive(exclusive) {}

    /**
     * Unlocks the mutex upon this shared scoped lock destructing.
     */
    ~shared_scoped_lock() { unlock(); }

    shared_scoped_lock(const shared_scoped_lock&) = delete;
    shared_scoped_lock(shared_scoped_lock&& other)
        : m_shared_mutex(std::exchange(other.m_shared_mutex, nullptr)),
          m_exclusive(other.m_exclusive)
    {
    }

    auto operator=(const shared_scoped_lock&) -> shared_scoped_lock& = delete;
    auto operator=(shared_scoped_lock&& other) noexcept -> shared_scoped_lock&
    {
        if (std::addressof(other) != this)
        {
            m_shared_mutex = std::exchange(other.m_shared_mutex, nullptr);
            m_exclusive    = other.m_exclusive;
        }
        return *this;
    }

    /**
     * Unlocks the shared mutex prior to this lock going out of scope.
     */
    auto unlock() -> void
    {
        if (m_shared_mutex != nullptr)
        {
            if (!m_shared_mutex->m_executor->spawn(make_unlock_task(m_shared_mutex, m_exclusive)))
            {
                // If for some reason the unlock task fails to spawn, block on it so this
                // mutex isn't left unusable.
                coro::sync_wait(make_unlock_task(m_shared_mutex, m_exclusive));
            }
            // Null the pointer so the destructor does not unlock a second time.
            m_shared_mutex = nullptr;
        }
    }

private:
    shared_mutex<executor_type>* m_shared_mutex{nullptr};
    bool                         m_exclusive{false};

    // This task is spawned and detached from the lifetime of the unlocking scoped lock.
    static auto make_unlock_task(shared_mutex<executor_type>* shared_mutex, bool exclusive) -> coro::task<void>
    {
        if (shared_mutex != nullptr)
        {
            if (exclusive)
            {
                co_await shared_mutex->unlock();
            }
            else
            {
                co_await shared_mutex->unlock_shared();
            }
        }
        co_return;
    }
};

template<concepts::executor executor_type>
class shared_mutex
{
public:
    /**
     * @param e The executor used when multiple shared waiters can be woken up at the same time;
     *          each shared waiter will be scheduled to run immediately on this executor in
     *          parallel.
     */
    explicit shared_mutex(std::shared_ptr<executor_type> e) : m_executor(std::move(e))
    {
        if (m_executor == nullptr)
        {
            throw std::runtime_error{"coro::shared_mutex cannot have a nullptr executor"};
        }
    }
    ~shared_mutex() = default;

    shared_mutex(const shared_mutex&)                    = delete;
    shared_mutex(shared_mutex&&)                         = delete;
    auto operator=(const shared_mutex&) -> shared_mutex& = delete;
    auto operator=(shared_mutex&&) -> shared_mutex&      = delete;

    struct lock_operation
    {
        lock_operation(shared_mutex& sm, bool exclusive) : m_shared_mutex(sm), m_exclusive(exclusive) {}

        auto await_ready() const noexcept -> bool
        {
            // If the requested mode can be acquired immediately, unlock the internal mutex and resume.

            if (m_exclusive && m_shared_mutex.try_lock_locked())
            {
                m_shared_mutex.m_mutex.unlock();
                return true;
            }
            else if (!m_exclusive && m_shared_mutex.try_lock_shared_locked())
            {
                m_shared_mutex.m_mutex.unlock();
                return true;
            }

            return false;
        }

        auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
        {
            // The lock is currently held in a manner that prevents this acquisition, so suspend
            // this waiter at the end of the waiter list.

            auto* tail_waiter = m_shared_mutex.m_tail_waiter.load(std::memory_order::acquire);

            if (tail_waiter == nullptr)
            {
                m_shared_mutex.m_head_waiter = this;
                m_shared_mutex.m_tail_waiter = this;
            }
            else
            {
                tail_waiter->m_next          = this;
                m_shared_mutex.m_tail_waiter = this;
            }

            // If this is an exclusive lock acquisition then mark it as such so that shared locks
            // arriving after this exclusive one will also suspend and this exclusive lock doesn't
            // get starved.
            if (m_exclusive)
            {
                ++m_shared_mutex.m_exclusive_waiters;
            }

            m_awaiting_coroutine = awaiting_coroutine;
            m_shared_mutex.m_mutex.unlock();
            return true;
        }

        auto await_resume() noexcept -> shared_scoped_lock<executor_type>
        {
            return shared_scoped_lock{m_shared_mutex, m_exclusive};
        }

    private:
        friend class shared_mutex;

        shared_mutex&           m_shared_mutex;
        bool                    m_exclusive{false};
        std::coroutine_handle<> m_awaiting_coroutine;
        lock_operation*         m_next{nullptr};
    };

    /**
     * Locks the mutex in a shared state.  If there are any exclusive waiters then the shared
     * waiters will also wait so the exclusive waiters are not starved.
     */
    [[nodiscard]] auto lock_shared() -> coro::task<shared_scoped_lock<executor_type>>
    {
        co_await m_mutex.lock();
        co_return co_await lock_operation{*this, false};
    }

    /**
     * Locks the mutex in an exclusive state.
     */
    [[nodiscard]] auto lock() -> coro::task<shared_scoped_lock<executor_type>>
    {
        co_await m_mutex.lock();
        co_return co_await lock_operation{*this, true};
    }

    /**
     * @return True if the lock could immediately be acquired in a shared state.
     */
    [[nodiscard]] auto try_lock_shared() -> bool
    {
        // To acquire the shared lock the state must be one of two states:
        //   1) unlocked
        //   2) shared locked with zero exclusive waiters
        //          Zero exclusive waiters prevents exclusive starvation when shared locks are
        //          continuously being acquired.

        if (m_mutex.try_lock())
        {
            coro::scoped_lock lk{m_mutex};
            return try_lock_shared_locked();
        }
        return false;
    }

    /**
     * @return True if the lock could immediately be acquired in an exclusive state.
     */
    [[nodiscard]] auto try_lock() -> bool
    {
        // To acquire the exclusive lock the state must be unlocked.
        if (m_mutex.try_lock())
        {
            coro::scoped_lock lk{m_mutex};
            return try_lock_locked();
        }
        return false;
    }

    /**
     * Unlocks a single shared state user.  *REQUIRES* that the lock was first acquired exactly
     * once via `lock_shared()` or a successful `try_lock_shared()` before being called, otherwise
     * the behavior is undefined.
     *
     * If the shared user count drops to zero and this lock has an exclusive waiter then the
     * exclusive waiter acquires the lock.
     */
    [[nodiscard]] auto unlock_shared() -> coro::task<void>
    {
        auto lk = co_await m_mutex.scoped_lock();
        --m_shared_users;

        // Only wake waiters from the shared state once all shared users have completed.
        if (m_shared_users == 0)
        {
            if (m_head_waiter != nullptr)
            {
                wake_waiters(lk);
            }
            else
            {
                m_state = state::unlocked;
            }
        }

        co_return;
    }

    /**
     * Unlocks the mutex from its exclusive state.  If the next waiter is exclusive then only that
     * waiter acquires the lock.  If the next waiters are shared then all consecutive shared
     * waiters acquire the lock in a shared state in parallel and are resumed on the executor this
     * shared mutex was created with.
     */
    [[nodiscard]] auto unlock() -> coro::task<void>
    {
        auto lk = co_await m_mutex.scoped_lock();
        if (m_head_waiter != nullptr)
        {
            wake_waiters(lk);
        }
        else
        {
            m_state = state::unlocked;
        }

        co_return;
    }

    /**
     * @brief Gets the executor that drives the shared mutex.
     *
     * @return std::shared_ptr<executor_type>
     */
    [[nodiscard]] auto executor() -> std::shared_ptr<executor_type>
    {
        return m_executor;
    }

private:
    friend struct lock_operation;
    friend class shared_scoped_lock<executor_type>;

    enum class state
    {
        /// @brief The shared mutex is unlocked.
        unlocked,
        /// @brief The shared mutex is locked in shared mode.
        locked_shared,
        /// @brief The shared mutex is locked in exclusive mode.
        locked_exclusive
    };

    /// @brief This executor is for resuming multiple shared waiters.
    std::shared_ptr<executor_type> m_executor{nullptr};
    /// @brief Exclusive access for mutating the shared mutex's state.
    coro::mutex m_mutex;
    /// @brief The current state of the shared mutex.
    std::atomic<state> m_state{state::unlocked};

    /// @brief The current number of shared users that have acquired the lock.
    std::atomic<uint64_t> m_shared_users{0};
    /// @brief The current number of exclusive waiters waiting to acquire the lock.  This is used
    ///        to block new incoming shared lock attempts so the exclusive waiters are not starved.
    std::atomic<uint64_t> m_exclusive_waiters{0};

    std::atomic<lock_operation*> m_head_waiter{nullptr};
    std::atomic<lock_operation*> m_tail_waiter{nullptr};

    auto try_lock_shared_locked() -> bool
    {
        if (m_state == state::unlocked)
        {
            // If the shared mutex is unlocked put it into shared mode and add ourselves as a user
            // of the lock.
            m_state = state::locked_shared;
            ++m_shared_users;
            return true;
        }
        else if (m_state == state::locked_shared && m_exclusive_waiters == 0)
        {
            // If the shared mutex is in a shared locked state and there are no exclusive waiters
            // then add ourselves as a user of the lock.
            ++m_shared_users;
            return true;
        }

        // If the lock is in shared mode but there are exclusive waiters then we will also wait so
        // the writers are not starved.

        // If the lock is in exclusive mode already then we need to wait.

        return false;
    }

    auto try_lock_locked() -> bool
    {
        if (m_state == state::unlocked)
        {
            m_state = state::locked_exclusive;
            return true;
        }
        return false;
    }

    auto wake_waiters(coro::scoped_lock& lk) -> void
    {
        // First determine what the next lock state will be based on the first waiter.
        if (m_head_waiter.load()->m_exclusive)
        {
            // If it's exclusive then only this one waiter can be woken up.
            m_state                   = state::locked_exclusive;
            lock_operation* to_resume = m_head_waiter.load();
            m_head_waiter             = m_head_waiter.load()->m_next;
            --m_exclusive_waiters;
            if (m_head_waiter == nullptr)
            {
                m_tail_waiter = nullptr;
            }

            // Since this is an exclusive waiter it can be resumed directly.
            lk.unlock();
            to_resume->m_awaiting_coroutine.resume();
        }
        else
        {
            // If it's shared then scan forward and wake all consecutive shared waiters onto the
            // executor so they can run in parallel.
            m_state = state::locked_shared;
            do
            {
                lock_operation* to_resume = m_head_waiter.load();
                m_head_waiter             = m_head_waiter.load()->m_next;
                if (m_head_waiter == nullptr)
                {
                    m_tail_waiter = nullptr;
                }
                ++m_shared_users;

                m_executor->resume(to_resume->m_awaiting_coroutine);
            } while (m_head_waiter != nullptr && !m_head_waiter.load()->m_exclusive);

            // The lock cannot be released until the entire run of shared waiters has been
            // traversed; this avoids having to allocate space for all the shared waiters, unlock,
            // and then resume them in a batch.
            lk.unlock();
        }
    }
};

} // namespace coro
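
For context, here is a minimal usage sketch (not part of the header). It assumes libcoro's coro::thread_pool executor together with coro::sync_wait and coro::when_all from the same library; the task names and the shared counter are hypothetical and only for illustration.

#include <coro/coro.hpp> // libcoro umbrella header
#include <cstdint>
#include <iostream>

int main()
{
    // The executor is used to resume batches of shared waiters in parallel.
    auto tp = std::make_shared<coro::thread_pool>(coro::thread_pool::options{.thread_count = 4});
    coro::shared_mutex<coro::thread_pool> sm{tp};
    uint64_t value{0};

    // Readers may hold the lock concurrently.
    auto make_reader = [&]() -> coro::task<void>
    {
        co_await tp->schedule();
        auto lk = co_await sm.lock_shared();
        std::cout << "value: " << value << "\n";
        co_return; // lk unlocks via RAII when it goes out of scope
    };

    // A writer holds the lock exclusively; shared waiters queued behind it must wait.
    auto make_writer = [&]() -> coro::task<void>
    {
        co_await tp->schedule();
        auto lk = co_await sm.lock();
        ++value;
        co_return;
    };

    coro::sync_wait(coro::when_all(make_reader(), make_writer(), make_reader()));
}

Note the design choice in shared_scoped_lock::unlock(): because unlock() and unlock_shared() are themselves coroutines, the scoped lock's destructor spawns a detached unlock task on the mutex's executor rather than blocking, and falls back to coro::sync_wait only if that spawn fails.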