• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

fastbean-au / fair-mutex / #8

09 Nov 2025 01:40AM UTC coverage: 97.667% (-0.5%) from 98.175%
#8

push

fastbean-au
Ensure lock state is updated before returning from unlock operations

45 of 47 new or added lines in 1 file covered. (95.74%)

4 existing lines in 1 file now uncovered.

293 of 300 relevant lines covered (97.67%)

12834.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.25
/rwmutex.go
1
package fairmutex
2

3
import (
4
        "context"
5
        "sync"
6
        "sync/atomic"
7
        "time"
8

9
        "go.opentelemetry.io/otel"
10
        "go.opentelemetry.io/otel/attribute"
11
        "go.opentelemetry.io/otel/metric"
12
)
13

14
type RWMutex struct {
15
        initialised        atomic.Bool
16
        config             *config
17
        histogram          metric.Float64Histogram
18
        exclusive          chan lockRequest
19
        shared             chan lockRequest
20
        releaseExclusive   chan chan struct{}
21
        releaseShared      chan chan struct{}
22
        stopProcessing     chan struct{}
23
        waitingOnExclusive atomic.Bool
24
        waitingOnShared    atomic.Bool
25
}
26

27
// lockRequest - a single queued request for a lock, sent to the processing
// goroutine on the exclusive or shared channel.
type lockRequest struct {
	// c is signalled by process when the lock has been granted.
	c chan struct{}

	// n is the number of (shared) locks requested; Lock, RLock and the Try
	// variants always send 1, RLockSet sends the caller's count.
	n int
}
31

32
// New - returns a pointer to a new Mutex with the processing process already
33
// running.
34
func New(options ...Option) *RWMutex {
33✔
35
        cfg := getConfig(options...)
33✔
36

33✔
37
        meter := otel.Meter("fair-mutex")
33✔
38

33✔
39
        histogram, err := meter.Float64Histogram(
33✔
40
                cfg.metricName,
33✔
41
                metric.WithDescription("Duration waiting for a mutex lock in seconds"),
33✔
42
                metric.WithUnit("s"),
33✔
43
                // Buckets: 1ns, 10ns, 100ns, 1µs, 10µs, 100µs, 1ms, 10ms, 100ms, 1s, 10s, 100s
33✔
44
                metric.WithExplicitBucketBoundaries(0.000000001, 0.00000001, 0.0000001, 0.000001, 0.00001, 0.0001, 0.001, 0.01, 0.1, 1, 10, 100),
33✔
45
        )
33✔
46

33✔
47
        if err != nil {
33✔
48
                panic(err)
×
49
        }
50

51
        m := &RWMutex{
33✔
52
                config:           cfg,
33✔
53
                histogram:        histogram,
33✔
54
                exclusive:        make(chan lockRequest, cfg.exclusiveMaxQueueSize),
33✔
55
                shared:           make(chan lockRequest, cfg.sharedMaxQueueSize),
33✔
56
                releaseExclusive: make(chan chan struct{}),
33✔
57
                releaseShared:    make(chan chan struct{}, cfg.sharedMaxBatchSize),
33✔
58
                stopProcessing:   make(chan struct{}),
33✔
59
        }
33✔
60

33✔
61
        go m.process()
33✔
62

33✔
63
        m.initialised.Store(true)
33✔
64

33✔
65
        return m
33✔
66
}
67

68
// Type of the batch most recently selected by process.
const (
	noLockTaken        = 0 // nothing selected yet, or nothing of the opposite type was waiting
	sharedLockTaken    = 1 // a batch of shared (read) locks
	exclusiveLockTaken = 2 // a batch of exclusive (write) locks
)
73

74
// Stop - causes the go func processing mutex requests to stop running and the
75
// cleanup method to be called after that. Calling this function is required so
76
// that resources are not leaked (e.g. zombie go processes).
77
func (m *RWMutex) Stop() {
34✔
78
        if m.initialised.Load() {
67✔
79
                m.stopProcessing <- struct{}{}
33✔
80
        }
33✔
81
}
82

83
// process - handles the batching and granting of the lock requests.
//
// It runs as the single goroutine started by New and loops until a value is
// received on stopProcessing, at which point it returns and the deferred
// cleanup runs. Each iteration has three phases:
//
//  1. If the previous batch was shared, look (without blocking) for a waiting
//     exclusive request, and vice versa, so that the two lock types alternate
//     whenever both are in demand.
//  2. If nothing of the opposite type was waiting (or this is the first
//     iteration), block until a stop signal or a request of either type
//     arrives.
//  3. Grant the selected batch and wait for every lock in it to be released.
//
// Note that stopProcessing is also honoured while waiting for releases, so a
// Stop issued while locks are outstanding ends the goroutine without waiting
// for those locks to be returned.
func (m *RWMutex) process() {
        defer m.cleanup()

        // loopInitLock is the first request of the current batch; lockItems is
        // the number of further queued requests to grant as part of that batch.
        var loopInitLock lockRequest
        var lockItems int
        lockTypeTaken := noLockTaken

        for {
                switch lockTypeTaken {

                // If the last batch processed was shared, give exclusive locks, if any are waiting
                case sharedLockTaken:
                        select {

                        case <-m.stopProcessing:
                                return

                        case loopInitLock = <-m.exclusive:
                                m.waitingOnExclusive.Store(true)
                                // Snapshot of what is queued behind the first request, capped
                                // at the configured batch size.
                                lockItems = min(len(m.exclusive), m.config.exclusiveMaxBatchSize)
                                lockTypeTaken = exclusiveLockTaken

                        default:
                                // Nothing exclusive waiting - fall through to the blocking select below
                                lockTypeTaken = noLockTaken

                        }

                // If the last batch processed was exclusive, give shared locks, if any are waiting
                case exclusiveLockTaken:
                        select {

                        case <-m.stopProcessing:
                                return

                        case loopInitLock = <-m.shared:
                                m.waitingOnShared.Store(true)
                                // Snapshot of what is queued behind the first request, capped
                                // at the configured batch size.
                                lockItems = min(len(m.shared), m.config.sharedMaxBatchSize)
                                lockTypeTaken = sharedLockTaken

                        default:
                                // Nothing shared waiting - fall through to the blocking select below
                                lockTypeTaken = noLockTaken

                        }

                }

                // If this is the initial loop or there were no locks waiting of the opposite type to the last batch, then
                // we'll randomly select which type of lock to grant (given one of each type is requested simultaneously).
                // This select has no default case, so it blocks until a stop signal or a request arrives.
                if lockTypeTaken == noLockTaken {
                        select {

                        case <-m.stopProcessing:
                                return

                        case loopInitLock = <-m.exclusive:
                                m.waitingOnExclusive.Store(true)
                                lockItems = min(len(m.exclusive), m.config.exclusiveMaxBatchSize)
                                lockTypeTaken = exclusiveLockTaken

                        case loopInitLock = <-m.shared:
                                m.waitingOnShared.Store(true)
                                lockItems = min(len(m.shared), m.config.sharedMaxBatchSize)
                                lockTypeTaken = sharedLockTaken

                        }
                }

                // Action the locks - grant and wait until they are released
                switch lockTypeTaken {

                case sharedLockTaken:
                        // Process shared locks - grant the locks
                        // All requests in the batch are granted up front, so their holders run concurrently.
                        loopInitLock.c <- struct{}{} // Initial lock

                        // lockCnt is the total number of releases to wait for; a single
                        // request may account for more than one lock (see lockRequest.n).
                        lockCnt := loopInitLock.n

                        for range lockItems {
                                // Grab the lock request
                                l := <-m.shared

                                // Signal to the requester that they now have the lock
                                l.c <- struct{}{}

                                lockCnt += l.n
                        }

                        // Wait for the shared locks to be returned
                        for n := 1; n <= lockCnt; n++ {
                                select {
                                case <-m.stopProcessing:
                                        return
                                case u := <-m.releaseShared:
                                        // Clear the flag on the last release, before acknowledging it,
                                        // so the state is already updated when RUnlock returns.
                                        if n == lockCnt {
                                                m.waitingOnShared.Store(false)
                                        }

                                        // Signal back that the lock has been released
                                        u<-struct{}{}
                                }
                        }


                case exclusiveLockTaken:
                        // Process exclusive locks - grant and wait for each lock to be returned
                        // Unlike shared locks, each grant is followed by a wait for its release
                        // before the next request in the batch is granted.

                        // Initial loop lock
                        // Signal to the requester that they now have the lock
                        loopInitLock.c <- struct{}{}

                        // Wait for the lock to be returned
                        select {
                        case <-m.stopProcessing:
                                return
                        case u := <-m.releaseExclusive:
                                        // If the batch holds only this one lock, it is also the last:
                                        // clear the flag before acknowledging the release.
                                        if lockItems == 0 {
                                                m.waitingOnExclusive.Store(false)
                                        }

                                        // Signal back that the lock has been released
                                        u<-struct{}{}

                                // Remaining exclusive lock requests
                                for n := 1; n <= lockItems; n++ {
                                        // Grab the lock request
                                        l := <-m.exclusive

                                        // Signal to the requester that they now have the lock
                                        l.c <- struct{}{}

                                        // Wait for the lock to be returned
                                        select {
                                        case <-m.stopProcessing:
                                                return
                                        case u := <-m.releaseExclusive:
                                                // Clear the flag on the last release of the batch,
                                                // before acknowledging it.
                                                if n == lockItems {
                                                        m.waitingOnExclusive.Store(false)
                                                }

                                                // Signal back that the lock has been released
                                                u<-struct{}{}
                                        }
                                }

                        }
                }
        }
}
231

232
// cleanup - ensures that we don't have any resource leakage; however, this is
233
// only run after the process method has finished, and that is triggered be
234
// calling the Stop method.
235
func (m *RWMutex) cleanup() {
33✔
236
        m.initialised.Store(false)
33✔
237

33✔
238
        close(m.exclusive)
33✔
239
        close(m.shared)
33✔
240
        close(m.releaseExclusive)
33✔
241
        close(m.releaseShared)
33✔
242
}
33✔
243

244
// -----------------------------------------------------------------------------
245
// sync.RWMutex methods
246
// -----------------------------------------------------------------------------
247

248
// RLock - locks the mutex for reading.
249
//
250
// It should not be used for recursive read locking; a blocked Lock call
251
// excludes new readers from acquiring the lock. See the documentation on the
252
// RWMutex type (https://pkg.go.dev/sync#RWMutex).
253
func (m *RWMutex) RLock() {
58,795✔
254
        if !m.initialised.Load() {
58,803✔
255
                panic("attempt to use fair-mutex uninitialised")
8✔
256
        }
257

258
        start := time.Now()
58,726✔
259

58,726✔
260
        l := make(chan struct{})
58,726✔
261
        defer close(l)
58,726✔
262

58,726✔
263
        // Request the lock
58,726✔
264
        m.shared <- lockRequest{c: l, n: 1}
58,726✔
265

58,726✔
266
        // Wait for the lock to be granted
58,726✔
267
        <-l
58,726✔
268

58,726✔
269
        m.histogram.Record(context.Background(), time.Since(start).Seconds(), metric.WithAttributes(
58,726✔
270
                append(m.config.metricAttributes, attribute.String("operation", "RLock"))...,
58,726✔
271
        ))
58,726✔
272
}
273

274
// TryRLock - tries to lock rw for reading and reports whether it succeeded.
275
//
276
// Note that while correct uses of TryRLock do exist, they are rare, and use of
277
// TryRLock is often a sign of a deeper problem in a particular use of mutexes.
278
func (m *RWMutex) TryRLock() bool {
12✔
279
        if !m.initialised.Load() {
20✔
280
                panic("attempt to use fair-mutex uninitialised")
8✔
281
        }
282

283
        l := make(chan struct{})
4✔
284
        defer close(l)
4✔
285

4✔
286
        if m.waitingOnExclusive.Load() || m.waitingOnShared.Load() || len(m.exclusive) > 0 || len(m.shared) > 0 {
6✔
287
                return false
2✔
288
        }
2✔
289

290
        // Request the lock
291
        m.shared <- lockRequest{c: l, n: 1}
2✔
292

2✔
293
        // Wait for the lock to be granted
2✔
294
        <-l
2✔
295

2✔
296
        return true
2✔
297
}
298

299
// RUnlock - undoes a single fairmutex.RLock call; it does not affect other
300
// simultaneous readers. It is a run-time error if rw is not locked for reading
301
// on entry to RUnlock.
302
func (m *RWMutex) RUnlock() {
58,994✔
303
        if !m.initialised.Load() {
59,002✔
304
                panic("attempt to use fair-mutex uninitialised")
8✔
305
        }
306

307
        if !m.waitingOnShared.Load() {
58,990✔
308
                panic("fair-mutex: RUnlock of unlocked RWMutex")
1✔
309
        }
310

311
        u := make(chan struct{})
58,983✔
312

58,983✔
313
        m.releaseShared <- u
58,983✔
314

58,983✔
315
        // Wait for the lock to be released
58,983✔
316
        <-u
58,983✔
317

58,983✔
318
        close(u)
58,983✔
319
}
320

321
// Lock - locks the mutex for writing. If the mutex is already locked for
322
// reading or writing, Lock blocks until the lock is available.
323
func (m *RWMutex) Lock() {
24,806✔
324
        if !m.initialised.Load() {
24,814✔
325
                panic("attempt to use fair-mutex uninitialised")
8✔
326
        }
327

328
        start := time.Now()
24,795✔
329

24,795✔
330
        l := make(chan struct{})
24,795✔
331
        defer close(l)
24,795✔
332

24,795✔
333
        // Request the lock
24,795✔
334
        m.exclusive <- lockRequest{c: l, n: 1}
24,795✔
335

24,795✔
336
        // Wait for the lock to be granted
24,795✔
337
        <-l
24,795✔
338

24,795✔
339
        m.histogram.Record(context.Background(), time.Since(start).Seconds(), metric.WithAttributes(
24,795✔
340
                append(m.config.metricAttributes, attribute.String("operation", "Lock"))...,
24,795✔
341
        ))
24,795✔
342
}
343

344
// TryLock - tries to lock rw for writing and reports whether it succeeded.
345
//
346
// Note that while correct uses of TryLock do exist, they are rare, and use of
347
// TryLock is often a sign of a deeper problem in a particular use of mutexes.
348
func (m *RWMutex) TryLock() bool {
13✔
349
        if !m.initialised.Load() {
21✔
350
                panic("attempt to use fair-mutex uninitialised")
8✔
351
        }
352

353
        l := make(chan struct{})
5✔
354
        defer close(l)
5✔
355

5✔
356
        // See if there are any lock requests or locks active
5✔
357
        if m.waitingOnExclusive.Load() || m.waitingOnShared.Load() || len(m.exclusive) > 0 || len(m.shared) > 0 {
8✔
358
                return false
3✔
359
        }
3✔
360

361
        // Request the lock
362
        m.exclusive <- lockRequest{c: l, n: 1}
2✔
363

2✔
364
        // Wait on the lock to be granted
2✔
365
        <-l
2✔
366

2✔
367
        return true
2✔
368
}
369

370
// Unlock - unlocks rw for writing. It is a run-time error if rw is not locked
371
// for writing on entry to Unlock.
372
//
373
// As with Mutexes, a locked FairMutex is not associated with a particular
374
// goroutine. One goroutine may fairmutex.RLock (fairmutex.Lock) a FairMutex and
375
// then arrange for another goroutine to fairmutex.RUnlock (fairmutex.Unlock)
376
// it.
377
func (m *RWMutex) Unlock() {
25,046✔
378
        if !m.initialised.Load() {
25,054✔
379
                panic("attempt to use fair-mutex uninitialised")
8✔
380
        }
381

382
        if !m.waitingOnExclusive.Load() {
25,039✔
383
                panic("fair-mutex: Unlock of unlocked RWMutex")
1✔
384
        }
385

386
        u := make(chan struct{})
25,037✔
387

25,037✔
388
        m.releaseExclusive <- u
25,037✔
389

25,037✔
390
        // Wait for the lock to be released
25,037✔
391
        <-u
25,037✔
392

25,037✔
393
        close(u)
25,037✔
394
}
395

396
// RLocker - returns a [Locker] interface that implements the [Locker.Lock] and
397
// [Locker.Unlock] methods by calling m.RLock and m.RUnlock.
398
func (m *RWMutex) RLocker() sync.Locker {
1✔
399
        return (*rlocker)(m)
1✔
400
}
1✔
401

402
type rlocker RWMutex
403

404
func (r *rlocker) Lock()   { (*RWMutex)(r).RLock() }
5✔
405
func (r *rlocker) Unlock() { (*RWMutex)(r).RUnlock() }
5✔
406

407
// Extension methods
408

409
// RLockSet - locks the mutex for reading, granting the requested number of 
410
// locks. RUnlock must be called one for each of the requested number of locks.
411
//
412
// Use RLockSet when a set of read locks is required to be granted at the same
413
// time (that is, within the same batch). This might be done when read locks
414
// were granted in a loop before processing was done, and the locks unlocked.
415
//
416
// It should not be used for recursive read locking; a blocked Lock call
417
// excludes new readers from acquiring the lock. See the documentation on the
418
// RWMutex type (https://pkg.go.dev/sync#RWMutex).
419
func (m *RWMutex) RLockSet(number int) {
13✔
420
        if !m.initialised.Load() {
21✔
421
                panic("attempt to use fair-mutex uninitialised")
8✔
422
        }
423

424
        start := time.Now()
5✔
425

5✔
426
        l := make(chan struct{})
5✔
427
        defer close(l)
5✔
428

5✔
429
        // Request the lock
5✔
430
        m.shared <- lockRequest{c: l, n: number}
5✔
431

5✔
432
        // Wait for the lock to be granted
5✔
433
        <-l
5✔
434

5✔
435
        m.histogram.Record(context.Background(), time.Since(start).Seconds(), metric.WithAttributes(
5✔
436
                append(m.config.metricAttributes, attribute.String("operation", "RLockSet"))...,
5✔
437
        ))
5✔
438
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc