• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

icerpc / icerpc-csharp / 19644302394

24 Nov 2025 06:04PM UTC coverage: 83.456% (-0.1%) from 83.563%
19644302394

Pull #4148

github

web-flow
Merge 520878d6c into 13dc25b1f
Pull Request #4148: Add tracing to figure out Ice idle timeout test failure

12011 of 14392 relevant lines covered (83.46%)

2984.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.3
src/IceRpc/ClientConnection.cs
1
// Copyright (c) ZeroC, Inc.
2

3
using IceRpc.Features;
4
using IceRpc.Transports;
5
using Microsoft.Extensions.Logging;
6
using Microsoft.Extensions.Logging.Abstractions;
7
using System.Collections.Immutable;
8
using System.Diagnostics;
9
using System.Net.Security;
10
using System.Security.Authentication;
11

12
namespace IceRpc;
13

14
/// <summary>Represents a client connection used to send requests to a server and receive the corresponding responses.
/// </summary>
/// <remarks>This client connection can also dispatch requests ("callbacks") received from the server. The client
/// connection's underlying connection is recreated and reconnected automatically when it's closed by any event other
/// than a call to <see cref="ShutdownAsync" /> or <see cref="DisposeAsync" />.</remarks>
public sealed class ClientConnection : IInvoker, IAsyncDisposable
{
    // The underlying protocol connection once successfully established.
    private (IProtocolConnection Connection, TransportConnectionInformation ConnectionInformation)? _activeConnection;

    // The factory used to create a new protocol connection for each connection establishment attempt.
    private readonly IClientProtocolConnectionFactory _clientProtocolConnectionFactory;

    // The delay after which a connection establishment attempt is canceled (see CreateConnectTask).
    private readonly TimeSpan _connectTimeout;

    // A detached connection is a protocol connection that is connecting, shutting down or being disposed. Both
    // ShutdownAsync and DisposeAsync wait for detached connections to reach 0 using _detachedConnectionsTcs. Such a
    // connection is "detached" because it's not in _activeConnection.
    private int _detachedConnectionCount;

    // Completed when the detached connection count reaches 0 after shutdown or disposal started. SetResult is called
    // while holding _mutex, so the continuations must run asynchronously: otherwise the code awaiting this task
    // (including, through the task returned by ShutdownAsync, application code) could execute inline on the thread
    // that completes the task, while this thread still holds _mutex.
    private readonly TaskCompletionSource _detachedConnectionsTcs =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // A cancellation token source that is canceled when DisposeAsync is called.
    private readonly CancellationTokenSource _disposedCts = new();

    // Set by the first call to DisposeAsync. A non-null value means this client connection is disposed or being
    // disposed.
    private Task? _disposeTask;

    // Guards all writes to the mutable fields of this class.
    private readonly Lock _mutex = new();

    // A connection being established and its associated connect task. When non-null, _activeConnection is null.
    private (IProtocolConnection Connection, Task<TransportConnectionInformation> ConnectTask)? _pendingConnection;

    // Set by ShutdownAsync, or set to a completed task by DisposeAsync when ShutdownAsync was not called. A non-null
    // value means no new underlying connection is created or attached.
    private Task? _shutdownTask;

    // The delay after which the shutdown of an underlying connection is canceled.
    private readonly TimeSpan _shutdownTimeout;

    // The server address given to the protocol connection factory. The constructor fills in its transport with the
    // name of the selected client transport when the configured server address does not specify one.
    private readonly ServerAddress _serverAddress;

    /// <summary>Constructs a client connection.</summary>
    /// <param name="options">The client connection options.</param>
    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
    /// cref="IDuplexClientTransport.Default" />.</param>
    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance" />.
    /// </param>
    /// <exception cref="ArgumentException">Thrown if <see cref="ClientConnectionOptions.ServerAddress" /> is not set
    /// in <paramref name="options" />.</exception>
    public ClientConnection(
        ClientConnectionOptions options,
        IDuplexClientTransport? duplexClientTransport = null,
        IMultiplexedClientTransport? multiplexedClientTransport = null,
        ILogger? logger = null)
    {
        _connectTimeout = options.ConnectTimeout;
        _shutdownTimeout = options.ShutdownTimeout;

        duplexClientTransport ??= IDuplexClientTransport.Default;
        multiplexedClientTransport ??= IMultiplexedClientTransport.Default;

        _serverAddress = options.ServerAddress ??
            throw new ArgumentException(
                $"{nameof(ClientConnectionOptions.ServerAddress)} is not set",
                nameof(options));

        if (_serverAddress.Transport is null)
        {
            // The ice protocol uses the duplex transport; any other protocol uses the multiplexed transport.
            _serverAddress = _serverAddress with
            {
                Transport = _serverAddress.Protocol == Protocol.Ice ?
                    duplexClientTransport.Name : multiplexedClientTransport.Name
            };
        }

        _clientProtocolConnectionFactory = new ClientProtocolConnectionFactory(
            options,
            options.ClientAuthenticationOptions,
            duplexClientTransport,
            multiplexedClientTransport,
            logger);
    }

    /// <summary>Constructs a client connection with the specified server address and client authentication options.
    /// All other properties use the <see cref="ClientConnectionOptions" /> defaults.</summary>
    /// <param name="serverAddress">The connection's server address.</param>
    /// <param name="clientAuthenticationOptions">The SSL client authentication options. When not <see langword="null"
    /// />, <see cref="ConnectAsync(CancellationToken)" /> will either establish a secure connection or fail.</param>
    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
    /// cref="IDuplexClientTransport.Default" />.</param>
    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance" />.
    /// </param>
    public ClientConnection(
        ServerAddress serverAddress,
        SslClientAuthenticationOptions? clientAuthenticationOptions = null,
        IDuplexClientTransport? duplexClientTransport = null,
        IMultiplexedClientTransport? multiplexedClientTransport = null,
        ILogger? logger = null)
        : this(
            new ClientConnectionOptions
            {
                ClientAuthenticationOptions = clientAuthenticationOptions,
                ServerAddress = serverAddress
            },
            duplexClientTransport,
            multiplexedClientTransport,
            logger)
    {
    }

    /// <summary>Constructs a client connection with the specified server address URI and client authentication options.
    /// All other properties use the <see cref="ClientConnectionOptions" /> defaults.</summary>
    /// <param name="serverAddressUri">The connection's server address URI.</param>
    /// <param name="clientAuthenticationOptions">The SSL client authentication options. When not <see langword="null"
    /// />, <see cref="ConnectAsync(CancellationToken)" /> will either establish a secure connection or fail.</param>
    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
    /// cref="IDuplexClientTransport.Default" />.</param>
    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance" />.
    /// </param>
    public ClientConnection(
        Uri serverAddressUri,
        SslClientAuthenticationOptions? clientAuthenticationOptions = null,
        IDuplexClientTransport? duplexClientTransport = null,
        IMultiplexedClientTransport? multiplexedClientTransport = null,
        ILogger? logger = null)
        : this(
            new ServerAddress(serverAddressUri),
            clientAuthenticationOptions,
            duplexClientTransport,
            multiplexedClientTransport,
            logger)
    {
    }

    /// <summary>Establishes the connection.</summary>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>A task that provides the <see cref="TransportConnectionInformation" /> of the transport connection,
    /// once this connection is established. This task can also complete with one of the following exceptions:
    /// <list type="bullet">
    /// <item><description><see cref="AuthenticationException" /> if authentication failed.</description></item>
    /// <item><description><see cref="IceRpcException" /> if the connection establishment failed.</description>
    /// </item>
    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
    /// cancellation token.</description></item>
    /// <item><description><see cref="TimeoutException" /> if this connection attempt or a previous attempt exceeded
    /// <see cref="ClientConnectionOptions.ConnectTimeout" />.</description></item>
    /// </list>
    /// </returns>
    /// <exception cref="InvalidOperationException">Thrown if this client connection is shut down or shutting down.
    /// </exception>
    /// <exception cref="ObjectDisposedException">Thrown if this client connection is disposed.</exception>
    /// <remarks>This method can be called multiple times and concurrently. If the connection is not established, it
    /// will be connected or reconnected.</remarks>
    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken = default)
    {
        Task<TransportConnectionInformation> connectTask;

        lock (_mutex)
        {
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
            if (_shutdownTask is not null)
            {
                throw new InvalidOperationException("Cannot connect a client connection after shutting it down.");
            }

            if (_activeConnection is not null)
            {
                return Task.FromResult(_activeConnection.Value.ConnectionInformation);
            }

            if (_pendingConnection is null)
            {
                IProtocolConnection newConnection = _clientProtocolConnectionFactory.CreateConnection(_serverAddress);
                _detachedConnectionCount++;
                connectTask = CreateConnectTask(newConnection, cancellationToken);
                _pendingConnection = (newConnection, connectTask);
            }
            else
            {
                // Join the connection establishment already in progress; this call's cancellation token only cancels
                // the wait, not the shared connect task.
                connectTask = _pendingConnection.Value.ConnectTask.WaitAsync(cancellationToken);
            }
        }

        return PerformConnectAsync();

        async Task<TransportConnectionInformation> PerformConnectAsync()
        {
            try
            {
                return await connectTask.ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // Canceled via the cancellation token given to ConnectAsync, but not necessarily this ConnectAsync
                // call.

                cancellationToken.ThrowIfCancellationRequested();

                throw new IceRpcException(
                    IceRpcError.ConnectionAborted,
                    "The connection establishment was canceled by another concurrent attempt.");
            }
        }
    }

    /// <summary>Releases all resources allocated by the connection. The connection disposes all the underlying
    /// connections it created.</summary>
    /// <returns>A value task that completes when the disposal of all the underlying connections has
    /// completed.</returns>
    /// <remarks>The disposal of an underlying connection aborts invocations, cancels dispatches and disposes the
    /// underlying transport connection without waiting for the peer. To wait for invocations and dispatches to
    /// complete, call <see cref="ShutdownAsync" /> first. If the configured dispatcher does not complete promptly when
    /// its cancellation token is canceled, the disposal can hang.</remarks>
    public ValueTask DisposeAsync()
    {
        lock (_mutex)
        {
            if (_disposeTask is null)
            {
                // Disposing without a prior ShutdownAsync counts as shut down: no new connection gets attached.
                _shutdownTask ??= Task.CompletedTask;
                if (_detachedConnectionCount == 0)
                {
                    _ = _detachedConnectionsTcs.TrySetResult();
                }

                _disposeTask = PerformDisposeAsync();
            }
        }
        return new(_disposeTask);

        async Task PerformDisposeAsync()
        {
            await Task.Yield(); // Exit mutex lock

            _disposedCts.Cancel();

            // Wait for shutdown before disposing connections.
            try
            {
                await _shutdownTask.ConfigureAwait(false);
            }
            catch
            {
                // ignore exceptions.
            }

            // Since a pending connection is "detached", it's disposed via the connectTask, not directly by this method.
            if (_activeConnection is not null)
            {
                await _activeConnection.Value.Connection.DisposeAsync().ConfigureAwait(false);
            }

            await _detachedConnectionsTcs.Task.ConfigureAwait(false);

            // All the tasks that own a detached connection count have completed: nothing uses _disposedCts anymore.
            _disposedCts.Dispose();
        }
    }

    /// <summary>Sends an outgoing request and returns the corresponding incoming response.</summary>
    /// <param name="request">The outgoing request being sent.</param>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The corresponding <see cref="IncomingResponse" />.</returns>
    /// <exception cref="InvalidOperationException">Thrown if none of the request's server addresses matches this
    /// connection's server address.</exception>
    /// <exception cref="IceRpcException">Thrown with error <see cref="IceRpcError.InvocationRefused" /> if this client
    /// connection is shut down.</exception>
    /// <exception cref="ObjectDisposedException">Thrown if this client connection is disposed.</exception>
    /// <remarks>If the connection is not established, it will be connected or reconnected.</remarks>
    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
    {
        // The server address feature, when present, takes precedence over the request's service address.
        if (request.Features.Get<IServerAddressFeature>() is IServerAddressFeature serverAddressFeature)
        {
            if (serverAddressFeature.ServerAddress is ServerAddress mainServerAddress)
            {
                CheckRequestServerAddresses(mainServerAddress, serverAddressFeature.AltServerAddresses);
            }
        }
        else if (request.ServiceAddress.ServerAddress is ServerAddress mainServerAddress)
        {
            CheckRequestServerAddresses(mainServerAddress, request.ServiceAddress.AltServerAddresses);
        }
        // It's ok if the request has no server address at all.

        IProtocolConnection? activeConnection = null;

        lock (_mutex)
        {
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);

            if (_shutdownTask is not null)
            {
                throw new IceRpcException(IceRpcError.InvocationRefused, "The client connection was shut down.");
            }

            activeConnection = _activeConnection?.Connection;
        }

        return PerformInvokeAsync(activeConnection);

        void CheckRequestServerAddresses(
            ServerAddress mainServerAddress,
            ImmutableList<ServerAddress> altServerAddresses)
        {
            if (ServerAddressComparer.OptionalTransport.Equals(mainServerAddress, _serverAddress))
            {
                return;
            }

            foreach (ServerAddress serverAddress in altServerAddresses)
            {
                if (ServerAddressComparer.OptionalTransport.Equals(serverAddress, _serverAddress))
                {
                    return;
                }
            }

            throw new InvalidOperationException(
                $"None of the request's server addresses matches this connection's server address: {_serverAddress}");
        }

        async Task<IncomingResponse> PerformInvokeAsync(IProtocolConnection? connection)
        {
            // When InvokeAsync throws an IceRpcException(InvocationRefused) we retry unless the client connection is
            // being shutdown or disposed.
            while (true)
            {
                connection ??= await GetActiveConnectionAsync(cancellationToken).ConfigureAwait(false);

                try
                {
                    return await connection.InvokeAsync(request, cancellationToken).ConfigureAwait(false);
                }
                catch (ObjectDisposedException)
                {
                    // This can occasionally happen if we find a connection that was just closed and then automatically
                    // disposed by this client connection.
                }
                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.InvocationRefused)
                {
                    // The connection is refusing new invocations.
                }
                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.OperationAborted)
                {
                    lock (_mutex)
                    {
                        if (_disposeTask is null)
                        {
                            // This client connection is not disposed, so the abort comes from the disposal of the
                            // underlying connection alone; report it as a connection failure.
                            throw new IceRpcException(
                                IceRpcError.ConnectionAborted,
                                "The underlying connection was disposed while the invocation was in progress.");
                        }
                        else
                        {
                            throw;
                        }
                    }
                }

                // Make sure connection is no longer in _activeConnection before we retry.
                _ = RemoveFromActiveAsync(connection);
                connection = null;
            }
        }
    }

    /// <summary>Gracefully shuts down the connection. The shutdown waits for pending invocations and dispatches to
    /// complete.</summary>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>A task that completes once the shutdown is complete. This task can also complete with one of the
    /// following exceptions:
    /// <list type="bullet">
    /// <item><description><see cref="IceRpcException" /> if the connection shutdown failed.</description></item>
    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
    /// cancellation token.</description></item>
    /// <item><description><see cref="TimeoutException" /> if this shutdown attempt or a previous attempt exceeded <see
    /// cref="ClientConnectionOptions.ShutdownTimeout" />.</description></item>
    /// </list>
    /// </returns>
    /// <exception cref="InvalidOperationException">Thrown if this connection is already shut down or shutting down.
    /// </exception>
    /// <exception cref="ObjectDisposedException">Thrown if this connection is disposed.</exception>
    public Task ShutdownAsync(CancellationToken cancellationToken = default)
    {
        lock (_mutex)
        {
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
            if (_shutdownTask is not null)
            {
                throw new InvalidOperationException("The client connection is already shut down or shutting down.");
            }

            if (_detachedConnectionCount == 0)
            {
                _ = _detachedConnectionsTcs.TrySetResult();
            }

            _shutdownTask = PerformShutdownAsync();
            return _shutdownTask;
        }

        async Task PerformShutdownAsync()
        {
            await Task.Yield(); // exit mutex lock

            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
            cts.CancelAfter(_shutdownTimeout);

            // Since a pending connection is "detached", it's shutdown and disposed via the connectTask, not directly by
            // this method.
            try
            {
                if (_activeConnection is not null)
                {
                    await _activeConnection.Value.Connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
                }

                await _detachedConnectionsTcs.Task.WaitAsync(cts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // cts is canceled by the caller's token, by DisposeAsync or by the shutdown timeout; check them in
                // this order to report the right cause.
                cancellationToken.ThrowIfCancellationRequested();

                if (_disposedCts.IsCancellationRequested)
                {
                    throw new IceRpcException(
                        IceRpcError.OperationAborted,
                        "The shutdown was aborted because the client connection was disposed.");
                }
                else
                {
                    throw new TimeoutException(
                        $"The client connection shut down timed out after {_shutdownTimeout.TotalSeconds} s.");
                }
            }
            catch
            {
                // ignore other shutdown exception
            }
        }
    }

    /// <summary>Creates the connection establishment task for a pending connection.</summary>
    /// <param name="connection">The new pending connection to connect.</param>
    /// <param name="cancellationToken">The cancellation token that can cancel this task.</param>
    /// <returns>A task that completes successfully when the connection is connected.</returns>
    /// <remarks>The caller must increment <see cref="_detachedConnectionCount" /> and store the returned task in
    /// <see cref="_pendingConnection" /> while holding the mutex. When the connection establishment succeeds and this
    /// client connection is not shut down, the connection becomes the active connection; otherwise the connection is
    /// disposed in the background.</remarks>
    private async Task<TransportConnectionInformation> CreateConnectTask(
        IProtocolConnection connection,
        CancellationToken cancellationToken)
    {
        await Task.Yield(); // exit mutex lock

        // This task "owns" a detachedConnectionCount and as a result _disposedCts can't be disposed.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
        cts.CancelAfter(_connectTimeout);

        TransportConnectionInformation connectionInformation;
        Task shutdownRequested;
        Task? connectTask = null;

        try
        {
            try
            {
                (connectionInformation, shutdownRequested) = await connection.ConnectAsync(cts.Token)
                    .ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                cancellationToken.ThrowIfCancellationRequested();

                if (_disposedCts.IsCancellationRequested)
                {
                    throw new IceRpcException(
                        IceRpcError.OperationAborted,
                        "The connection establishment was aborted because the client connection was disposed.");
                }
                else
                {
                    throw new TimeoutException(
                        $"The connection establishment timed out after {_connectTimeout.TotalSeconds} s.");
                }
            }
        }
        catch
        {
            lock (_mutex)
            {
                Debug.Assert(_pendingConnection is not null && _pendingConnection.Value.Connection == connection);
                Debug.Assert(_activeConnection is null);

                // connectTask is executing this method and about to throw.
                connectTask = _pendingConnection.Value.ConnectTask;
                _pendingConnection = null;
            }

            _ = DisposePendingConnectionAsync(connection, connectTask);
            throw;
        }

        lock (_mutex)
        {
            Debug.Assert(_pendingConnection is not null && _pendingConnection.Value.Connection == connection);
            Debug.Assert(_activeConnection is null);

            if (_shutdownTask is null)
            {
                // the connection is now "attached" in _activeConnection
                _activeConnection = (connection, connectionInformation);
                _detachedConnectionCount--;
            }
            else
            {
                // Shut down or disposed in the meantime: the connection remains detached and is disposed below.
                connectTask = _pendingConnection.Value.ConnectTask;
            }
            _pendingConnection = null;
        }

        if (connectTask is null)
        {
            _ = ShutdownWhenRequestedAsync(connection, shutdownRequested);
        }
        else
        {
            // As soon as this method completes successfully, we shut down then dispose the connection.
            _ = DisposePendingConnectionAsync(connection, connectTask);
        }
        return connectionInformation;

        async Task DisposePendingConnectionAsync(IProtocolConnection connection, Task connectTask)
        {
            try
            {
                await connectTask.ConfigureAwait(false);

                // Since we own a detachedConnectionCount, _disposedCts is not disposed.
                using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
                cts.CancelAfter(_shutdownTimeout);
                await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
            }
            catch
            {
                // Observe and ignore exceptions.
            }

            await connection.DisposeAsync().ConfigureAwait(false);

            lock (_mutex)
            {
                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
                {
                    _detachedConnectionsTcs.SetResult();
                }
            }
        }

        async Task ShutdownWhenRequestedAsync(IProtocolConnection connection, Task shutdownRequested)
        {
            await shutdownRequested.ConfigureAwait(false);
            await RemoveFromActiveAsync(connection).ConfigureAwait(false);
        }
    }

    /// <summary>Removes the connection from _activeConnection, and when successful, shuts down and disposes this
    /// connection.</summary>
    /// <param name="connection">The connected connection to shutdown and dispose.</param>
    /// <returns>A task that completes once the connection is shut down and disposed, or a completed task when this
    /// client connection is shut down or when <paramref name="connection" /> is not the active connection.</returns>
    private Task RemoveFromActiveAsync(IProtocolConnection connection)
    {
        lock (_mutex)
        {
            if (_shutdownTask is null && _activeConnection?.Connection == connection)
            {
                _activeConnection = null; // it's now our connection.
                _detachedConnectionCount++;
            }
            else
            {
                // Another task owns this connection
                return Task.CompletedTask;
            }
        }

        return ShutdownAndDisposeConnectionAsync();

        async Task ShutdownAndDisposeConnectionAsync()
        {
            // _disposedCts is not disposed since we own a detachedConnectionCount
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
            cts.CancelAfter(_shutdownTimeout);

            try
            {
                await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
            }
            catch
            {
                // Ignore connection shutdown failures
            }

            await connection.DisposeAsync().ConfigureAwait(false);

            lock (_mutex)
            {
                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
                {
                    _detachedConnectionsTcs.SetResult();
                }
            }
        }
    }

    /// <summary>Gets an active connection, by creating and connecting (if necessary) a new protocol connection.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token of the invocation calling this method.</param>
    /// <returns>A connected connection.</returns>
    /// <exception cref="IceRpcException">Thrown with error <see cref="IceRpcError.OperationAborted" /> if this client
    /// connection is disposed, or with error <see cref="IceRpcError.InvocationRefused" /> if it is shut down.
    /// </exception>
    /// <remarks>This method is called exclusively by <see cref="InvokeAsync" />.</remarks>
    private ValueTask<IProtocolConnection> GetActiveConnectionAsync(CancellationToken cancellationToken)
    {
        (IProtocolConnection Connection, Task<TransportConnectionInformation> ConnectTask) pendingConnectionValue;

        lock (_mutex)
        {
            if (_disposeTask is not null)
            {
                throw new IceRpcException(IceRpcError.OperationAborted, "The client connection was disposed.");
            }
            if (_shutdownTask is not null)
            {
                throw new IceRpcException(IceRpcError.InvocationRefused, "The client connection was shut down.");
            }

            if (_activeConnection is not null)
            {
                return new(_activeConnection.Value.Connection);
            }

            if (_pendingConnection is null)
            {
                IProtocolConnection connection = _clientProtocolConnectionFactory.CreateConnection(_serverAddress);
                _detachedConnectionCount++;

                // We pass CancellationToken.None because the invocation cancellation should not cancel the connection
                // establishment.
                Task<TransportConnectionInformation> connectTask =
                    CreateConnectTask(connection, CancellationToken.None);
                _pendingConnection = (connection, connectTask);
            }
            pendingConnectionValue = _pendingConnection.Value;
        }

        return PerformGetActiveConnectionAsync();

        async ValueTask<IProtocolConnection> PerformGetActiveConnectionAsync()
        {
            // ConnectTask itself takes care of scheduling its exception observation when it fails.
            try
            {
                _ = await pendingConnectionValue.ConnectTask.WaitAsync(cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                cancellationToken.ThrowIfCancellationRequested();

                // Canceled by the cancellation token given to ClientConnection.ConnectAsync.
                throw new IceRpcException(
                    IceRpcError.ConnectionAborted,
                    "The connection establishment was canceled by another concurrent attempt.");
            }
            return pendingConnectionValue.Connection;
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc