• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

bitfaster / BitFaster.Caching / 15950782773

29 Jun 2025 02:50AM UTC coverage: 99.047% (+0.03%) from 99.02%
15950782773

Pull #693

github

web-flow
Merge d5a639860 into 019070af9
Pull Request #693: build in parallel

1100 of 1120 branches covered (98.21%)

Branch coverage included in aggregate %.

4821 of 4858 relevant lines covered (99.24%)

25081852.36 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.9
/BitFaster.Caching/Buffers/MpscBoundedBuffer.cs
1
using System;
2
using System.Diagnostics;
3
using System.Diagnostics.CodeAnalysis;
4
using System.Runtime.CompilerServices;
5
using System.Threading;
6

7
namespace BitFaster.Caching.Buffers
8
{
9
    /// <summary>
10
    /// Provides a multi-producer, single-consumer thread-safe ring buffer. When the buffer is full,
11
    /// TryAdd fails and returns false. When the buffer is empty, TryTake fails and returns false.
12
    /// </summary>
13
    /// <remarks>Based on the BoundedBuffer class in the Caffeine library by ben.manes@gmail.com (Ben Manes).</remarks>
14
    [DebuggerDisplay("Count = {Count}/{Capacity}")]
15
    public sealed class MpscBoundedBuffer<T> where T : class
16
    {
17
        private readonly T?[] buffer;
18
        private readonly int mask;
19
        private PaddedHeadAndTail headAndTail; // mutable struct, don't mark readonly
20

21
        /// <summary>
        /// Creates a new MpscBoundedBuffer. The requested length is passed through
        /// <c>BitOps.CeilingPowerOfTwo</c> and the result is used as the capacity.
        /// </summary>
        /// <param name="boundedLength">The requested bounded length. Negative values are rejected.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// Reported via <c>Throw.ArgOutOfRange</c> when <paramref name="boundedLength"/> is negative.
        /// </exception>
        public MpscBoundedBuffer(int boundedLength)
        {
            if (boundedLength < 0)
            {
                Throw.ArgOutOfRange(nameof(boundedLength));
            }

            // A power-of-two capacity lets slot indexes be computed with '& mask' instead of '%'.
            int capacity = BitOps.CeilingPowerOfTwo(boundedLength);

            mask = capacity - 1;
            buffer = new T[capacity];
        }
2,592✔
37

38
        /// <summary>
        /// Gets the bounded capacity, i.e. the length of the underlying array.
        /// </summary>
        public int Capacity
        {
            get { return buffer.Length; }
        }
594✔
42

43
        /// <summary>
        /// Gets the number of items contained in the buffer.
        /// </summary>
        /// <remarks>
        /// Head and tail are each read twice. A count is only computed when both re-reads
        /// match the first reads; otherwise the getter spins once and tries again.
        /// </remarks>
        public int Count
        {
            get
            {
                for (var spinner = new SpinWait(); ; spinner.SpinOnce())
                {
                    int observedHead = Volatile.Read(ref headAndTail.Head);
                    int observedTail = Volatile.Read(ref headAndTail.Tail);

                    // Head is re-checked first; tail is only re-read when head was unchanged.
                    if (observedHead != Volatile.Read(ref headAndTail.Head))
                    {
                        continue;
                    }

                    if (observedTail != Volatile.Read(ref headAndTail.Tail))
                    {
                        continue;
                    }

                    return GetCount(observedHead, observedTail);
                }
            }
        }
66

67
        // Converts a head/tail pair into an item count by comparing their masked slot positions.
        private int GetCount(int head, int tail)
        {
            if (head == tail)
            {
                return 0;
            }

            int headSlot = head & mask;
            int tailSlot = tail & mask;

            if (headSlot < tailSlot)
            {
                return tailSlot - headSlot;
            }

            // The tail slot is at or before the head slot, so the occupied range
            // runs to the end of the array and continues from index zero.
            return buffer.Length - headSlot + tailSlot;
        }
32,102✔
78

79
        /// <summary>
        /// Attempts to append the specified item at the tail of the buffer.
        /// </summary>
        /// <param name="item">The item to be added.</param>
        /// <returns>
        /// <see cref="BufferStatus.Full"/> when tail minus head has reached the array length,
        /// <see cref="BufferStatus.Contended"/> when the compare-exchange on the tail does not succeed,
        /// otherwise <see cref="BufferStatus.Success"/>.
        /// </returns>
        /// <remarks>
        /// Thread safe.
        /// </remarks>
        public BufferStatus TryAdd(T item)
        {
            int observedHead = Volatile.Read(ref headAndTail.Head);
            int observedTail = headAndTail.Tail;

            if (observedTail - observedHead >= buffer.Length)
            {
                return BufferStatus.Full;
            }

            // Claim the slot by advancing the tail. If the tail no longer holds the value
            // observed above, give up without retrying.
            if (Interlocked.CompareExchange(ref headAndTail.Tail, observedTail + 1, observedTail) != observedTail)
            {
                return BufferStatus.Contended;
            }

            // The slot is only filled after the tail has moved; readers treat a null slot as "not published yet".
            Volatile.Write(ref buffer[observedTail & mask], item);

            return BufferStatus.Success;
        }
560,732,660✔
108

109

110
        /// <summary>
        /// Attempts to remove the item at the head of the buffer.
        /// </summary>
        /// <param name="item">Receives the removed item; null when nothing was removed.</param>
        /// <returns>
        /// <see cref="BufferStatus.Empty"/> when head equals tail,
        /// <see cref="BufferStatus.Contended"/> when the head slot still holds null,
        /// otherwise <see cref="BufferStatus.Success"/>.
        /// </returns>
        /// <remarks>
        /// Thread safe for single try take/drain + multiple try add.
        /// </remarks>
        public BufferStatus TryTake([MaybeNull] out T item)
        {
            int observedHead = Volatile.Read(ref headAndTail.Head);
            int observedTail = headAndTail.Tail;

            if (observedTail - observedHead == 0)
            {
                item = default;
                return BufferStatus.Empty;
            }

            int slot = observedHead & mask;
            item = Volatile.Read(ref buffer[slot]);

            if (item is null)
            {
                // not published yet
                return BufferStatus.Contended;
            }

            // Clear the slot first, then publish the advanced head.
            buffer[slot] = null;
            Volatile.Write(ref headAndTail.Head, observedHead + 1);

            return BufferStatus.Success;
        }
18,932✔
144

145
        // On NETSTANDARD2_0 all code paths are internally based on ArraySegment<T>.
146
        // After NETSTANDARD2_0, all code paths are internally based on Span<T>.
147
#if NETSTANDARD2_0
        /// <summary>
        /// Drains the buffer into the specified array segment.
        /// </summary>
        /// <param name="output">The array segment that receives the drained items.</param>
        /// <returns>The number of items written to the output buffer.</returns>
        /// <remarks>
        /// Thread safe for single try take/drain + multiple try add.
        /// </remarks>
        public int DrainTo(ArraySegment<T> output)
        {
            return DrainToImpl(output);
        }
#else
        /// <summary>
        /// Drains the buffer into the specified array segment by forwarding to the span overload.
        /// </summary>
        /// <param name="output">The array segment that receives the drained items.</param>
        /// <returns>The number of items written to the output buffer.</returns>
        /// <remarks>
        /// Thread safe for single try take/drain + multiple try add.
        /// </remarks>
        public int DrainTo(ArraySegment<T> output)
        {
            return DrainTo(output.AsSpan());
        }

        /// <summary>
        /// Drains the buffer into the specified span.
        /// </summary>
        /// <param name="output">The span that receives the drained items.</param>
        /// <returns>The number of items written to the output buffer.</returns>
        /// <remarks>
        /// Thread safe for single try take/drain + multiple try add.
        /// </remarks>
        public int DrainTo(Span<T> output)
        {
            return DrainToImpl(output);
        }
#endif
21,512,028✔
184

185
        // Shared drain loop behind the public DrainTo overloads: moves items from the head of the
        // buffer into output until the buffer is exhausted, an unpublished (null) slot is reached,
        // or output is full. Returns the number of items written to output.
        //
        // use an outer wrapper method to force the JIT to inline the inner adaptor methods
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#if NETSTANDARD2_0
        private int DrainToImpl(ArraySegment<T> output)
#else
        private int DrainToImpl(Span<T> output)
#endif
        {
            int head = Volatile.Read(ref headAndTail.Head);
            int tail = headAndTail.Tail;
            int size = tail - head;

            // Nothing to drain, or nowhere to put it. The output check has to happen before the
            // loop: the loop clears a slot before writing to output, so entering it with a
            // zero-length output would discard the item and leave a null slot at the head.
            if (size == 0 || Length(output) == 0)
            {
                return 0;
            }

            var localBuffer = buffer.AsSpanOrArray();

            int outCount = 0;

            do
            {
                int index = head & mask;

                T? item = Volatile.Read(ref localBuffer[index]);

                if (item == null)
                {
                    // not published yet
                    break;
                }

                localBuffer[index] = null;
                Write(output, outCount++, item);
                head++;
            }
            while (head != tail && outCount < Length(output));

            // Publish the new head the same way TryTake does.
            // NOTE(review): presumably the release store keeps the slot-clearing writes above from
            // becoming visible after the advanced head on weakly ordered hardware - confirm.
            Volatile.Write(ref headAndTail.Head, head);

            return outCount;
        }
21,512,028✔
228

229
#if NETSTANDARD2_0
        // Stores item at the given position, relative to the segment's offset in its backing array.
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static void Write(ArraySegment<T> output, int index, T item)
            => output.Array[output.Offset + index] = item;

        // Number of elements in the segment.
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static int Length(ArraySegment<T> output)
            => output.Count;
#else
        // Stores item at the given position in the span.
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static void Write(Span<T> output, int index, T item)
            => output[index] = item;

        // Number of elements in the span.
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
        private static int Length(Span<T> output)
            => output.Length;
#endif
254

255
        /// <summary>
        /// Removes all values from the buffer by calling TryTake, discarding each item,
        /// until it reports <see cref="BufferStatus.Empty"/>.
        /// </summary>
        /// <remarks>
        /// Clear must be called from the single consumer thread.
        /// Any status other than Empty, including Contended, is retried immediately.
        /// </remarks>
        public void Clear()
        {
            BufferStatus status;

            do
            {
                status = TryTake(out _);
            }
            while (status != BufferStatus.Empty);
        }
820✔
267
    }
268
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc