• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

icerpc / icerpc-csharp / 20870171951

10 Jan 2026 01:03AM UTC coverage: 83.304% (-0.1%) from 83.399%
20870171951

Pull #4221

github

web-flow
Merge fe4f556d8 into f39e9819d
Pull Request #4221: Use new C#14 extension block syntax

12030 of 14441 relevant lines covered (83.3%)

2957.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.23
src/IceRpc.Slice/AsyncEnumerableExtensions.cs
1
// Copyright (c) ZeroC, Inc.
2

3
using System.Diagnostics;
4
using System.IO.Pipelines;
5
using ZeroC.Slice;
6

7
namespace IceRpc.Slice;
8

9
/// <summary>Provides an extension method for <see cref="IAsyncEnumerable{T}" /> to encode elements into a
10
/// <see cref="PipeReader"/>.</summary>
11
public static class AsyncEnumerableExtensions
12
{
13
    /// <summary>Extension methods for <see cref="IAsyncEnumerable{T}" />.</summary>
14
    /// <param name="asyncEnumerable">The async enumerable to encode into a stream of bytes.</param>
15
    extension<T>(IAsyncEnumerable<T> asyncEnumerable)
16
    {
17
        /// <summary>Encodes an async enumerable into a stream of bytes represented by a
18
        /// <see cref="PipeReader"/>.</summary>
19
        /// <param name="encodeAction">The action used to encode one element.</param>
20
        /// <param name="useSegments"><see langword="true" /> if an element can be encoded on a variable number
21
        /// of bytes; otherwise, <see langword="false" />.</param>
22
        /// <param name="encoding">The Slice encoding to use.</param>
23
        /// <param name="encodeOptions">The Slice encode options.</param>
24
        /// <returns>A pipe reader that represents the encoded stream of bytes.</returns>
25
        /// <remarks>This extension method is used to encode streaming parameters and streaming return values with the
26
        /// Slice2 encoding.</remarks>
27
        public PipeReader ToPipeReader(
28
            EncodeAction<T> encodeAction,
29
            bool useSegments,
30
            SliceEncoding encoding = SliceEncoding.Slice2,
31
            SliceEncodeOptions? encodeOptions = null) =>
32
            new AsyncEnumerablePipeReader<T>(
25✔
33
                asyncEnumerable,
25✔
34
                encodeAction,
25✔
35
                useSegments,
25✔
36
                encoding,
25✔
37
                encodeOptions);
25✔
38
    }
39

40
    // Overriding ReadAtLeastAsyncCore or CopyToAsync methods for this reader is not critical since this reader is
41
    // mostly used by the IceRPC core to copy the encoded data for the enumerable to the network stream. This copy
42
    // doesn't use these methods.
43
#pragma warning disable CA1001 // Types that own disposable fields should be disposable.
44
    private class AsyncEnumerablePipeReader<T> : PipeReader
45
#pragma warning restore CA1001
46
    {
47
        // Disposed in Complete.
48
        private readonly IAsyncEnumerator<T> _asyncEnumerator;
49

50
        // We don't dispose _cts because it's not necessary
51
        // (see https://github.com/dotnet/runtime/issues/29970#issuecomment-717840778) and we can't easily dispose it
52
        // when no one is using it since CancelPendingRead can be called by another thread after Complete is called.
53
        private readonly CancellationTokenSource _cts = new();
25✔
54
        private readonly EncodeAction<T> _encodeAction;
55
        private readonly SliceEncoding _encoding;
56
        private bool _isCompleted;
57
        private readonly bool _useSegments;
58
        private readonly int _streamFlushThreshold;
59
        private Task<bool>? _moveNext;
60
        private readonly Pipe _pipe;
61

62
        public override void AdvanceTo(SequencePosition consumed) => _pipe.Reader.AdvanceTo(consumed);
112✔
63

64
        public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) =>
65
            _pipe.Reader.AdvanceTo(consumed, examined);
×
66

67
        public override void CancelPendingRead()
68
        {
1✔
69
            _pipe.Reader.CancelPendingRead();
1✔
70
            _cts.Cancel();
1✔
71
        }
1✔
72

73
        public override void Complete(Exception? exception = null)
74
        {
26✔
75
            if (!_isCompleted)
26✔
76
            {
19✔
77
                _isCompleted = true;
19✔
78

79
                // Cancel MoveNextAsync if it's still running.
80
                _cts.Cancel();
19✔
81

82
                _pipe.Reader.Complete();
19✔
83
                _pipe.Writer.Complete();
19✔
84

85
                _ = DisposeEnumeratorAsync();
19✔
86
            }
19✔
87

88
            async Task DisposeEnumeratorAsync()
89
            {
19✔
90
                // Make sure MoveNextAsync is completed before disposing the enumerator. Calling DisposeAsync on the
91
                // enumerator while MoveNextAsync is still running is disallowed.
92
                if (_moveNext is not null)
19✔
93
                {
2✔
94
                    try
95
                    {
2✔
96
                        _ = await _moveNext.ConfigureAwait(false);
2✔
97
                    }
×
98
                    catch
2✔
99
                    {
2✔
100
                    }
2✔
101
                }
2✔
102
                await _asyncEnumerator.DisposeAsync().ConfigureAwait(false);
19✔
103
            }
19✔
104
        }
26✔
105

106
        public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
107
        {
115✔
108
            if (!_pipe.Reader.TryRead(out ReadResult readResult))
115✔
109
            {
115✔
110
                // If no more buffered data to read, fill the pipe with new data.
111

112
                // If ReadAsync is canceled, cancel the enumerator iteration to ensure MoveNextAsync below completes.
113
                using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(
115✔
114
                    cts => ((CancellationTokenSource)cts!).Cancel(),
1✔
115
                    _cts);
115✔
116

117
                bool hasNext;
118
                try
119
                {
115✔
120
                    if (_moveNext is null)
115✔
121
                    {
24✔
122
                        hasNext = await _asyncEnumerator.MoveNextAsync().ConfigureAwait(false);
24✔
123
                    }
22✔
124
                    else
125
                    {
91✔
126
                        hasNext = await _moveNext.ConfigureAwait(false);
91✔
127
                        _moveNext = null;
90✔
128
                    }
90✔
129

130
                    if (hasNext && EncodeElements() is Task<bool> moveNext)
112✔
131
                    {
92✔
132
                        // Flush does not block because the pipe is configured to not pause flush.
133
                        ValueTask<FlushResult> valueTask = _pipe.Writer.FlushAsync(CancellationToken.None);
92✔
134
                        Debug.Assert(valueTask.IsCompletedSuccessfully);
92✔
135

136
                        _moveNext = moveNext;
92✔
137
                        // And the next ReadAsync will await _moveNext.
138
                    }
92✔
139
                    else
140
                    {
20✔
141
                        // No need to flush the writer, complete takes care of it.
142
                        _pipe.Writer.Complete();
20✔
143
                    }
20✔
144

145
                    // There are bytes in the reader or it's completed since we've just flushed or completed the writer.
146
                    bool ok = _pipe.Reader.TryRead(out readResult);
112✔
147
                    Debug.Assert(ok);
112✔
148
                }
112✔
149
                catch (OperationCanceledException exception)
2✔
150
                {
2✔
151
                    Debug.Assert(exception.CancellationToken == _cts.Token);
2✔
152
                    cancellationToken.ThrowIfCancellationRequested();
2✔
153

154
                    if (_pipe.Reader.TryRead(out readResult) && readResult.IsCanceled)
1✔
155
                    {
1✔
156
                        // Ok: return canceled readResult once after calling CancelPendingRead.
157
                        // Note that we can't return a canceled read result with a bogus buffer since the caller must
158
                        // be able to call reader.AdvanceTo with this buffer.
159
                    }
1✔
160
                    else
161
                    {
×
162
                        throw new NotSupportedException(
×
163
                            "Cannot resume reading an AsyncEnumerablePipeReader after canceling a ReadAsync " +
×
164
                            "or calling CancelPendingRead.");
×
165
                    }
166
                }
1✔
167
            }
113✔
168

169
            return readResult;
113✔
170

171
            Task<bool>? EncodeElements()
172
            {
110✔
173
                var encoder = new SliceEncoder(_pipe.Writer, _encoding);
110✔
174

175
                Span<byte> sizePlaceholder = default;
110✔
176
                if (_useSegments)
110✔
177
                {
72✔
178
                    sizePlaceholder = encoder.GetPlaceholderSpan(4);
72✔
179
                }
72✔
180

181
                Task<bool>? result = null;
110✔
182
                bool keepEncoding;
183

184
                do
185
                {
131,337✔
186
                    _encodeAction(ref encoder, _asyncEnumerator.Current);
131,337✔
187
                    ValueTask<bool> moveNext = _asyncEnumerator.MoveNextAsync();
131,337✔
188

189
                    if (moveNext.IsCompletedSuccessfully)
131,337✔
190
                    {
131,307✔
191
                        bool hasNext = moveNext.Result;
131,307✔
192

193
                        // If we reached the stream flush threshold, it's time to flush.
194
                        if (encoder.EncodedByteCount - sizePlaceholder.Length >= _streamFlushThreshold)
131,307✔
195
                        {
63✔
196
                            result = hasNext ? moveNext.AsTask() : null;
63✔
197
                            keepEncoding = false;
63✔
198
                        }
63✔
199
                        else
200
                        {
131,244✔
201
                            keepEncoding = hasNext;
131,244✔
202
                        }
131,244✔
203
                    }
131,307✔
204
                    else
205
                    {
30✔
206
                        // If we can't get the next element synchronously, we return the move next task and end the loop
207
                        // to flush the encoded elements.
208
                        result = moveNext.AsTask();
30✔
209
                        keepEncoding = false;
30✔
210
                    }
30✔
211
                }
131,337✔
212
                while (keepEncoding);
131,337✔
213

214
                if (_useSegments)
110✔
215
                {
72✔
216
                    SliceEncoder.EncodeVarUInt62(
72✔
217
                        (ulong)(encoder.EncodedByteCount - sizePlaceholder.Length),
72✔
218
                        sizePlaceholder);
72✔
219
                }
72✔
220
                return result;
110✔
221
            }
110✔
222
        }
113✔
223

224
        public override bool TryRead(out ReadResult result) => _pipe.Reader.TryRead(out result);
×
225

226
        internal AsyncEnumerablePipeReader(
25✔
227
            IAsyncEnumerable<T> asyncEnumerable,
25✔
228
            EncodeAction<T> encodeAction,
25✔
229
            bool useSegments,
25✔
230
            SliceEncoding encoding,
25✔
231
            SliceEncodeOptions? encodeOptions)
25✔
232
        {
25✔
233
            if (encoding == SliceEncoding.Slice1)
25✔
234
            {
×
235
                throw new NotSupportedException("Streaming is not supported by the Slice1 encoding.");
×
236
            }
237

238
            encodeOptions ??= SliceEncodeOptions.Default;
25✔
239
            _pipe = new Pipe(encodeOptions.PipeOptions);
25✔
240
            _streamFlushThreshold = encodeOptions.StreamFlushThreshold;
25✔
241
            _encodeAction = encodeAction;
25✔
242
            _encoding = encoding;
25✔
243
            _useSegments = useSegments;
25✔
244
            _asyncEnumerator = asyncEnumerable.GetAsyncEnumerator(_cts.Token);
25✔
245
        }
25✔
246
    }
247
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc