• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

jbaldwin / libcoro / 18355807267

08 Oct 2025 07:27PM UTC coverage: 88.651%. First build
18355807267

Pull #400

github

web-flow
Merge 82d22144f into 749e5b474
Pull Request #400: Executor types remove circular refs

48 of 56 new or added lines in 15 files covered. (85.71%)

1656 of 1868 relevant lines covered (88.65%)

5382608.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.78
/include/coro/shared_mutex.hpp
1
#pragma once

#include "coro/concepts/executor.hpp"
#include "coro/mutex.hpp"
#include "coro/task.hpp"

#include <atomic>
#include <coroutine>
#include <memory>
#include <stdexcept>
9

10
namespace coro
11
{
12
template<concepts::executor executor_type>
13
class shared_mutex;
14

15
namespace detail
16
{
17
template<concepts::executor executor_type>
18
struct shared_lock_operation
19
{
20
    explicit shared_lock_operation(coro::shared_mutex<executor_type>& shared_mutex, const bool exclusive)
100✔
21
        : m_shared_mutex(shared_mutex),
100✔
22
          m_exclusive(exclusive)
100✔
23
    {}
100✔
24
    ~shared_lock_operation() = default;
25

26
    shared_lock_operation(const shared_lock_operation&) = delete;
27
    shared_lock_operation(shared_lock_operation&&) = delete;
28
    auto operator=(const shared_lock_operation&) -> shared_lock_operation& = delete;
29
    auto operator=(shared_lock_operation&&) -> shared_lock_operation& = delete;
30

31
    auto await_ready() const noexcept -> bool
100✔
32
    {
33
        // If either mode can be acquired, unlock the internal mutex and resume.
34

35
        if (m_exclusive)
100✔
36
        {
37
            if (m_shared_mutex.try_lock_locked())
3✔
38
            {
39
                m_shared_mutex.m_mutex.unlock();
3✔
40
                return true;
3✔
41
            }
42
        }
43
        else if (m_shared_mutex.try_lock_shared_locked())
97✔
44
        {
45
            m_shared_mutex.m_mutex.unlock();
50✔
46
            return true;
50✔
47
        }
48

49
        return false;
47✔
50
    }
51

52
    auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
47✔
53
    {
54
        // For sure the lock is currently held in a manner that it cannot be acquired, suspend ourself
55
        // at the end of the waiter list.
56

57
        auto* tail_waiter = m_shared_mutex.m_tail_waiter.load(std::memory_order::acquire);
47✔
58

59
        if (tail_waiter == nullptr)
47✔
60
        {
61
            m_shared_mutex.m_head_waiter = this;
1✔
62
            m_shared_mutex.m_tail_waiter = this;
1✔
63
        }
64
        else
65
        {
66
            tail_waiter->m_next = this;
46✔
67
            m_shared_mutex.m_tail_waiter         = this;
46✔
68
        }
69

70
        // If this is an exclusive lock acquire then mark it as so so that shared locks after this
71
        // exclusive one will also suspend so this exclusive lock doesn't get starved.
72
        if (m_exclusive)
47✔
73
        {
74
            ++m_shared_mutex.m_exclusive_waiters;
×
75
        }
76

77
        m_awaiting_coroutine = awaiting_coroutine;
47✔
78
        m_shared_mutex.m_mutex.unlock();
47✔
79
        return true;
47✔
80
    }
81

82
    auto await_resume() noexcept -> void { }
100✔
83

84
protected:
85
    friend class coro::shared_mutex<executor_type>;
86

87
    std::coroutine_handle<> m_awaiting_coroutine;
88
    shared_lock_operation* m_next{nullptr};
89
    coro::shared_mutex<executor_type>& m_shared_mutex;
90
    bool m_exclusive{false};
91
};
92

93
} // namespace detail
94

95
template<concepts::executor executor_type>
96
class shared_mutex
97
{
98
public:
99
    /**
100
     * @param e The executor for when multiple shared waiters can be woken up at the same time,
101
     *          each shared waiter will be scheduled to immediately run on this executor in
102
     *          parallel.
103
     */
104
    explicit shared_mutex(std::unique_ptr<executor_type>& e) : m_executor(e.get())
5✔
105
    {
106
        if (m_executor == nullptr)
5✔
107
        {
108
            throw std::runtime_error{"coro::shared_mutex cannot have a nullptr executor"};
×
109
        }
110
    }
5✔
111
    ~shared_mutex() = default;
112

113
    shared_mutex(const shared_mutex&)                    = delete;
114
    shared_mutex(shared_mutex&&)                         = delete;
115
    auto operator=(const shared_mutex&) -> shared_mutex& = delete;
116
    auto operator=(shared_mutex&&) -> shared_mutex&      = delete;
117

118
    /**
119
     * Acquires the lock in a shared state, executes the scoped task, and then unlocks the shared lock.
120
     * Because unlocking a coro::shared_mutex is a task this scoped version cannot be returned as a RAII
121
     * object due to destructors not being able to be co_await'ed.
122
     * @param scoped_task The user's scoped task to execute after acquiring the shared lock.
123
     */
124
    [[nodiscard]] auto scoped_lock_shared(coro::task<void> scoped_task) -> coro::task<void>
1✔
125
    {
126
        co_await m_mutex.lock();
127
        co_await detail::shared_lock_operation<executor_type>{*this, false};
128
        co_await scoped_task;
129
        co_await unlock_shared();
130
        co_return;
131
    }
2✔
132

133
    /**
134
     * Acquires the lock in an exclusive state, executes the scoped task, and then unlocks the exclusive lock.
135
     * Because unlocking a coro::shared_mutex is a task this scoped version cannot be returned as a RAII
136
     * object due to destructors not being able to be co_await'ed.
137
     * @param scoped_task The user's scoped task to execute after acquiring the exclusive lock.
138
     */
139
    [[nodiscard]] auto scoped_lock(coro::task<void> scoped_task) -> coro::task<void>
1✔
140
    {
141
        co_await m_mutex.lock();
142
        co_await detail::shared_lock_operation<executor_type>{*this, true};
143
        co_await scoped_task;
144
        co_await unlock();
145
        co_return;
146
    }
2✔
147

148
    /**
149
     * Acquires the lock in a shared state. The shared_mutex must be unlock_shared() to release.
150
     * @return task
151
     */
152
    [[nodiscard]] auto lock_shared() -> coro::task<void>
96✔
153
    {
154
        co_await m_mutex.lock();
155
        co_await detail::shared_lock_operation<executor_type>{*this, false};
156
        co_return;
157
    }
192✔
158

159
    /**
160
     * Acquires the lock in an exclusive state. The shared_mutex must be unlock()'ed to release.
161
     * @return task
162
     */
163
    [[nodiscard]] auto lock() -> coro::task<void>
2✔
164
    {
165
        co_await m_mutex.lock();
166
        co_await detail::shared_lock_operation<executor_type>{*this, true};
167
        co_return;
168
    }
4✔
169

170
    /**
171
     * @return True if the lock could immediately be acquired in a shared state.
172
     */
173
    [[nodiscard]] auto try_lock_shared() -> bool
6✔
174
    {
175
        // To acquire the shared lock the state must be one of two states:
176
        //   1) unlocked
177
        //   2) shared locked with zero exclusive waiters
178
        //          Zero exclusive waiters prevents exclusive starvation if shared locks are
179
        //          always continuously happening.
180

181
        if (m_mutex.try_lock())
6✔
182
        {
183
            coro::scoped_lock lk{m_mutex};
6✔
184
            return try_lock_shared_locked();
6✔
185
        }
6✔
186
        return false;
×
187
    }
188

189
    /**
190
     * @return True if the lock could immediately be acquired in an exclusive state.
191
     */
192
    [[nodiscard]] auto try_lock() -> bool
12✔
193
    {
194
        // To acquire the exclusive lock the state must be unlocked.
195
        if (m_mutex.try_lock())
12✔
196
        {
197
            coro::scoped_lock lk{m_mutex};
12✔
198
            return try_lock_locked();
12✔
199
        }
12✔
200
        return false;
×
201
    }
202

203
    /**
204
     * Unlocks a single shared state user. *REQUIRES* that the lock was first acquired exactly once
205
     * via `lock_shared()` or `try_lock_shared() -> True` before being called, otherwise undefined
206
     * behavior.
207
     *
208
     * If the shared user count drops to zero and this lock has an exclusive waiter then the exclusive
209
     * waiter acquires the lock.
210
     */
211
    [[nodiscard]] auto unlock_shared() -> coro::task<void>
101✔
212
    {
213
        auto lk = co_await m_mutex.scoped_lock();
214
        auto users = m_shared_users.fetch_sub(1, std::memory_order::acq_rel);
215

216
        // If this is the final unlock_shared() see if there is anyone to wakeup.
217
        if (users == 1)
218
        {
219
            auto* head_waiter = m_head_waiter.load(std::memory_order::acquire);
220
            if (head_waiter != nullptr)
221
            {
222
                wake_waiters(lk, head_waiter);
223
            }
224
            else
225
            {
226
                m_state = state::unlocked;
227
            }
228
        }
229

230
        co_return;
231
    }
202✔
232

233
    /**
234
     * Unlocks the mutex from its exclusive state. If there is a following exclusive waiter then
235
     * that exclusive waiter acquires the lock.  If there are 1 or more shared waiters then all the
236
     * shared waiters acquire the lock in a shared state in parallel and are resumed on the original
237
     * executor this shared mutex was created with.
238
     */
239
    [[nodiscard]] auto unlock() -> coro::task<void>
11✔
240
    {
241
        auto lk = co_await m_mutex.scoped_lock();
242
        auto* head_waiter = m_head_waiter.load(std::memory_order::acquire);
243
        if (head_waiter != nullptr)
244
        {
245
            wake_waiters(lk, head_waiter);
246
        }
247
        else
248
        {
249
            m_state = state::unlocked;
250
        }
251

252
        co_return;
253
    }
22✔
254

255
    /**
256
     * @brief Gets the executor that drives the shared mutex.
257
     *
258
     * @return executor_type>&
259
     * */
NEW
260
    [[nodiscard]] auto executor() -> executor_type&
×
261
    {
NEW
262
        return *m_executor;
×
263
    }
264

265
private:
266
    friend struct detail::shared_lock_operation<executor_type>;
267

268
    enum class state
269
    {
270
        /// @brief The shared mutex is unlocked.
271
        unlocked,
272
        /// @brief The shared mutex is locked in shared mode.
273
        locked_shared,
274
        /// @brief The shared mutex is locked in exclusive mode.
275
        locked_exclusive
276
    };
277

278
    /// @brief This executor is for resuming multiple shared waiters.
279
    executor_type* m_executor{nullptr};
280
    /// @brief Exclusive access for mutating the shared mutex's state.
281
    coro::mutex m_mutex;
282
    /// @brief The current state of the shared mutex.
283
    std::atomic<state> m_state{state::unlocked};
284

285
    /// @brief The current number of shared users that have acquired the lock.
286
    std::atomic<uint64_t> m_shared_users{0};
287
    /// @brief The current number of exclusive waiters waiting to acquire the lock.  This is used to block
288
    ///        new incoming shared lock attempts so the exclusive waiter is not starved.
289
    std::atomic<uint64_t> m_exclusive_waiters{0};
290

291
    std::atomic<detail::shared_lock_operation<executor_type>*> m_head_waiter{nullptr};
292
    std::atomic<detail::shared_lock_operation<executor_type>*> m_tail_waiter{nullptr};
293

294
    auto try_lock_shared_locked() -> bool
103✔
295
    {
296
        if (m_state == state::unlocked)
103✔
297
        {
298
            // If the shared mutex is unlocked put it into shared mode and add ourself as using the lock.
299
            m_state = state::locked_shared;
52✔
300
            ++m_shared_users;
52✔
301
            return true;
52✔
302
        }
303
        else if (m_state == state::locked_shared && m_exclusive_waiters == 0)
51✔
304
        {
305
            // If the shared mutex is in a shared locked state and there are no exclusive waiters
306
            // the add ourself as using the lock.
307
            ++m_shared_users;
2✔
308
            return true;
2✔
309
        }
310

311
        // If the lock is in shared mode but there are exclusive waiters then we will also wait so
312
        // the writers are not starved.
313

314
        // If the lock is in exclusive mode already then we need to wait.
315

316
        return false;
49✔
317
    }
318

319
    auto try_lock_locked() -> bool
15✔
320
    {
321
        if (m_state == state::unlocked)
15✔
322
        {
323
            m_state = state::locked_exclusive;
11✔
324
            return true;
11✔
325
        }
326
        return false;
4✔
327
    }
328

329
    auto wake_waiters(coro::scoped_lock& lk, detail::shared_lock_operation<executor_type>* head_waiter) -> void
1✔
330
    {
331
        // First determine what the next lock state will be based on the first waiter.
332
        if (head_waiter->m_exclusive)
1✔
333
        {
334
            // If its exclusive then only this waiter can be woken up.
335
            m_state.store(state::locked_exclusive, std::memory_order::release);
×
336
            if (head_waiter->m_next == nullptr)
×
337
            {
338
                // This is the final waiter, set the list to null.
339
                m_head_waiter.store(nullptr, std::memory_order::release);
×
340
                m_tail_waiter.store(nullptr, std::memory_order::release);
×
341
            }
342
            else
343
            {
344
                // Advance the head waiter to next.
345
                m_head_waiter.store(head_waiter->m_next, std::memory_order::release);
×
346
            }
347

348
            m_exclusive_waiters.fetch_sub(1, std::memory_order::release);
×
349

350
            // Since this is an exclusive lock waiting we can resume it directly.
351
            lk.unlock();
×
352
            head_waiter->m_awaiting_coroutine.resume();
×
353
        }
354
        else
355
        {
356
            // If its shared then we will scan forward and awake all shared waiters onto the given
357
            // thread pool so they can run in parallel.
358
            m_state.store(state::locked_shared, std::memory_order::release);
1✔
359
            while (true)
47✔
360
            {
361
                auto* to_resume = m_head_waiter.load(std::memory_order::acquire);
48✔
362
                if (to_resume == nullptr || to_resume->m_exclusive)
48✔
363
                {
364
                    break;
365
                }
366

367
                if (to_resume->m_next == nullptr)
47✔
368
                {
369
                    m_head_waiter.store(nullptr, std::memory_order::release);
1✔
370
                    m_tail_waiter.store(nullptr, std::memory_order::release);
1✔
371
                }
372
                else
373
                {
374
                    m_head_waiter.store(to_resume->m_next, std::memory_order::release);
46✔
375
                }
376

377
                m_shared_users.fetch_add(1, std::memory_order::release);
47✔
378

379
                m_executor->resume(to_resume->m_awaiting_coroutine);
47✔
380
            }
381

382
            // Cannot unlock until the entire set of shared waiters has been traversed. I think this
383
            // makes more sense than allocating space for all the shared waiters, unlocking, and then
384
            // resuming in a batch?
385
            lk.unlock();
1✔
386
        }
387
    }
1✔
388
};
389

390
} // namespace coro
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc