• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

icerpc / icerpc-csharp / 20830897503

08 Jan 2026 08:33PM UTC coverage: 83.498% (-0.1%) from 83.609%
20830897503

Pull #4216

github

web-flow
Merge 3e2b760b7 into 07ee5cc03
Pull Request #4216: Fix docfx DI examples

12058 of 14441 relevant lines covered (83.5%)

2968.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.36
src/IceRpc/ClientConnection.cs
1
// Copyright (c) ZeroC, Inc.
2

3
using IceRpc.Features;
4
using IceRpc.Transports;
5
using Microsoft.Extensions.Logging;
6
using Microsoft.Extensions.Logging.Abstractions;
7
using System.Collections.Immutable;
8
using System.Diagnostics;
9
using System.Net.Security;
10
using System.Security.Authentication;
11

12
namespace IceRpc;
13

14
/// <summary>Represents a client connection used to send requests to a server and receive the corresponding responses.
15
/// </summary>
16
/// <remarks>This client connection can also dispatch requests ("callbacks") received from the server. The client
17
/// connection's underlying connection is recreated and reconnected automatically when it's closed by any event other
18
/// than a call to <see cref="ShutdownAsync" /> or <see cref="DisposeAsync" />.</remarks>
19
public sealed class ClientConnection : IInvoker, IAsyncDisposable
20
{
21
    // The underlying protocol connection once successfully established.
22
    private (IProtocolConnection Connection, TransportConnectionInformation ConnectionInformation)? _activeConnection;
23

24
    private readonly IClientProtocolConnectionFactory _clientProtocolConnectionFactory;
25

26
    private readonly TimeSpan _connectTimeout;
27

28
    // A detached connection is a protocol connection that is connecting, shutting down or being disposed. Both
29
    // ShutdownAsync and DisposeAsync wait for detached connections to reach 0 using _detachedConnectionsTcs. Such a
30
    // connection is "detached" because it's not in _activeConnection.
31
    private int _detachedConnectionCount;
32

33
    private readonly TaskCompletionSource _detachedConnectionsTcs = new();
77✔
34

35
    // A cancellation token source that is canceled when DisposeAsync is called.
36
    private readonly CancellationTokenSource _disposedCts = new();
77✔
37
    private Task? _disposeTask;
38

39
    private readonly Lock _mutex = new();
77✔
40

41
    // A connection being established and its associated connect task. When non-null, _activeConnection is null.
42
    private (IProtocolConnection Connection, Task<TransportConnectionInformation> ConnectTask)? _pendingConnection;
43

44
    private Task? _shutdownTask;
45

46
    private readonly TimeSpan _shutdownTimeout;
47

48
    private readonly ServerAddress _serverAddress;
49

50
    /// <summary>Constructs a client connection.</summary>
51
    /// <param name="options">The client connection options.</param>
52
    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
53
    /// cref="IDuplexClientTransport.Default" />.</param>
54
    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
55
    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
56
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance" />.
57
    /// </param>
58
    public ClientConnection(
77✔
59
        ClientConnectionOptions options,
77✔
60
        IDuplexClientTransport? duplexClientTransport = null,
77✔
61
        IMultiplexedClientTransport? multiplexedClientTransport = null,
77✔
62
        ILogger? logger = null)
77✔
63
    {
77✔
64
        _connectTimeout = options.ConnectTimeout;
77✔
65
        _shutdownTimeout = options.ShutdownTimeout;
77✔
66

67
        duplexClientTransport ??= IDuplexClientTransport.Default;
77✔
68
        multiplexedClientTransport ??= IMultiplexedClientTransport.Default;
77✔
69

70
        _serverAddress = options.ServerAddress ??
77✔
71
            throw new ArgumentException(
77✔
72
                $"{nameof(ClientConnectionOptions.ServerAddress)} is not set",
77✔
73
                nameof(options));
77✔
74

75
        if (_serverAddress.Transport is null)
77✔
76
        {
41✔
77
            _serverAddress = _serverAddress with
41✔
78
            {
41✔
79
                Transport = _serverAddress.Protocol == Protocol.Ice ?
41✔
80
                    duplexClientTransport.Name : multiplexedClientTransport.Name
41✔
81
            };
41✔
82
        }
41✔
83

84
        _clientProtocolConnectionFactory = new ClientProtocolConnectionFactory(
77✔
85
            options,
77✔
86
            options.ConnectTimeout,
77✔
87
            options.ClientAuthenticationOptions,
77✔
88
            duplexClientTransport,
77✔
89
            multiplexedClientTransport,
77✔
90
            logger);
77✔
91
    }
77✔
92

93
    /// <summary>Constructs a client connection with the specified server address and client authentication options.
94
    /// All other properties use the <see cref="ClientConnectionOptions" /> defaults.</summary>
95
    /// <param name="serverAddress">The connection's server address.</param>
96
    /// <param name="clientAuthenticationOptions">The SSL client authentication options. When not <see langword="null"
97
    /// />, <see cref="ConnectAsync(CancellationToken)" /> will either establish a secure connection or fail.</param>
98
    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
99
    /// cref="IDuplexClientTransport.Default" />.</param>
100
    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
101
    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
102
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance" />.
103
    /// </param>
104
    public ClientConnection(
105
        ServerAddress serverAddress,
106
        SslClientAuthenticationOptions? clientAuthenticationOptions = null,
107
        IDuplexClientTransport? duplexClientTransport = null,
108
        IMultiplexedClientTransport? multiplexedClientTransport = null,
109
        ILogger? logger = null)
110
        : this(
36✔
111
            new ClientConnectionOptions
36✔
112
            {
36✔
113
                ClientAuthenticationOptions = clientAuthenticationOptions,
36✔
114
                ServerAddress = serverAddress
36✔
115
            },
36✔
116
            duplexClientTransport,
36✔
117
            multiplexedClientTransport,
36✔
118
            logger)
36✔
119
    {
36✔
120
    }
36✔
121

122
    /// <summary>Constructs a client connection with the specified server address URI and client authentication options.
123
    /// All other properties use the <see cref="ClientConnectionOptions" /> defaults.</summary>
124
    /// <param name="serverAddressUri">The connection's server address URI.</param>
125
    /// <param name="clientAuthenticationOptions">The SSL client authentication options. When not <see langword="null"
126
    /// />, <see cref="ConnectAsync(CancellationToken)" /> will either establish a secure connection or fail.</param>
127
    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
128
    /// cref="IDuplexClientTransport.Default" />.</param>
129
    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
130
    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
131
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance" />.
132
    /// </param>
133
    public ClientConnection(
134
        Uri serverAddressUri,
135
        SslClientAuthenticationOptions? clientAuthenticationOptions = null,
136
        IDuplexClientTransport? duplexClientTransport = null,
137
        IMultiplexedClientTransport? multiplexedClientTransport = null,
138
        ILogger? logger = null)
139
        : this(
3✔
140
            new ServerAddress(serverAddressUri),
3✔
141
            clientAuthenticationOptions,
3✔
142
            duplexClientTransport,
3✔
143
            multiplexedClientTransport,
3✔
144
            logger)
3✔
145
    {
3✔
146
    }
3✔
147

148
    /// <summary>Establishes the connection.</summary>
149
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
150
    /// <returns>A task that provides the <see cref="TransportConnectionInformation" /> of the transport connection,
151
    /// once this connection is established. This task can also complete with one of the following exceptions:
152
    /// <list type="bullet">
153
    /// <item><description><see cref="AuthenticationException" /> if authentication failed.</description></item>
154
    /// <item><description><see cref="IceRpcException" /> if the connection establishment failed.</description>
155
    /// </item>
156
    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
157
    /// cancellation token.</description></item>
158
    /// <item><description><see cref="TimeoutException" /> if this connection attempt or a previous attempt exceeded
159
    /// <see cref="ClientConnectionOptions.ConnectTimeout" />.</description></item>
160
    /// </list>
161
    /// </returns>
162
    /// <exception cref="InvalidOperationException">Thrown if this client connection is shut down or shutting down.
163
    /// </exception>
164
    /// <exception cref="ObjectDisposedException">Thrown if this client connection is disposed.</exception>
165
    /// <remarks>This method can be called multiple times and concurrently. If the connection is not established, it
166
    /// will be connected or reconnected.</remarks>
167
    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken = default)
168
    {
50✔
169
        Task<TransportConnectionInformation> connectTask;
170

171
        lock (_mutex)
172
        {
50✔
173
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
50✔
174
            if (_shutdownTask is not null)
50✔
175
            {
×
176
                throw new InvalidOperationException("Cannot connect a client connection after shutting it down.");
×
177
            }
178

179
            if (_activeConnection is not null)
50✔
180
            {
2✔
181
                return Task.FromResult(_activeConnection.Value.ConnectionInformation);
2✔
182
            }
183

184
            if (_pendingConnection is null)
48✔
185
            {
48✔
186
                IProtocolConnection newConnection = _clientProtocolConnectionFactory.CreateConnection(_serverAddress);
48✔
187
                _detachedConnectionCount++;
48✔
188
                connectTask = CreateConnectTask(newConnection, cancellationToken);
48✔
189
                _pendingConnection = (newConnection, connectTask);
48✔
190
            }
48✔
191
            else
192
            {
×
193
                connectTask = _pendingConnection.Value.ConnectTask.WaitAsync(cancellationToken);
×
194
            }
×
195
        }
48✔
196

197
        return PerformConnectAsync();
48✔
198

199
        async Task<TransportConnectionInformation> PerformConnectAsync()
200
        {
48✔
201
            try
202
            {
48✔
203
                return await connectTask.ConfigureAwait(false);
48✔
204
            }
205
            catch (OperationCanceledException)
3✔
206
            {
3✔
207
                // Canceled via the cancellation token given to ConnectAsync, but not necessarily this ConnectAsync
208
                // call.
209

210
                cancellationToken.ThrowIfCancellationRequested();
3✔
211

212
                throw new IceRpcException(
×
213
                    IceRpcError.ConnectionAborted,
×
214
                    "The connection establishment was canceled by another concurrent attempt.");
×
215
            }
216
        }
27✔
217
    }
50✔
218

219
    /// <summary>Releases all resources allocated by the connection. The connection disposes all the underlying
220
    /// connections it created.</summary>
221
    /// <returns>A value task that completes when the disposal of all the underlying connections has
222
    /// completed.</returns>
223
    /// <remarks>The disposal of an underlying connection aborts invocations, cancels dispatches and disposes the
224
    /// underlying transport connection without waiting for the peer. To wait for invocations and dispatches to
225
    /// complete, call <see cref="ShutdownAsync" /> first. If the configured dispatcher does not complete promptly when
226
    /// its cancellation token is canceled, the disposal can hang.</remarks>
227
    public ValueTask DisposeAsync()
228
    {
84✔
229
        lock (_mutex)
230
        {
84✔
231
            if (_disposeTask is null)
84✔
232
            {
77✔
233
                _shutdownTask ??= Task.CompletedTask;
77✔
234
                if (_detachedConnectionCount == 0)
77✔
235
                {
74✔
236
                    _ = _detachedConnectionsTcs.TrySetResult();
74✔
237
                }
74✔
238

239
                _disposeTask = PerformDisposeAsync();
77✔
240
            }
77✔
241
        }
84✔
242
        return new(_disposeTask);
84✔
243

244
        async Task PerformDisposeAsync()
245
        {
77✔
246
            await Task.Yield(); // Exit mutex lock
77✔
247

248
            _disposedCts.Cancel();
77✔
249

250
            // Wait for shutdown before disposing connections.
251
            try
252
            {
77✔
253
                await _shutdownTask.ConfigureAwait(false);
77✔
254
            }
75✔
255
            catch
2✔
256
            {
2✔
257
                // ignore exceptions.
258
            }
2✔
259

260
            // Since a pending connection is "detached", it's disposed via the connectTask, not directly by this method.
261
            if (_activeConnection is not null)
77✔
262
            {
45✔
263
                await _activeConnection.Value.Connection.DisposeAsync().ConfigureAwait(false);
45✔
264
            }
45✔
265

266
            await _detachedConnectionsTcs.Task.ConfigureAwait(false);
77✔
267

268
            _disposedCts.Dispose();
77✔
269
        }
77✔
270
    }
84✔
271

272
    /// <summary>Sends an outgoing request and returns the corresponding incoming response.</summary>
273
    /// <param name="request">The outgoing request being sent.</param>
274
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
275
    /// <returns>The corresponding <see cref="IncomingResponse" />.</returns>
276
    /// <exception cref="InvalidOperationException">Thrown if none of the request's server addresses matches this
277
    /// connection's server address.</exception>
278
    /// <exception cref="IceRpcException">Thrown with error <see cref="IceRpcError.InvocationRefused" /> if this client
279
    /// connection is shutdown.</exception>
280
    /// <exception cref="ObjectDisposedException">Thrown if this client connection is disposed.</exception>
281
    /// <remarks>If the connection is not established, it will be connected or reconnected.</remarks>
282
    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
283
    {
42✔
284
        if (request.Features.Get<IServerAddressFeature>() is IServerAddressFeature serverAddressFeature)
42✔
285
        {
×
286
            if (serverAddressFeature.ServerAddress is ServerAddress mainServerAddress)
×
287
            {
×
288
                CheckRequestServerAddresses(mainServerAddress, serverAddressFeature.AltServerAddresses);
×
289
            }
×
290
        }
×
291
        else if (request.ServiceAddress.ServerAddress is ServerAddress mainServerAddress)
42✔
292
        {
17✔
293
            CheckRequestServerAddresses(mainServerAddress, request.ServiceAddress.AltServerAddresses);
17✔
294
        }
11✔
295
        // It's ok if the request has no server address at all.
296

297
        IProtocolConnection? activeConnection = null;
36✔
298

299
        lock (_mutex)
300
        {
36✔
301
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
36✔
302

303
            if (_shutdownTask is not null)
36✔
304
            {
×
305
                throw new IceRpcException(IceRpcError.InvocationRefused, "The client connection was shut down.");
×
306
            }
307

308
            activeConnection = _activeConnection?.Connection;
36✔
309
        }
36✔
310

311
        return PerformInvokeAsync(activeConnection);
36✔
312

313
        void CheckRequestServerAddresses(
314
            ServerAddress mainServerAddress,
315
            ImmutableList<ServerAddress> altServerAddresses)
316
        {
17✔
317
            if (ServerAddressComparer.OptionalTransport.Equals(mainServerAddress, _serverAddress))
17✔
318
            {
10✔
319
                return;
10✔
320
            }
321

322
            foreach (ServerAddress serverAddress in altServerAddresses)
22✔
323
            {
1✔
324
                if (ServerAddressComparer.OptionalTransport.Equals(serverAddress, _serverAddress))
1✔
325
                {
1✔
326
                    return;
1✔
327
                }
328
            }
×
329

330
            throw new InvalidOperationException(
6✔
331
                $"None of the request's server addresses matches this connection's server address: {_serverAddress}");
6✔
332
        }
11✔
333

334
        async Task<IncomingResponse> PerformInvokeAsync(IProtocolConnection? connection)
335
        {
36✔
336
            // When InvokeAsync throws an IceRpcException(InvocationRefused) we retry unless the client connection is
337
            // being shutdown or disposed.
338
            while (true)
36✔
339
            {
36✔
340
                connection ??= await GetActiveConnectionAsync(cancellationToken).ConfigureAwait(false);
36✔
341

342
                try
343
                {
36✔
344
                    return await connection.InvokeAsync(request, cancellationToken).ConfigureAwait(false);
36✔
345
                }
346
                catch (ObjectDisposedException)
×
347
                {
×
348
                    // This can occasionally happen if we find a connection that was just closed and then automatically
349
                    // disposed by this client connection.
350
                }
×
351
                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.InvocationRefused)
2✔
352
                {
×
353
                    // The connection is refusing new invocations.
354
                }
×
355
                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.OperationAborted)
2✔
356
                {
2✔
357
                    lock (_mutex)
358
                    {
2✔
359
                        if (_disposeTask is null)
2✔
360
                        {
×
361
                            throw new IceRpcException(
×
362
                                IceRpcError.ConnectionAborted,
×
363
                                "The underlying connection was disposed while the invocation was in progress.");
×
364
                        }
365
                        else
366
                        {
2✔
367
                            throw;
2✔
368
                        }
369
                    }
370
                }
371

372
                // Make sure connection is no longer in _activeConnection before we retry.
373
                _ = RemoveFromActiveAsync(connection);
×
374
                connection = null;
×
375
            }
×
376
        }
32✔
377
    }
36✔
378

379
    /// <summary>Gracefully shuts down the connection. The shutdown waits for pending invocations and dispatches to
380
    /// complete.</summary>
381
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
382
    /// <returns>A task that completes once the shutdown is complete. This task can also complete with one of the
383
    /// following exceptions:
384
    /// <list type="bullet">
385
    /// <item><description><see cref="IceRpcException" /> if the connection shutdown failed.</description></item>
386
    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
387
    /// cancellation token.</description></item>
388
    /// <item><description><see cref="TimeoutException" /> if this shutdown attempt or a previous attempt exceeded <see
389
    /// cref="ClientConnectionOptions.ShutdownTimeout" />.</description></item>
390
    /// </list>
391
    /// </returns>
392
    /// <exception cref="InvalidOperationException">Thrown if this connection is already shut down or shutting down.
393
    /// </exception>
394
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
395
    public Task ShutdownAsync(CancellationToken cancellationToken = default)
396
    {
8✔
397
        lock (_mutex)
398
        {
8✔
399
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
8✔
400
            if (_shutdownTask is not null)
8✔
401
            {
×
402
                throw new InvalidOperationException("The client connection is already shut down or shutting down.");
×
403
            }
404

405
            if (_detachedConnectionCount == 0)
8✔
406
            {
8✔
407
                _ = _detachedConnectionsTcs.TrySetResult();
8✔
408
            }
8✔
409

410
            _shutdownTask = PerformShutdownAsync();
8✔
411
            return _shutdownTask;
8✔
412
        }
413

414
        async Task PerformShutdownAsync()
415
        {
8✔
416
            await Task.Yield(); // exit mutex lock
8✔
417

418
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
8✔
419
            cts.CancelAfter(_shutdownTimeout);
8✔
420

421
            // Since a pending connection is "detached", it's shutdown and disposed via the connectTask, not directly by
422
            // this method.
423
            try
424
            {
8✔
425
                if (_activeConnection is not null)
8✔
426
                {
6✔
427
                    await _activeConnection.Value.Connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
6✔
428
                }
4✔
429

430
                await _detachedConnectionsTcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);
6✔
431
            }
6✔
432
            catch (OperationCanceledException)
2✔
433
            {
2✔
434
                cancellationToken.ThrowIfCancellationRequested();
2✔
435

436
                if (_disposedCts.IsCancellationRequested)
1✔
437
                {
×
438
                    throw new IceRpcException(
×
439
                        IceRpcError.OperationAborted,
×
440
                        "The shutdown was aborted because the client connection was disposed.");
×
441
                }
442
                else
443
                {
1✔
444
                    throw new TimeoutException(
1✔
445
                        $"The client connection shut down timed out after {_shutdownTimeout.TotalSeconds} s.");
1✔
446
                }
447
            }
448
            catch
×
449
            {
×
450
                // ignore other shutdown exception
451
            }
×
452
        }
6✔
453
    }
8✔
454

455
    /// <summary>Creates the connection establishment task for a pending connection.</summary>
456
    /// <param name="connection">The new pending connection to connect.</param>
457
    /// <param name="cancellationToken">The cancellation token that can cancel this task.</param>
458
    /// <returns>A task that completes successfully when the connection is connected.</returns>
459
    private async Task<TransportConnectionInformation> CreateConnectTask(
460
        IProtocolConnection connection,
461
        CancellationToken cancellationToken)
462
    {
70✔
463
        await Task.Yield(); // exit mutex lock
70✔
464

465
        // This task "owns" a detachedConnectionCount and as a result _disposedCts can't be disposed.
466
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
70✔
467
        cts.CancelAfter(_connectTimeout);
70✔
468

469
        TransportConnectionInformation connectionInformation;
470
        Task shutdownRequested;
471
        Task? connectTask = null;
70✔
472

473
        try
474
        {
70✔
475
            try
476
            {
70✔
477
                (connectionInformation, shutdownRequested) = await connection.ConnectAsync(cts.Token)
70✔
478
                    .ConfigureAwait(false);
70✔
479
            }
49✔
480
            catch (OperationCanceledException)
5✔
481
            {
5✔
482
                cancellationToken.ThrowIfCancellationRequested();
5✔
483

484
                if (_disposedCts.IsCancellationRequested)
2✔
485
                {
1✔
486
                    throw new IceRpcException(
1✔
487
                        IceRpcError.OperationAborted,
1✔
488
                        "The connection establishment was aborted because the client connection was disposed.");
1✔
489
                }
490
                else
491
                {
1✔
492
                    throw new TimeoutException(
1✔
493
                        $"The connection establishment timed out after {_connectTimeout.TotalSeconds} s.");
1✔
494
                }
495
            }
496
        }
49✔
497
        catch
21✔
498
        {
21✔
499
            lock (_mutex)
500
            {
21✔
501
                Debug.Assert(_pendingConnection is not null && _pendingConnection.Value.Connection == connection);
21✔
502
                Debug.Assert(_activeConnection is null);
21✔
503

504
                // connectTask is executing this method and about to throw.
505
                connectTask = _pendingConnection.Value.ConnectTask;
21✔
506
                _pendingConnection = null;
21✔
507
            }
21✔
508

509
            _ = DisposePendingConnectionAsync(connection, connectTask);
21✔
510
            throw;
21✔
511
        }
512

513
        lock (_mutex)
514
        {
49✔
515
            Debug.Assert(_pendingConnection is not null && _pendingConnection.Value.Connection == connection);
49✔
516
            Debug.Assert(_activeConnection is null);
49✔
517

518
            if (_shutdownTask is null)
49✔
519
            {
49✔
520
                // the connection is now "attached" in _activeConnection
521
                _activeConnection = (connection, connectionInformation);
49✔
522
                _detachedConnectionCount--;
49✔
523
            }
49✔
524
            else
525
            {
×
526
                connectTask = _pendingConnection.Value.ConnectTask;
×
527
            }
×
528
            _pendingConnection = null;
49✔
529
        }
49✔
530

531
        if (connectTask is null)
49✔
532
        {
49✔
533
            _ = ShutdownWhenRequestedAsync(connection, shutdownRequested);
49✔
534
        }
49✔
535
        else
536
        {
×
537
            // As soon as this method completes successfully, we shut down then dispose the connection.
538
            _ = DisposePendingConnectionAsync(connection, connectTask);
×
539
        }
×
540
        return connectionInformation;
49✔
541

542
        async Task DisposePendingConnectionAsync(IProtocolConnection connection, Task connectTask)
543
        {
21✔
544
            try
545
            {
21✔
546
                await connectTask.ConfigureAwait(false);
21✔
547

548
                // Since we own a detachedConnectionCount, _disposedCts is not disposed.
549
                using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
×
550
                cts.CancelAfter(_shutdownTimeout);
×
551
                await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
×
552
            }
×
553
            catch
21✔
554
            {
21✔
555
                // Observe and ignore exceptions.
556
            }
21✔
557

558
            await connection.DisposeAsync().ConfigureAwait(false);
21✔
559

560
            lock (_mutex)
561
            {
21✔
562
                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
21✔
563
                {
3✔
564
                    _detachedConnectionsTcs.SetResult();
3✔
565
                }
3✔
566
            }
21✔
567
        }
21✔
568

569
        async Task ShutdownWhenRequestedAsync(IProtocolConnection connection, Task shutdownRequested)
570
        {
49✔
571
            await shutdownRequested.ConfigureAwait(false);
49✔
572
            await RemoveFromActiveAsync(connection).ConfigureAwait(false);
8✔
573
        }
8✔
574
    }
49✔
575

576
    /// <summary>Removes the connection from _activeConnection, and when successful, shuts down and disposes this
577
    /// connection.</summary>
578
    /// <param name="connection">The connected connection to shutdown and dispose.</param>
579
    private Task RemoveFromActiveAsync(IProtocolConnection connection)
580
    {
8✔
581
        lock (_mutex)
582
        {
8✔
583
            if (_shutdownTask is null && _activeConnection?.Connection == connection)
8✔
584
            {
4✔
585
                _activeConnection = null; // it's now our connection.
4✔
586
                _detachedConnectionCount++;
4✔
587
            }
4✔
588
            else
589
            {
4✔
590
                // Another task owns this connection
591
                return Task.CompletedTask;
4✔
592
            }
593
        }
4✔
594

595
        return ShutdownAndDisposeConnectionAsync();
4✔
596

597
        async Task ShutdownAndDisposeConnectionAsync()
598
        {
4✔
599
            // _disposedCts is not disposed since we own a detachedConnectionCount
600
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
4✔
601
            cts.CancelAfter(_shutdownTimeout);
4✔
602

603
            try
604
            {
4✔
605
                await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
4✔
606
            }
4✔
607
            catch
×
608
            {
×
609
                // Ignore connection shutdown failures
610
            }
×
611

612
            await connection.DisposeAsync().ConfigureAwait(false);
4✔
613

614
            lock (_mutex)
615
            {
4✔
616
                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
4✔
617
                {
×
618
                    _detachedConnectionsTcs.SetResult();
×
619
                }
×
620
            }
4✔
621
        }
4✔
622
    }
8✔
623

624
    /// <summary>Gets an active connection, by creating and connecting (if necessary) a new protocol connection.
625
    /// </summary>
626
    /// <param name="cancellationToken">The cancellation token of the invocation calling this method.</param>
627
    /// <returns>A connected connection.</returns>
628
    /// <remarks>This method is called exclusively by <see cref="InvokeAsync" />.</remarks>
629
    private ValueTask<IProtocolConnection> GetActiveConnectionAsync(CancellationToken cancellationToken)
630
    {
22✔
631
        (IProtocolConnection Connection, Task<TransportConnectionInformation> ConnectTask) pendingConnectionValue;
632

633
        lock (_mutex)
634
        {
22✔
635
            if (_disposeTask is not null)
22✔
636
            {
×
637
                throw new IceRpcException(IceRpcError.OperationAborted, "The client connection was disposed.");
×
638
            }
639
            if (_shutdownTask is not null)
22✔
640
            {
×
641
                throw new IceRpcException(IceRpcError.InvocationRefused, "The client connection was shut down.");
×
642
            }
643

644
            if (_activeConnection is not null)
22✔
645
            {
×
646
                return new(_activeConnection.Value.Connection);
×
647
            }
648

649
            if (_pendingConnection is null)
22✔
650
            {
22✔
651
                IProtocolConnection connection = _clientProtocolConnectionFactory.CreateConnection(_serverAddress);
22✔
652
                _detachedConnectionCount++;
22✔
653

654
                // We pass CancellationToken.None because the invocation cancellation should not cancel the connection
655
                // establishment.
656
                Task<TransportConnectionInformation> connectTask =
22✔
657
                    CreateConnectTask(connection, CancellationToken.None);
22✔
658
                _pendingConnection = (connection, connectTask);
22✔
659
            }
22✔
660
            pendingConnectionValue = _pendingConnection.Value;
22✔
661
        }
22✔
662

663
        return PerformGetActiveConnectionAsync();
22✔
664

665
        async ValueTask<IProtocolConnection> PerformGetActiveConnectionAsync()
666
        {
22✔
667
            // ConnectTask itself takes care of scheduling its exception observation when it fails.
668
            try
669
            {
22✔
670
                _ = await pendingConnectionValue.ConnectTask.WaitAsync(cancellationToken).ConfigureAwait(false);
22✔
671
            }
22✔
672
            catch (OperationCanceledException)
×
673
            {
×
674
                cancellationToken.ThrowIfCancellationRequested();
×
675

676
                // Canceled by the cancellation token given to ClientConnection.ConnectAsync.
677
                throw new IceRpcException(
×
678
                    IceRpcError.ConnectionAborted,
×
679
                    "The connection establishment was canceled by another concurrent attempt.");
×
680
            }
681
            return pendingConnectionValue.Connection;
22✔
682
        }
22✔
683
    }
22✔
684
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc