• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

icerpc / icerpc-csharp / 20870171951

10 Jan 2026 01:03AM UTC coverage: 83.304% (-0.1%) from 83.399%
20870171951

Pull #4221

github

web-flow
Merge fe4f556d8 into f39e9819d
Pull Request #4221: Use new C#14 extension block syntax

12030 of 14441 relevant lines covered (83.3%)

2957.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.36
src/IceRpc.Slice/PipeReaderExtensions.cs
1
// Copyright (c) ZeroC, Inc.
2

3
using IceRpc.Internal;
4
using System.Buffers;
5
using System.Diagnostics;
6
using System.IO.Pipelines;
7
using System.Runtime.CompilerServices;
8
using ZeroC.Slice;
9

10
namespace IceRpc.Slice;
11

12
/// <summary>Provides extension methods for <see cref="PipeReader" /> to decode streamed elements.</summary>
13
public static class PipeReaderExtensions
14
{
15
    /// <summary>Extension methods for <see cref="PipeReader" />.</summary>
16
    /// <param name="reader">The pipe reader.</param>
17
    extension(PipeReader reader)
18
    {
19
        /// <summary>Creates an async enumerable over a pipe reader to decode streamed elements.</summary>
20
        /// <typeparam name="T">The type of the element being decoded.</typeparam>
21
        /// <param name="encoding">The Slice encoding version.</param>
22
        /// <param name="decodeFunc">The function used to decode the streamed member.</param>
23
        /// <param name="elementSize">The size in bytes of one element.</param>
24
        /// <param name="sliceFeature">The Slice feature to customize the decoding.</param>
25
        /// <returns>The async enumerable to decode and return the streamed elements.</returns>
26
        /// <exception cref="ArgumentException">Thrown if <paramref name="elementSize" /> is equal of inferior to
27
        /// <c>0</c>.</exception>
28
        /// <remarks>The reader ownership is transferred to the returned async enumerable. The caller should no
29
        /// longer use the reader after this call.</remarks>
30
        public IAsyncEnumerable<T> ToAsyncEnumerable<T>(
31
            SliceEncoding encoding,
32
            DecodeFunc<T> decodeFunc,
33
            int elementSize,
34
            ISliceFeature? sliceFeature = null)
35
        {
13✔
36
            if (elementSize <= 0)
13✔
37
            {
×
38
                reader.Complete();
×
39
                throw new ArgumentException("The element size must be greater than 0.", nameof(elementSize));
×
40
            }
41

42
            sliceFeature ??= SliceFeature.Default;
13✔
43
            return reader.ToAsyncEnumerableCore(ReadAsync, DecodeBuffer);
13✔
44

45
            IEnumerable<T> DecodeBuffer(ReadOnlySequence<byte> buffer)
46
            {
18✔
47
                // Since the elements are fixed-size, they can't contain service addresses hence baseProxy
48
                // can remain null.
49
                var decoder = new SliceDecoder(
18✔
50
                    buffer,
18✔
51
                    encoding,
18✔
52
                    maxCollectionAllocation: sliceFeature.MaxCollectionAllocation,
18✔
53
                    maxDepth: sliceFeature.MaxDepth);
18✔
54

55
                var items = new T[buffer.Length / elementSize];
18✔
56
                for (int i = 0; i < items.Length; ++i)
131,418✔
57
                {
65,692✔
58
                    items[i] = decodeFunc(ref decoder);
65,692✔
59
                }
65,691✔
60
                decoder.CheckEndOfBuffer();
17✔
61
                return items;
17✔
62
            }
17✔
63

64
            async ValueTask<ReadResult> ReadAsync(PipeReader reader, CancellationToken cancellationToken)
65
            {
21✔
66
                // Read the bytes for at least one element.
67
                // Note that the max number of bytes we can read in one shot is limited by the flow control of the
68
                // underlying transport.
69
                ReadResult readResult =
21✔
70
                    await reader.ReadAtLeastAsync(elementSize, cancellationToken).ConfigureAwait(false);
21✔
71

72
                // Check if the buffer contains extra bytes that we need to remove.
73
                ReadOnlySequence<byte> buffer = readResult.Buffer;
20✔
74
                if (elementSize > 1 && buffer.Length > elementSize)
20✔
75
                {
17✔
76
                    long extra = buffer.Length % elementSize;
17✔
77
                    if (extra > 0)
17✔
78
                    {
×
79
                        buffer = buffer.Slice(0, buffer.Length - extra);
×
80
                        return new ReadResult(buffer, isCanceled: readResult.IsCanceled, isCompleted: false);
×
81
                    }
82
                }
17✔
83

84
                // Return the read result as-is.
85
                return readResult;
20✔
86
            }
20✔
87
        }
88

89
        /// <summary>Creates an async enumerable over a pipe reader to decode variable size streamed elements.</summary>
90
        /// <typeparam name="T">The stream element type.</typeparam>
91
        /// <param name="encoding">The Slice encoding version.</param>
92
        /// <param name="decodeFunc">The function used to decode the streamed member.</param>
93
        /// <param name="sender">The proxy that sent the request, if applicable.</param>
94
        /// <param name="sliceFeature">The slice feature to customize the decoding.</param>
95
        /// <returns>The async enumerable to decode and return the streamed members.</returns>
96
        /// <remarks>The reader ownership is transferred to the returned async enumerable. The caller should no
97
        /// longer use the reader after this call.</remarks>
98
        public IAsyncEnumerable<T> ToAsyncEnumerable<T>(
99
            SliceEncoding encoding,
100
            DecodeFunc<T> decodeFunc,
101
            IProxy? sender = null,
102
            ISliceFeature? sliceFeature = null)
103
        {
13✔
104
            sliceFeature ??= SliceFeature.Default;
13✔
105
            IProxy? baseProxy = sliceFeature.BaseProxy ?? sender;
13✔
106
            return reader.ToAsyncEnumerableCore(ReadAsync, DecodeBuffer);
13✔
107

108
            IEnumerable<T> DecodeBuffer(ReadOnlySequence<byte> buffer)
109
            {
13✔
110
                // No activator or max depth since streams are Slice2+.
111
                var decoder = new SliceDecoder(buffer, encoding, baseProxy, sliceFeature.MaxCollectionAllocation);
13✔
112

113
                var items = new List<T>();
13✔
114
                do
115
                {
65,679✔
116
                    items.Add(decodeFunc(ref decoder));
65,679✔
117
                }
65,678✔
118
                while (decoder.Consumed < buffer.Length);
65,678✔
119

120
                return items;
12✔
121
            }
12✔
122

123
            ValueTask<ReadResult> ReadAsync(PipeReader reader, CancellationToken cancellationToken) =>
124
                reader.ReadSegmentAsync(encoding, sliceFeature.MaxSegmentSize, cancellationToken);
14✔
125
        }
126

127
        /// <summary>Decodes an async enumerable from a pipe reader.</summary>
128
        /// <param name="readFunc">The function used to read enough data to decode elements returned by the
129
        /// enumerable.</param>
130
        /// <param name="decodeBufferFunc">The function used to decode an element.</param>
131
        /// <param name="cancellationToken">The cancellation token which is provided to <see
132
        /// cref="IAsyncEnumerable{T}.GetAsyncEnumerator(CancellationToken)" />.</param>
133
        private async IAsyncEnumerable<T> ToAsyncEnumerableCore<T>(
134
            Func<PipeReader, CancellationToken, ValueTask<ReadResult>> readFunc,
135
            Func<ReadOnlySequence<byte>, IEnumerable<T>> decodeBufferFunc,
136
            [EnumeratorCancellation] CancellationToken cancellationToken = default)
137
        {
25✔
138
            try
139
            {
25✔
140
                while (true)
35✔
141
                {
35✔
142
                    ReadResult readResult;
143

144
                    try
145
                    {
35✔
146
                        readResult = await readFunc(reader, cancellationToken).ConfigureAwait(false);
35✔
147

148
                        if (readResult.IsCanceled)
34✔
149
                        {
×
150
                            // We never call CancelPendingRead; an interceptor or middleware can but it's not correct.
151
                            throw new InvalidOperationException("Unexpected call to CancelPendingRead.");
×
152
                        }
153
                        if (readResult.Buffer.IsEmpty)
34✔
154
                        {
3✔
155
                            Debug.Assert(readResult.IsCompleted);
3✔
156
                            yield break;
3✔
157
                        }
158
                    }
31✔
159
                    catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
1✔
160
                    {
1✔
161
                        // Canceling the cancellation token is a normal way to complete an iteration.
162
                        yield break;
1✔
163
                    }
164

165
                    IEnumerable<T> elements = decodeBufferFunc(readResult.Buffer);
31✔
166
                    reader.AdvanceTo(readResult.Buffer.End);
29✔
167

168
                    foreach (T item in elements)
262,817✔
169
                    {
131,366✔
170
                        if (cancellationToken.IsCancellationRequested)
131,366✔
171
                        {
1✔
172
                            yield break;
1✔
173
                        }
174
                        yield return item;
131,365✔
175
                    }
131,364✔
176

177
                    if (readResult.IsCompleted)
27✔
178
                    {
17✔
179
                        yield break;
17✔
180
                    }
181
                }
10✔
182
            }
183
            finally
184
            {
25✔
185
                reader.Complete();
25✔
186
            }
25✔
187
        }
23✔
188
    }
189
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc