• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

icerpc / icerpc-csharp / 19712769452

26 Nov 2025 05:46PM UTC coverage: 83.473% (+0.07%) from 83.407%
19712769452

push

github

web-flow
Add HandshakeTimeout to MultiplexedConnectionOptions (#4172)

12021 of 14401 relevant lines covered (83.47%)

2974.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.08
src/IceRpc/ConnectionCache.cs
1
// Copyright (c) ZeroC, Inc.
2

3
using IceRpc.Features;
4
using IceRpc.Transports;
5
using Microsoft.Extensions.Logging;
6
using Microsoft.Extensions.Logging.Abstractions;
7
using System.Diagnostics;
8
using System.Diagnostics.CodeAnalysis;
9
using System.Runtime.ExceptionServices;
10

11
namespace IceRpc;
12

13
/// <summary>Represents an invoker that routes outgoing requests to connections it manages.</summary>
14
/// <remarks><para>The connection cache routes requests based on the request's <see cref="IServerAddressFeature" />
15
/// feature or the server addresses of the request's target service.</para>
16
/// <para>The connection cache keeps at most one active connection per server address.</para></remarks>
17
public sealed class ConnectionCache : IInvoker, IAsyncDisposable
18
{
19
    // Connections whose connection establishment completed, keyed by server address.
    private readonly Dictionary<ServerAddress, IProtocolConnection> _activeConnections =
        new(ServerAddressComparer.OptionalTransport);

    private readonly IClientProtocolConnectionFactory _connectionFactory;

    private readonly TimeSpan _connectTimeout;

    // A detached connection is a protocol connection that is connecting, shutting down or being disposed. Both
    // ShutdownAsync and DisposeAsync wait for detached connections to reach 0 using _detachedConnectionsTcs. Such a
    // connection is "detached" because it's not in _activeConnections.
    private int _detachedConnectionCount;

    // This task completion source is always completed while _mutex is held. RunContinuationsAsynchronously makes sure
    // the continuations of its task (the remainder of the shutdown and dispose tasks, and whatever callers chained to
    // these tasks) are never executed inline by the thread that currently owns the mutex.
    private readonly TaskCompletionSource _detachedConnectionsTcs =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // A cancellation token source that is canceled when DisposeAsync is called.
    private readonly CancellationTokenSource _disposedCts = new();

    // Non-null once DisposeAsync has been called. Read and written with _mutex held.
    private Task? _disposeTask;

    private readonly Lock _mutex = new();

    // New connections in the process of connecting, together with the task that connects them.
    private readonly Dictionary<ServerAddress, (IProtocolConnection Connection, Task ConnectTask)> _pendingConnections =
        new(ServerAddressComparer.OptionalTransport);

    private readonly bool _preferExistingConnection;

    // Non-null once ShutdownAsync or DisposeAsync has been called. Read and written with _mutex held.
    private Task? _shutdownTask;

    private readonly TimeSpan _shutdownTimeout;
50

51
    /// <summary>Constructs a connection cache.</summary>
    /// <param name="options">The options of this connection cache. They provide the connection options, the client
    /// authentication options, the connect and shutdown timeouts and the connection selection preference.</param>
    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
    /// cref="IDuplexClientTransport.Default" />.</param>
    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
    /// />.</param>
    public ConnectionCache(
        ConnectionCacheOptions options,
        IDuplexClientTransport? duplexClientTransport = null,
        IMultiplexedClientTransport? multiplexedClientTransport = null,
        ILogger? logger = null)
    {
        // Settings used directly by the cache.
        _preferExistingConnection = options.PreferExistingConnection;
        _connectTimeout = options.ConnectTimeout;
        _shutdownTimeout = options.ShutdownTimeout;

        // The factory creates all the protocol connections managed by this cache.
        _connectionFactory = new ClientProtocolConnectionFactory(
            options.ConnectionOptions,
            options.ConnectTimeout,
            options.ClientAuthenticationOptions,
            duplexClientTransport,
            multiplexedClientTransport,
            logger);
    }
78

79
    /// <summary>Constructs a connection cache configured with a default-constructed
    /// <see cref="ConnectionCacheOptions" /> instance.</summary>
    public ConnectionCache()
        : this(options: new ConnectionCacheOptions())
    {
    }
84

85
    /// <summary>Disposes this cache and all the connections it created.</summary>
    /// <returns>A value task that completes once all the connections created by this cache are disposed. This covers
    /// the connections that are active when this method is called as well as the connections whose disposal started
    /// before this call. Calling this method again returns the same underlying task.</returns>
    /// <remarks>Disposing a connection of the cache aborts invocations, cancels dispatches and disposes the underlying
    /// transport connection without waiting for the peer. Call <see cref="ShutdownAsync" /> first to wait for
    /// invocations and dispatches to complete. The disposal can hang when the configured dispatcher does not complete
    /// promptly after its cancellation token is canceled.</remarks>
    public ValueTask DisposeAsync()
    {
        lock (_mutex)
        {
            if (_disposeTask is not null)
            {
                // Disposal already started: hand out the same task.
                return new(_disposeTask);
            }

            // A disposed cache is also considered shut down.
            _shutdownTask ??= Task.CompletedTask;

            if (_detachedConnectionCount == 0)
            {
                // Nothing to wait for. ShutdownAsync may have completed this source already, hence the Try.
                _ = _detachedConnectionsTcs.TrySetResult();
            }

            _disposeTask = PerformDisposeAsync();
            return new(_disposeTask);
        }

        async Task PerformDisposeAsync()
        {
            await Task.Yield(); // exit mutex lock

            _disposedCts.Cancel();

            // The shutdown must be over before we dispose the connections; its outcome does not matter here.
            try
            {
                await _shutdownTask.ConfigureAwait(false);
            }
            catch
            {
                // Ignore exceptions.
            }

            // A pending connection is "detached": its connect task disposes it, and we only wait for the detached
            // count to reach 0.
            IEnumerable<Task> disposals = _activeConnections.Values
                .Select(connection => connection.DisposeAsync().AsTask())
                .Append(_detachedConnectionsTcs.Task);
            await Task.WhenAll(disposals).ConfigureAwait(false);

            _disposedCts.Dispose();
        }
    }
135

136
    /// <summary>Sends an outgoing request over a connection managed by this cache and returns the corresponding
    /// incoming response.</summary>
    /// <param name="request">The outgoing request to send.</param>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The corresponding <see cref="IncomingResponse" />.</returns>
    /// <exception cref="InvalidOperationException">Thrown if no <see cref="IServerAddressFeature" /> feature is set and
    /// the request's service address has no server address.</exception>
    /// <exception cref="IceRpcException">Thrown with one of the following errors:
    /// <list type="bullet"><item><term><see cref="IceRpcError.InvocationRefused" /></term><description>The connection
    /// cache is shut down.</description></item>
    /// <item><term><see cref="IceRpcError.NoConnection" /></term><description>The request's
    /// <see cref="IServerAddressFeature" /> feature has no server address.</description></item></list>
    /// </exception>
    /// <exception cref="ObjectDisposedException">Thrown if this connection cache is disposed.</exception>
    /// <remarks><para>When the request has no <see cref="IServerAddressFeature" /> feature, the cache creates one from
    /// the request's service address and sets it on the request.</para>
    /// <para>The cache then looks for an active connection; <see
    /// cref="ConnectionCacheOptions.PreferExistingConnection" /> influences this selection. When it finds no active
    /// connection, the cache establishes a new connection to one of the server addresses of the
    /// <see cref="IServerAddressFeature" /> feature.</para>
    /// <para>When the connection establishment to <see cref="IServerAddressFeature.ServerAddress" /> fails, this
    /// address moves to the end of <see cref="IServerAddressFeature.AltServerAddresses" /> and the first alternate
    /// address becomes the new <see cref="IServerAddressFeature.ServerAddress" />. The cache keeps trying this way
    /// until a connection is established or all the addresses were tried; in the latter case, this method throws the
    /// exception of the last attempt.</para></remarks>
    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken)
    {
        if (request.Features.Get<IServerAddressFeature>() is not IServerAddressFeature serverAddressFeature)
        {
            // No feature yet: build it from the service address and attach it to the request.
            if (request.ServiceAddress.ServerAddress is null)
            {
                throw new InvalidOperationException("Cannot send a request to a service without a server address.");
            }

            serverAddressFeature = new ServerAddressFeature(request.ServiceAddress);
            request.Features = request.Features.With(serverAddressFeature);
        }
        else if (serverAddressFeature.ServerAddress is null)
        {
            throw new IceRpcException(
                IceRpcError.NoConnection,
                $"Could not invoke '{request.Operation}' on '{request.ServiceAddress}': tried all server addresses without success.");
        }

        lock (_mutex)
        {
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);

            if (_shutdownTask is not null)
            {
                throw new IceRpcException(IceRpcError.InvocationRefused, "The connection cache was shut down.");
            }
        }

        return PerformInvokeAsync();

        async Task<IncomingResponse> PerformInvokeAsync()
        {
            Debug.Assert(serverAddressFeature.ServerAddress is not null);

            // Each iteration is one attempt. We try again after an ObjectDisposedException or an
            // IceRpcException(InvocationRefused) from the connection; the loop ends when an invocation returns or when
            // any other exception is thrown (including by the connection lookups once the cache is shut down).
            while (true)
            {
                IProtocolConnection? connection = null;
                if (!_preferExistingConnection || !TryGetActiveConnection(serverAddressFeature, out connection))
                {
                    connection = await GetActiveConnectionAsync(serverAddressFeature, cancellationToken)
                        .ConfigureAwait(false);
                }

                try
                {
                    return await connection.InvokeAsync(request, cancellationToken).ConfigureAwait(false);
                }
                catch (ObjectDisposedException)
                {
                    // Occasionally, we pick a connection that was just closed and then automatically disposed by
                    // this connection cache.
                }
                catch (IceRpcException exception) when (exception.IceRpcError is IceRpcError.InvocationRefused)
                {
                    // The connection no longer accepts new invocations.
                }
                catch (IceRpcException exception) when (exception.IceRpcError is IceRpcError.OperationAborted)
                {
                    bool cacheDisposed;
                    lock (_mutex)
                    {
                        cacheDisposed = _disposeTask is not null;
                    }

                    if (cacheDisposed)
                    {
                        throw;
                    }

                    throw new IceRpcException(
                        IceRpcError.ConnectionAborted,
                        "The underlying connection was disposed while the invocation was in progress.");
                }

                // The connection must leave _activeConnections before the next attempt.
                _ = RemoveFromActiveAsync(serverAddressFeature.ServerAddress.Value, connection);
            }
        }
    }
247

248
    /// <summary>Gracefully shuts down all the connections created by this cache.</summary>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>A task that completes successfully once all the connections created by this cache are shut down. This
    /// covers the connections that are active when this method is called as well as the connections whose shutdown
    /// started before this call. This task can also complete with one of the following exceptions:
    /// <list type="bullet">
    /// <item><description><see cref="IceRpcException" /> with error <see cref="IceRpcError.OperationAborted" /> if the
    /// connection cache is disposed while being shut down.</description></item>
    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
    /// cancellation token.</description></item>
    /// <item><description><see cref="TimeoutException" /> if the shutdown timed out.</description></item>
    /// </list>
    /// </returns>
    /// <exception cref="InvalidOperationException">Thrown if this method is called more than once.</exception>
    /// <exception cref="ObjectDisposedException">Thrown if the connection cache is disposed.</exception>
    public Task ShutdownAsync(CancellationToken cancellationToken = default)
    {
        lock (_mutex)
        {
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);

            if (_shutdownTask is not null)
            {
                throw new InvalidOperationException("The connection cache is already shut down or shutting down.");
            }

            if (_detachedConnectionCount == 0)
            {
                // No detached connection to wait for.
                _detachedConnectionsTcs.SetResult();
            }

            return _shutdownTask = PerformShutdownAsync();
        }

        async Task PerformShutdownAsync()
        {
            await Task.Yield(); // exit mutex lock

            // Canceled by the caller's token, by the disposal of the cache or by the shutdown timeout.
            using var shutdownCts =
                CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
            shutdownCts.CancelAfter(_shutdownTimeout);

            try
            {
                // A pending connection is "detached": its connect task shuts it down and disposes it, and we only
                // wait for the detached count to reach 0.
                try
                {
                    await Task.WhenAll(
                        _activeConnections.Values
                            .Select(connection => connection.ShutdownAsync(shutdownCts.Token))
                            .Append(_detachedConnectionsTcs.Task.WaitAsync(shutdownCts.Token)))
                        .ConfigureAwait(false);
                }
                catch (Exception exception) when (exception is not OperationCanceledException)
                {
                    // Connection shutdown failures are ignored, but the exception reported by WhenAll can hide an
                    // OperationCanceledException: surface the cancellation in this case.
                    shutdownCts.Token.ThrowIfCancellationRequested();
                }
            }
            catch (OperationCanceledException)
            {
                // Figure out which source triggered the cancellation: caller, disposal or timeout.
                cancellationToken.ThrowIfCancellationRequested();

                if (_disposedCts.IsCancellationRequested)
                {
                    throw new IceRpcException(
                        IceRpcError.OperationAborted,
                        "The shutdown was aborted because the connection cache was disposed.");
                }

                throw new TimeoutException(
                    $"The connection cache shut down timed out after {_shutdownTimeout.TotalSeconds} s.");
            }
        }
    }
331

332
    /// <summary>Connects a pending connection. On success, the connection moves from _pendingConnections to
    /// _activeConnections unless the cache is shut down or disposed. In all other cases, the connection is removed
    /// from _pendingConnections and disposed in the background.</summary>
    /// <param name="connection">The pending connection to connect.</param>
    /// <param name="serverAddress">The server address key of this connection in _pendingConnections.</param>
    /// <returns>A task that completes successfully when the connection establishment succeeds; otherwise, a task that
    /// fails with the connection establishment exception.</returns>
    private async Task CreateConnectTask(IProtocolConnection connection, ServerAddress serverAddress)
    {
        await Task.Yield(); // exit mutex lock

        // _disposedCts can't be disposed at this point because this task "owns" a detachedConnectionCount.
        using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
        connectCts.CancelAfter(_connectTimeout);

        Task shutdownRequested;

        // The task executing this method, as stored in _pendingConnections. It remains null when the connection
        // becomes an active connection.
        Task? pendingConnectTask = null;

        try
        {
            try
            {
                (_, shutdownRequested) = await connection.ConnectAsync(connectCts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // The cancellation comes from the disposal of the cache or from the connect timeout.
                if (_disposedCts.IsCancellationRequested)
                {
                    throw new IceRpcException(
                        IceRpcError.OperationAborted,
                        "The connection establishment was aborted because the connection cache was disposed.");
                }

                throw new TimeoutException(
                    $"The connection establishment timed out after {_connectTimeout.TotalSeconds} s.");
            }
        }
        catch
        {
            lock (_mutex)
            {
                // The pending connect task is executing this method and about to throw.
                pendingConnectTask = _pendingConnections[serverAddress].ConnectTask;
                _pendingConnections.Remove(serverAddress);
            }

            _ = DisposePendingConnectionAsync(connection, pendingConnectTask);
            throw;
        }

        lock (_mutex)
        {
            if (_shutdownTask is not null)
            {
                // Too late to attach this connection: it remains detached and gets disposed below.
                pendingConnectTask = _pendingConnections[serverAddress].ConnectTask;
            }
            else
            {
                // the connection is now "attached" in _activeConnections
                _activeConnections.Add(serverAddress, connection);
                _detachedConnectionCount--;
            }

            bool removed = _pendingConnections.Remove(serverAddress);
            Debug.Assert(removed);
        }

        // An attached connection is shut down when shutdownRequested completes. A connection that remains detached is
        // shut down then disposed as soon as this method completes successfully.
        _ = pendingConnectTask is null ?
            ShutdownWhenRequestedAsync(connection, serverAddress, shutdownRequested) :
            DisposePendingConnectionAsync(connection, pendingConnectTask);

        // Waits for the connect task, shuts down the connection when the connect task succeeded, then disposes the
        // connection and releases its detached connection count.
        async Task DisposePendingConnectionAsync(IProtocolConnection pendingConnection, Task connectTask)
        {
            try
            {
                await connectTask.ConfigureAwait(false);

                // _disposedCts is not disposed since we own a detachedConnectionCount.
                using var shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
                shutdownCts.CancelAfter(_shutdownTimeout);
                await pendingConnection.ShutdownAsync(shutdownCts.Token).ConfigureAwait(false);
            }
            catch
            {
                // Observe and ignore exceptions.
            }

            await pendingConnection.DisposeAsync().ConfigureAwait(false);

            lock (_mutex)
            {
                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
                {
                    _detachedConnectionsTcs.SetResult();
                }
            }
        }

        // Removes the connection from the active connections once shutdownRequested completes.
        async Task ShutdownWhenRequestedAsync(
            IProtocolConnection activeConnection,
            ServerAddress activeServerAddress,
            Task shutdownRequestedTask)
        {
            await shutdownRequestedTask.ConfigureAwait(false);
            await RemoveFromActiveAsync(activeServerAddress, activeConnection).ConfigureAwait(false);
        }
    }
439

440
    /// <summary>Gets an active connection to one of the addresses of the server address feature. When no such
    /// connection exists, this method waits for a pending connection or creates and connects a new protocol
    /// connection, trying the addresses one after the other.</summary>
    /// <param name="serverAddressFeature">The server address feature. Its addresses are rotated before each attempt
    /// that follows the initial attempt.</param>
    /// <param name="cancellationToken">The cancellation token of the invocation calling this method. It cancels the
    /// wait for the connect task, not the connect task itself.</param>
    /// <returns>The connected protocol connection.</returns>
    private async Task<IProtocolConnection> GetActiveConnectionAsync(
        IServerAddressFeature serverAddressFeature,
        CancellationToken cancellationToken)
    {
        Debug.Assert(serverAddressFeature.ServerAddress is not null);

        Exception? lastFailure = null;
        var addresses = new ServerAddressEnumerator(serverAddressFeature);

        while (addresses.MoveNext())
        {
            ServerAddress candidate = addresses.Current;
            if (addresses.CurrentIndex > 0)
            {
                // Every attempt after the first one starts with a rotation of the server addresses.
                serverAddressFeature.RotateAddresses();
            }

            try
            {
                (IProtocolConnection Connection, Task ConnectTask) pending;

                lock (_mutex)
                {
                    if (_disposeTask is not null)
                    {
                        throw new IceRpcException(IceRpcError.OperationAborted, "The connection cache was disposed.");
                    }

                    if (_shutdownTask is not null)
                    {
                        throw new IceRpcException(IceRpcError.InvocationRefused, "The connection cache is shut down.");
                    }

                    if (_activeConnections.TryGetValue(candidate, out IProtocolConnection? active))
                    {
                        return active;
                    }

                    if (!_pendingConnections.TryGetValue(candidate, out pending))
                    {
                        // Nobody is connecting to this address: start a new connection, which is "detached" until
                        // its connect task completes successfully.
                        IProtocolConnection created = _connectionFactory.CreateConnection(candidate);
                        _detachedConnectionCount++;
                        pending = (created, CreateConnectTask(created, candidate));
                        _pendingConnections.Add(candidate, pending);
                    }
                }

                // When ConnectTask fails, it schedules the observation of its own exception.
                await pending.ConnectTask.WaitAsync(cancellationToken).ConfigureAwait(false);
                return pending.Connection;
            }
            catch (TimeoutException exception)
            {
                lastFailure = exception;
            }
            catch (IceRpcException exception) when (exception.IceRpcError is
                IceRpcError.ConnectionAborted or
                IceRpcError.ConnectionRefused or
                IceRpcError.ServerBusy or
                IceRpcError.ServerUnreachable)
            {
                // Move on to the next address, unless the connection cache was disposed or shut down.
                lastFailure = exception;
                lock (_mutex)
                {
                    if (_shutdownTask is not null)
                    {
                        throw;
                    }
                }
            }
        }

        // All the attempts failed: rethrow the exception of the last attempt with its original stack trace. The final
        // throw statement is never reached; it only tells the compiler that this code path does not return.
        Debug.Assert(lastFailure is not null);
        ExceptionDispatchInfo.Throw(lastFailure);
        Debug.Assert(false);
        throw lastFailure;
    }
518

519
    /// <summary>Removes the entry for <paramref name="serverAddress" /> from _activeConnections and, when this removal
    /// succeeds, shuts down and disposes <paramref name="connection" />.</summary>
    /// <param name="serverAddress">The server address key in _activeConnections.</param>
    /// <param name="connection">The connection to shut down and dispose after the removal.</param>
    /// <returns>A task that completes once the connection is disposed, or a completed task when the cache is shut
    /// down or when _activeConnections has no entry for <paramref name="serverAddress" />.</returns>
    private Task RemoveFromActiveAsync(ServerAddress serverAddress, IProtocolConnection connection)
    {
        lock (_mutex)
        {
            if (_shutdownTask is not null || !_activeConnections.Remove(serverAddress))
            {
                // Another task owns this connection.
                return Task.CompletedTask;
            }

            // The connection is now ours and "detached".
            _detachedConnectionCount++;
        }

        return ShutdownAndDisposeConnectionAsync();

        async Task ShutdownAndDisposeConnectionAsync()
        {
            // We own a detachedConnectionCount, so _disposedCts is not disposed.
            using var shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
            shutdownCts.CancelAfter(_shutdownTimeout);

            try
            {
                await connection.ShutdownAsync(shutdownCts.Token).ConfigureAwait(false);
            }
            catch
            {
                // Connection shutdown failures are ignored.
            }

            await connection.DisposeAsync().ConfigureAwait(false);

            lock (_mutex)
            {
                _detachedConnectionCount--;
                if (_detachedConnectionCount == 0 && _shutdownTask is not null)
                {
                    // ShutdownAsync or DisposeAsync is waiting for the detached connections.
                    _detachedConnectionsTcs.SetResult();
                }
            }
        }
    }
567

568
    /// <summary>Looks for an active connection to any of the addresses of the server address feature. When the match
    /// is an alternate address, this address becomes the main server address of the feature.</summary>
    /// <param name="serverAddressFeature">The server address feature.</param>
    /// <param name="connection">When this method returns <see langword="true" />, this argument contains an active
    /// connection; otherwise, it is set to <see langword="null" />.</param>
    /// <returns><see langword="true" /> when an active connection matching any of the addresses of the server address
    /// feature is found; otherwise, <see langword="false"/>.</returns>
    private bool TryGetActiveConnection(
        IServerAddressFeature serverAddressFeature,
        [NotNullWhen(true)] out IProtocolConnection? connection)
    {
        lock (_mutex)
        {
            connection = null;

            if (_disposeTask is not null)
            {
                throw new IceRpcException(IceRpcError.OperationAborted, "The connection cache was disposed.");
            }

            if (_shutdownTask is not null)
            {
                throw new IceRpcException(IceRpcError.InvocationRefused, "The connection cache was shut down.");
            }

            var addresses = new ServerAddressEnumerator(serverAddressFeature);
            while (addresses.MoveNext())
            {
                ServerAddress candidate = addresses.Current;
                if (!_activeConnections.TryGetValue(candidate, out connection))
                {
                    continue;
                }

                // Index of the match in the alt server addresses; -1 when the match is the main server address.
                int altIndex = addresses.CurrentIndex - 1;
                if (altIndex >= 0)
                {
                    // The matching alt server address is promoted to main server address, and the previous main
                    // server address becomes the first alt server address.
                    ServerAddress previousMain = serverAddressFeature.ServerAddress!.Value;
                    serverAddressFeature.AltServerAddresses = serverAddressFeature.AltServerAddresses
                        .RemoveAt(altIndex)
                        .Insert(0, previousMain);
                    serverAddressFeature.ServerAddress = candidate;
                }

                return true;
            }

            return false;
        }
    }
613

614
    /// <summary>A helper struct that enumerates the addresses of an <see cref="IServerAddressFeature" /> without
    /// allocations: first the main server address, then the alt server addresses in order. The enumerator reads the
    /// addresses of the feature once, in its constructor.</summary>
    private struct ServerAddressEnumerator
    {
        /// <summary>Gets the address at <see cref="CurrentIndex" />. Valid only after <see cref="MoveNext" />
        /// returned <see langword="true" />.</summary>
        internal readonly ServerAddress Current
        {
            get
            {
                Debug.Assert(CurrentIndex >= 0 && CurrentIndex <= _altServerAddresses.Count);

                if (CurrentIndex != 0)
                {
                    // Index 1 and up map to the alt server addresses.
                    return _altServerAddresses[CurrentIndex - 1];
                }

                Debug.Assert(_mainServerAddress is not null);
                return _mainServerAddress.Value;
            }
        }

        /// <summary>Gets the total number of addresses: 0 when the feature has no main server address, otherwise the
        /// number of alt server addresses plus 1.</summary>
        internal int Count { get; }

        /// <summary>Gets the index of the current address: -1 before the first call to <see cref="MoveNext" />, 0
        /// for the main server address, and the position in the alt server addresses plus 1 afterwards.</summary>
        internal int CurrentIndex { get; private set; } = -1;

        private readonly ServerAddress? _mainServerAddress;
        private readonly IList<ServerAddress> _altServerAddresses;

        /// <summary>Advances to the next address.</summary>
        /// <returns><see langword="true" /> when the enumerator moved to a new address; <see langword="false" /> when
        /// there is no main server address or when all the addresses were enumerated.</returns>
        internal bool MoveNext()
        {
            if (CurrentIndex == -1)
            {
                // First call: without a main server address, there is nothing to enumerate.
                if (_mainServerAddress is null)
                {
                    return false;
                }

                CurrentIndex = 0;
                return true;
            }

            if (CurrentIndex < _altServerAddresses.Count)
            {
                CurrentIndex++;
                return true;
            }

            return false;
        }

        /// <summary>Constructs an enumerator over the addresses of a server address feature.</summary>
        /// <param name="serverAddressFeature">The server address feature to enumerate.</param>
        internal ServerAddressEnumerator(IServerAddressFeature serverAddressFeature)
        {
            _mainServerAddress = serverAddressFeature.ServerAddress;
            _altServerAddresses = serverAddressFeature.AltServerAddresses;
            Count = _mainServerAddress is null ? 0 : _altServerAddresses.Count + 1;
        }
    }
678
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc