• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 24749973918

21 Apr 2026 10:35PM UTC coverage: 86.948%. First build
24749973918

Pull #450

github

web-flow
Merge c27bef5ba into 277467ffd
Pull Request #450: coro::thread_pool_ws (work stealing)

294 of 309 new or added lines in 17 files covered. (95.15%)

2105 of 2421 relevant lines covered (86.95%)

4453278.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.71
/include/coro/shared_mutex.hpp
1
#pragma once
2

3
#include "coro/concepts/executor.hpp"
4
#include "coro/mutex.hpp"
5
#include "coro/task.hpp"
6

7
#include <atomic>
8
#include <coroutine>
9

10
namespace coro
11
{
12
template<concepts::executor executor_type>
13
class shared_mutex;
14

15
namespace detail
16
{
17
/**
 * Awaiter that coro::shared_mutex uses to acquire the lock in shared or exclusive mode.
 *
 * Every use in this file co_awaits `m_mutex.lock()` on the owning shared mutex immediately
 * before awaiting this operation, so await_ready() and await_suspend() execute with that
 * internal mutex held. await_ready() releases it when the lock is acquired immediately;
 * otherwise await_suspend() releases it after queueing this operation as a waiter.
 */
template<concepts::executor executor_type>
18
struct shared_lock_operation
19
{
20
    /**
     * @param shared_mutex The shared mutex to acquire; stored by reference.
     * @param exclusive True to acquire in exclusive mode, false to acquire in shared mode.
     */
    explicit shared_lock_operation(coro::shared_mutex<executor_type>& shared_mutex, const bool exclusive)
100✔
21
        : m_shared_mutex(shared_mutex),
100✔
22
          m_exclusive(exclusive)
100✔
23
    {
24
    }
100✔
25
    ~shared_lock_operation() = default;
26

27
    // Copy and move are deleted; await_suspend() stores `this` in the shared mutex waiter list,
    // so the address of this object has to stay stable while it is queued.
    shared_lock_operation(const shared_lock_operation&)                    = delete;
28
    shared_lock_operation(shared_lock_operation&&)                         = delete;
29
    auto operator=(const shared_lock_operation&) -> shared_lock_operation& = delete;
30
    auto operator=(shared_lock_operation&&) -> shared_lock_operation&      = delete;
31

32
    /**
     * Tries to acquire the lock in the requested mode without suspending.
     *
     * Although declared const, this mutates the shared mutex through the reference member.
     *
     * @return True if the lock was acquired; the internal mutex has been released.
     *         False if the lock is unavailable; the internal mutex is still held and is released
     *         by await_suspend().
     */
    auto await_ready() const noexcept -> bool
100✔
33
    {
34
        // If either mode can be acquired, unlock the internal mutex and resume.
35

36
        if (m_exclusive)
100✔
37
        {
38
            if (m_shared_mutex.try_lock_locked())
3✔
39
            {
40
                m_shared_mutex.m_mutex.unlock();
3✔
41
                return true;
3✔
42
            }
43
        }
44
        else if (m_shared_mutex.try_lock_shared_locked())
97✔
45
        {
46
            m_shared_mutex.m_mutex.unlock();
50✔
47
            return true;
50✔
48
        }
49

50
        // Not acquired: the internal mutex intentionally stays locked for await_suspend().
        return false;
47✔
51
    }
52

53
    /**
     * Queues this operation at the tail of the shared mutex waiter list and releases the
     * internal mutex.
     *
     * @param awaiting_coroutine The coroutine to resume once the lock has been handed over.
     * @return Always true, the coroutine stays suspended until wake_waiters() resumes it.
     */
    auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
47✔
54
    {
55
        // For sure the lock is currently held in a manner that it cannot be acquired, suspend ourself
56
        // at the end of the waiter list.
57

58
        auto* tail_waiter = m_shared_mutex.m_tail_waiter.load(std::memory_order::acquire);
47✔
59

60
        if (tail_waiter == nullptr)
47✔
61
        {
62
            // Empty list: this operation becomes both head and tail.
            m_shared_mutex.m_head_waiter = this;
1✔
63
            m_shared_mutex.m_tail_waiter = this;
1✔
64
        }
65
        else
66
        {
67
            // Non-empty list: link behind the current tail and become the new tail.
            tail_waiter->m_next          = this;
46✔
68
            m_shared_mutex.m_tail_waiter = this;
46✔
69
        }
70

71
        // If this is an exclusive lock acquire then mark it as such so that shared locks after this
72
        // exclusive one will also suspend so this exclusive lock doesn't get starved.
73
        if (m_exclusive)
47✔
74
        {
75
            ++m_shared_mutex.m_exclusive_waiters;
×
76
        }
77

78
        // The handle is stored before the internal mutex is released; wake_waiters() is only
        // called by a holder of that mutex, and this object is not touched after the unlock.
        m_awaiting_coroutine.store(awaiting_coroutine, std::memory_order::release);
47✔
79
        m_shared_mutex.m_mutex.unlock();
47✔
80
        return true;
47✔
81
    }
82

83
    /// @brief Produces no value; the lock state was already updated by await_ready() or wake_waiters().
    auto await_resume() noexcept -> void {}
100✔
84

85
protected:
86
    friend class coro::shared_mutex<executor_type>;
87

88
    /// @brief The waiter queued after this one, nullptr while this is the last waiter.
    shared_lock_operation*               m_next{nullptr};
89
    /// @brief The suspended coroutine, stored by await_suspend() and read by wake_waiters().
    std::atomic<std::coroutine_handle<>> m_awaiting_coroutine;
90
    /// @brief The shared mutex this operation is acquiring.
    coro::shared_mutex<executor_type>&   m_shared_mutex;
91
    /// @brief True for an exclusive acquire, false for a shared acquire.
    bool                                 m_exclusive{false};
92
};
93

94
} // namespace detail
95

96
/**
 * Coroutine mutex that can be held either by one exclusive owner or by any number of shared
 * owners at the same time.
 *
 * The current mode is tracked in m_state and the number of shared owners in m_shared_users.
 * Acquires that cannot be satisfied immediately are queued on a linked list threaded through
 * detail::shared_lock_operation::m_next (m_head_waiter / m_tail_waiter) and are woken from the
 * head by unlock() and unlock_shared(). While m_exclusive_waiters is non-zero, new shared
 * acquires are queued as well instead of joining the current shared owners. In this file the
 * state, counters and waiter list are only modified while m_mutex is held.
 */
template<concepts::executor executor_type>
97
class shared_mutex
98
{
99
public:
100
    /**
101
     * @param e The executor for when multiple shared waiters can be woken up at the same time,
102
     *          each shared waiter will be scheduled to immediately run on this executor in
103
     *          parallel.
     *          Only the raw pointer from e.get() is stored and it is dereferenced later by
     *          wake_waiters() and executor(), so the executor has to stay alive while this
     *          shared mutex is in use.
     * @throws std::runtime_error If e holds a nullptr.
104
     */
105
    explicit shared_mutex(std::unique_ptr<executor_type>& e) : m_executor(e.get())
5✔
106
    {
107
        if (m_executor == nullptr)
5✔
108
        {
109
            throw std::runtime_error{"coro::shared_mutex cannot have a nullptr executor"};
×
110
        }
111
    }
5✔
112
    ~shared_mutex() = default;
113

114
    shared_mutex(const shared_mutex&)                    = delete;
115
    shared_mutex(shared_mutex&&)                         = delete;
116
    auto operator=(const shared_mutex&) -> shared_mutex& = delete;
117
    auto operator=(shared_mutex&&) -> shared_mutex&      = delete;
118

119
    /**
120
     * Acquires the lock in a shared state, executes the scoped task, and then unlocks the shared lock.
121
     * Because unlocking a coro::shared_mutex is a task this scoped version cannot be returned as a RAII
122
     * object due to destructors not being able to be co_await'ed.
123
     * @param scoped_task The user's scoped task to execute after acquiring the shared lock.
     *
     * NOTE(review): unlock_shared() is only reached when `co_await scoped_task` returns normally;
     *               confirm how an exception escaping scoped_task is meant to be handled.
124
     */
125
    [[nodiscard]] auto scoped_lock_shared(coro::task<void> scoped_task) -> coro::task<void>
1✔
126
    {
127
        // The internal mutex is taken first; the shared_lock_operation releases it on every path.
        co_await m_mutex.lock();
128
        co_await detail::shared_lock_operation<executor_type>{*this, false};
129
        co_await scoped_task;
130
        co_await unlock_shared();
131
        co_return;
132
    }
2✔
133

134
    /**
135
     * Acquires the lock in an exclusive state, executes the scoped task, and then unlocks the exclusive lock.
136
     * Because unlocking a coro::shared_mutex is a task this scoped version cannot be returned as a RAII
137
     * object due to destructors not being able to be co_await'ed.
138
     * @param scoped_task The user's scoped task to execute after acquiring the exclusive lock.
     *
     * NOTE(review): unlock() is only reached when `co_await scoped_task` returns normally;
     *               confirm how an exception escaping scoped_task is meant to be handled.
139
     */
140
    [[nodiscard]] auto scoped_lock(coro::task<void> scoped_task) -> coro::task<void>
1✔
141
    {
142
        // The internal mutex is taken first; the shared_lock_operation releases it on every path.
        co_await m_mutex.lock();
143
        co_await detail::shared_lock_operation<executor_type>{*this, true};
144
        co_await scoped_task;
145
        co_await unlock();
146
        co_return;
147
    }
2✔
148

149
    /**
150
     * Acquires the lock in a shared state. The shared_mutex must be released with unlock_shared().
151
     * @return task
152
     */
153
    [[nodiscard]] auto lock_shared() -> coro::task<void>
96✔
154
    {
155
        // The internal mutex is taken first; the shared_lock_operation releases it on every path.
        co_await m_mutex.lock();
156
        co_await detail::shared_lock_operation<executor_type>{*this, false};
157
        co_return;
158
    }
192✔
159

160
    /**
161
     * Acquires the lock in an exclusive state. The shared_mutex must be unlock()'ed to release.
162
     * @return task
163
     */
164
    [[nodiscard]] auto lock() -> coro::task<void>
2✔
165
    {
166
        // The internal mutex is taken first; the shared_lock_operation releases it on every path.
        co_await m_mutex.lock();
167
        co_await detail::shared_lock_operation<executor_type>{*this, true};
168
        co_return;
169
    }
4✔
170

171
    /**
172
     * @return True if the lock could immediately be acquired in a shared state.
     *         False is also returned when the internal mutex could not be taken with try_lock(),
     *         regardless of the shared mutex state.
173
     */
174
    [[nodiscard]] auto try_lock_shared() -> bool
6✔
175
    {
176
        // To acquire the shared lock the state must be one of two states:
177
        //   1) unlocked
178
        //   2) shared locked with zero exclusive waiters
179
        //          Zero exclusive waiters prevents exclusive starvation if shared locks are
180
        //          always continuously happening.
181

182
        if (m_mutex.try_lock())
6✔
183
        {
184
            // NOTE(review): presumably `lk` adopts the already held internal mutex and releases
            //               it on scope exit; confirm against coro::scoped_lock.
            coro::scoped_lock lk{m_mutex};
6✔
185
            return try_lock_shared_locked();
6✔
186
        }
6✔
187
        return false;
×
188
    }
189

190
    /**
191
     * @return True if the lock could immediately be acquired in an exclusive state.
     *         False is also returned when the internal mutex could not be taken with try_lock(),
     *         regardless of the shared mutex state.
192
     */
193
    [[nodiscard]] auto try_lock() -> bool
12✔
194
    {
195
        // To acquire the exclusive lock the state must be unlocked.
196
        if (m_mutex.try_lock())
12✔
197
        {
198
            // NOTE(review): presumably `lk` adopts the already held internal mutex and releases
            //               it on scope exit; confirm against coro::scoped_lock.
            coro::scoped_lock lk{m_mutex};
12✔
199
            return try_lock_locked();
12✔
200
        }
12✔
201
        return false;
×
202
    }
203

204
    /**
205
     * Unlocks a single shared state user. *REQUIRES* that the lock was first acquired exactly once
206
     * via `lock_shared()` or `try_lock_shared() -> True` before being called, otherwise undefined
207
     * behavior.
208
     *
209
     * If the shared user count drops to zero and this lock has an exclusive waiter then the exclusive
210
     * waiter acquires the lock.
211
     */
212
    [[nodiscard]] auto unlock_shared() -> coro::task<void>
101✔
213
    {
214
        auto lk    = co_await m_mutex.scoped_lock();
215
        // fetch_sub returns the count from before the decrement.
        auto users = m_shared_users.fetch_sub(1, std::memory_order::acq_rel);
216

217
        // If this is the final unlock_shared() see if there is anyone to wakeup.
218
        if (users == 1)
219
        {
220
            auto* head_waiter = m_head_waiter.load(std::memory_order::acquire);
221
            if (head_waiter != nullptr)
222
            {
223
                // wake_waiters() sets the next state and unlocks lk itself.
                wake_waiters(lk, head_waiter);
224
            }
225
            else
226
            {
227
                m_state = state::unlocked;
228
            }
229
        }
230

231
        co_return;
232
    }
202✔
233

234
    /**
235
     * Unlocks the mutex from its exclusive state. If there is a following exclusive waiter then
236
     * that exclusive waiter acquires the lock.  If there are 1 or more shared waiters then all the
237
     * shared waiters acquire the lock in a shared state in parallel and are resumed on the original
238
     * executor this shared mutex was created with.
239
     */
240
    [[nodiscard]] auto unlock() -> coro::task<void>
11✔
241
    {
242
        auto  lk          = co_await m_mutex.scoped_lock();
243
        auto* head_waiter = m_head_waiter.load(std::memory_order::acquire);
244
        if (head_waiter != nullptr)
245
        {
246
            // wake_waiters() sets the next state and unlocks lk itself.
            wake_waiters(lk, head_waiter);
247
        }
248
        else
249
        {
250
            m_state = state::unlocked;
251
        }
252

253
        co_return;
254
    }
22✔
255

256
    /**
257
     * @brief Gets the executor that drives the shared mutex.
258
     *
259
     * @return executor_type&
260
     */
NEW
261
    [[nodiscard]] auto executor() -> executor_type& { return *m_executor; }
×
262

263
private:
264
    friend struct detail::shared_lock_operation<executor_type>;
265

266
    enum class state
267
    {
268
        /// @brief The shared mutex is unlocked.
269
        unlocked,
270
        /// @brief The shared mutex is locked in shared mode.
271
        locked_shared,
272
        /// @brief The shared mutex is locked in exclusive mode.
273
        locked_exclusive
274
    };
275

276
    /// @brief This executor is for resuming multiple shared waiters.
277
    executor_type* m_executor{nullptr};
278
    /// @brief Exclusive access for mutating the shared mutex's state.
279
    coro::mutex m_mutex;
280
    /// @brief The current state of the shared mutex.
281
    std::atomic<state> m_state{state::unlocked};
282

283
    /// @brief The current number of shared users that have acquired the lock.
284
    std::atomic<uint64_t> m_shared_users{0};
285
    /// @brief The current number of exclusive waiters waiting to acquire the lock.  This is used to block
286
    ///        new incoming shared lock attempts so the exclusive waiter is not starved.
287
    std::atomic<uint64_t> m_exclusive_waiters{0};
288

289
    /// @brief The first queued waiter, the next one to be woken; nullptr when nothing is queued.
    std::atomic<detail::shared_lock_operation<executor_type>*> m_head_waiter{nullptr};
290
    /// @brief The last queued waiter, new waiters are linked after it; nullptr when nothing is queued.
    std::atomic<detail::shared_lock_operation<executor_type>*> m_tail_waiter{nullptr};
291

292
    /**
     * Attempts to acquire the lock in shared mode. Every caller in this file invokes this while
     * holding m_mutex.
     * @return True if the shared lock was acquired and m_shared_users was incremented.
     */
    auto try_lock_shared_locked() -> bool
103✔
293
    {
294
        if (m_state == state::unlocked)
103✔
295
        {
296
            // If the shared mutex is unlocked put it into shared mode and add ourself as using the lock.
297
            m_state = state::locked_shared;
52✔
298
            ++m_shared_users;
52✔
299
            return true;
52✔
300
        }
301
        else if (m_state == state::locked_shared && m_exclusive_waiters == 0)
51✔
302
        {
303
            // If the shared mutex is in a shared locked state and there are no exclusive waiters
304
            // then add ourself as using the lock.
305
            ++m_shared_users;
2✔
306
            return true;
2✔
307
        }
308

309
        // If the lock is in shared mode but there are exclusive waiters then we will also wait so
310
        // the writers are not starved.
311

312
        // If the lock is in exclusive mode already then we need to wait.
313

314
        return false;
49✔
315
    }
316

317
    /**
     * Attempts to acquire the lock in exclusive mode, which only succeeds from the unlocked
     * state. Every caller in this file invokes this while holding m_mutex.
     * @return True if the state was changed from unlocked to locked_exclusive.
     */
    auto try_lock_locked() -> bool
15✔
318
    {
319
        if (m_state == state::unlocked)
15✔
320
        {
321
            m_state = state::locked_exclusive;
11✔
322
            return true;
11✔
323
        }
324
        return false;
4✔
325
    }
326

327
    /**
     * Hands the lock over to the waiter(s) at the front of the waiter list.
     *
     * An exclusive head waiter is dequeued alone and resumed directly on the calling thread.
     * A shared head waiter is dequeued together with every shared waiter queued directly behind
     * it, stopping at the first exclusive waiter, and each one is resumed through m_executor.
     *
     * @param lk The scoped lock holding m_mutex; this function unlocks it on both paths.
     * @param head_waiter The current head of the waiter list, must not be nullptr.
     */
    auto wake_waiters(coro::scoped_lock& lk, detail::shared_lock_operation<executor_type>* head_waiter) -> void
1✔
328
    {
329
        // First determine what the next lock state will be based on the first waiter.
330
        if (head_waiter->m_exclusive)
1✔
331
        {
332
            // If it is exclusive then only this waiter can be woken up.
333
            m_state.store(state::locked_exclusive, std::memory_order::release);
×
334
            if (head_waiter->m_next == nullptr)
×
335
            {
336
                // This is the final waiter, set the list to null.
337
                m_head_waiter.store(nullptr, std::memory_order::release);
×
338
                m_tail_waiter.store(nullptr, std::memory_order::release);
×
339
            }
340
            else
341
            {
342
                // Advance the head waiter to next.
343
                m_head_waiter.store(head_waiter->m_next, std::memory_order::release);
×
344
            }
345

346
            // This waiter is no longer waiting, undo the increment made in await_suspend().
            m_exclusive_waiters.fetch_sub(1, std::memory_order::release);
×
347

348
            // Since this is an exclusive lock waiting we can resume it directly.
349
            // The internal mutex is released first so the resumed coroutine does not run under it.
            lk.unlock();
×
NEW
350
            head_waiter->m_awaiting_coroutine.load(std::memory_order::acquire).resume();
×
351
        }
352
        else
353
        {
354
            // If it is shared then we will scan forward and awake all shared waiters onto the given
355
            // thread pool so they can run in parallel.
356
            m_state.store(state::locked_shared, std::memory_order::release);
1✔
357
            while (true)
47✔
358
            {
359
                auto* to_resume = m_head_waiter.load(std::memory_order::acquire);
48✔
360
                // Stop at the end of the list or at the first exclusive waiter, which stays queued.
                if (to_resume == nullptr || to_resume->m_exclusive)
48✔
361
                {
362
                    break;
363
                }
364

365
                // Dequeue to_resume before it is handed to the executor.
                if (to_resume->m_next == nullptr)
47✔
366
                {
367
                    m_head_waiter.store(nullptr, std::memory_order::release);
1✔
368
                    m_tail_waiter.store(nullptr, std::memory_order::release);
1✔
369
                }
370
                else
371
                {
372
                    m_head_waiter.store(to_resume->m_next, std::memory_order::release);
46✔
373
                }
374

375
                // The woken waiter becomes a shared owner; it was not counted in await_suspend().
                m_shared_users.fetch_add(1, std::memory_order::release);
47✔
376

377
                // NOTE(review): any result returned by m_executor->resume() is discarded here;
                //               confirm the executor cannot reject the handle.
                m_executor->resume(to_resume->m_awaiting_coroutine.load(std::memory_order::acquire));
47✔
378
            }
379

380
            // Cannot unlock until the entire set of shared waiters has been traversed. I think this
381
            // makes more sense than allocating space for all the shared waiters, unlocking, and then
382
            // resuming in a batch?
383
            lk.unlock();
1✔
384
        }
385
    }
1✔
386
};
387

388
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc