• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

icerpc / icerpc-csharp / 19579725753

21 Nov 2025 06:22PM UTC coverage: 83.577% (-0.1%) from 83.694%
19579725753

push

github

web-flow
Fix ShutdownRequested_completes_when_inactive_and_inactive_timeout_de… (#4142)

12086 of 14461 relevant lines covered (83.58%)

2964.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.65
src/IceRpc/ConnectionCache.cs
1
// Copyright (c) ZeroC, Inc.
2

3
using IceRpc.Features;
4
using IceRpc.Transports;
5
using Microsoft.Extensions.Logging;
6
using Microsoft.Extensions.Logging.Abstractions;
7
using System.Diagnostics;
8
using System.Diagnostics.CodeAnalysis;
9
using System.Runtime.ExceptionServices;
10

11
namespace IceRpc;
12

13
/// <summary>Represents an invoker that routes outgoing requests to connections it manages.</summary>
14
/// <remarks><para>The connection cache routes requests based on the request's <see cref="IServerAddressFeature" />
15
/// feature or the server addresses of the request's target service.</para>
16
/// <para>The connection cache keeps at most one active connection per server address.</para></remarks>
17
public sealed class ConnectionCache : IInvoker, IAsyncDisposable
18
{
19
    // Connections whose ConnectAsync completed successfully, keyed by server address (compared with
    // ServerAddressComparer.OptionalTransport).
    private readonly Dictionary<ServerAddress, IProtocolConnection> _activeConnections =
        new(ServerAddressComparer.OptionalTransport);

    // Creates the protocol connections managed by this cache.
    private readonly IClientProtocolConnectionFactory _connectionFactory;

    // The maximum time allowed for the establishment of a connection.
    private readonly TimeSpan _connectTimeout;

    // A detached connection is a protocol connection that is connecting, shutting down or being disposed. Both
    // ShutdownAsync and DisposeAsync wait for detached connections to reach 0 using _detachedConnectionsTcs. Such a
    // connection is "detached" because it's not in _activeConnections.
    private int _detachedConnectionCount;

    // Completed once the cache is shut down or disposed and no detached connection remains. This source is always
    // completed while _mutex is held, so its continuations must run asynchronously: otherwise the code awaiting
    // ShutdownAsync or DisposeAsync could execute inline on the completing thread while that thread still owns
    // _mutex.
    private readonly TaskCompletionSource _detachedConnectionsTcs =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // A cancellation token source that is canceled when DisposeAsync is called.
    private readonly CancellationTokenSource _disposedCts = new();

    // Set by the first call to DisposeAsync; a non-null value means the cache is disposed or being disposed.
    private Task? _disposeTask;

    // Protects the mutable state of this cache: the dictionaries, the detached connection count and the
    // shutdown/dispose tasks.
    private readonly object _mutex = new();

    // New connections in the process of connecting, together with the task that connects them.
    private readonly Dictionary<ServerAddress, (IProtocolConnection Connection, Task ConnectTask)> _pendingConnections =
        new(ServerAddressComparer.OptionalTransport);

    // When true, InvokeAsync first looks for an active connection matching any address of the request.
    private readonly bool _preferExistingConnection;

    // Set by ShutdownAsync, or to a completed task by DisposeAsync when the cache was not shut down first. A
    // non-null value means the cache no longer accepts invocations.
    private Task? _shutdownTask;

    // The maximum time allowed for the shutdown of a connection or of the whole cache.
    private readonly TimeSpan _shutdownTimeout;
50

51
    /// <summary>Constructs a connection cache from the specified options and transports.</summary>
    /// <param name="options">The options of the connection cache.</param>
    /// <param name="duplexClientTransport">The duplex client transport. Passing <see langword="null" /> is equivalent
    /// to passing <see cref="IDuplexClientTransport.Default" />.</param>
    /// <param name="multiplexedClientTransport">The multiplexed client transport. Passing <see langword="null" /> is
    /// equivalent to passing <see cref="IMultiplexedClientTransport.Default" />.</param>
    /// <param name="logger">The logger. Passing <see langword="null" /> is equivalent to passing
    /// <see cref="NullLogger.Instance" />.</param>
    public ConnectionCache(
        ConnectionCacheOptions options,
        IDuplexClientTransport? duplexClientTransport = null,
        IMultiplexedClientTransport? multiplexedClientTransport = null,
        ILogger? logger = null)
    {
        // Settings applied by the cache itself.
        _preferExistingConnection = options.PreferExistingConnection;
        _connectTimeout = options.ConnectTimeout;
        _shutdownTimeout = options.ShutdownTimeout;

        // Everything else is forwarded to the factory that creates the protocol connections.
        _connectionFactory = new ClientProtocolConnectionFactory(
            options.ConnectionOptions,
            options.ClientAuthenticationOptions,
            duplexClientTransport,
            multiplexedClientTransport,
            logger);
    }
77

78
    /// <summary>Constructs a connection cache configured with a default-constructed
    /// <see cref="ConnectionCacheOptions" />.</summary>
    public ConnectionCache()
        : this(options: new ConnectionCacheOptions())
    {
    }
83

84
    /// <summary>Releases all the resources allocated by this cache, including all the connections it created.
    /// </summary>
    /// <returns>A value task that completes when the disposal of all connections created by this cache has completed.
    /// This includes connections that were active when this method is called and connections whose disposal was
    /// initiated prior to this call.</returns>
    /// <remarks>Disposing an underlying connection aborts its invocations, cancels its dispatches and disposes its
    /// transport connection without waiting for the peer. Call <see cref="ShutdownAsync" /> first to wait for
    /// invocations and dispatches to complete. The disposal can hang when the configured dispatcher does not complete
    /// promptly after its cancellation token is canceled.</remarks>
    public ValueTask DisposeAsync()
    {
        lock (_mutex)
        {
            // Every call after the first one observes the task created by the first call.
            if (_disposeTask is not null)
            {
                return new ValueTask(_disposeTask);
            }

            // A disposed cache is also considered shut down.
            _shutdownTask ??= Task.CompletedTask;

            // With no detached connection left, nothing else will complete the TCS. A previous ShutdownAsync may
            // have completed it already, hence the Try.
            if (_detachedConnectionCount == 0)
            {
                _ = _detachedConnectionsTcs.TrySetResult();
            }

            _disposeTask = PerformDisposeAsync();
            return new ValueTask(_disposeTask);
        }

        async Task PerformDisposeAsync()
        {
            // Leave the mutex lock held by DisposeAsync before doing any work.
            await Task.Yield();

            _disposedCts.Cancel();

            // The connections are disposed only once the shutdown is over, successfully or not.
            try
            {
                await _shutdownTask.ConfigureAwait(false);
            }
            catch
            {
                // Ignore exceptions.
            }

            // A pending connection is "detached": it's disposed via its connect task, not directly here. We only wait
            // for the detached connection count to reach 0.
            var disposals = new List<Task>();
            foreach (IProtocolConnection activeConnection in _activeConnections.Values)
            {
                disposals.Add(activeConnection.DisposeAsync().AsTask());
            }
            disposals.Add(_detachedConnectionsTcs.Task);

            await Task.WhenAll(disposals).ConfigureAwait(false);

            _disposedCts.Dispose();
        }
    }
134

135
    /// <summary>Sends an outgoing request over a connection managed by this cache and returns the corresponding
    /// incoming response.</summary>
    /// <param name="request">The outgoing request being sent.</param>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The corresponding <see cref="IncomingResponse" />.</returns>
    /// <exception cref="InvalidOperationException">Thrown if no <see cref="IServerAddressFeature" /> feature is set and
    /// the request's service address has no server addresses.</exception>
    /// <exception cref="IceRpcException">Thrown with one of the following errors:
    /// <list type="bullet"><item><term><see cref="IceRpcError.InvocationRefused" /></term><description>This error
    /// indicates that the connection cache is shut down.</description></item>
    /// <item><term><see cref="IceRpcError.NoConnection" /></term><description>This error indicates that the request
    /// <see cref="IServerAddressFeature" /> feature has no server addresses.</description></item></list>
    /// </exception>
    /// <exception cref="ObjectDisposedException">Thrown if this connection cache is disposed.</exception>
    /// <remarks><para>When the request carries no <see cref="IServerAddressFeature" /> feature, the cache creates one
    /// from the server addresses of the target service and sets it on the request.</para>
    /// <para>The cache then looks for an active connection; the <see
    /// cref="ConnectionCacheOptions.PreferExistingConnection" /> property influences how this connection is selected.
    /// When no active connection is found, the cache creates a new connection to one of the server addresses of the
    /// <see cref="IServerAddressFeature" /> feature.</para>
    /// <para>When the connection establishment to <see cref="IServerAddressFeature.ServerAddress" /> fails, <see
    /// cref="IServerAddressFeature.ServerAddress" /> is appended at the end of <see
    /// cref="IServerAddressFeature.AltServerAddresses" /> and the first address from <see
    /// cref="IServerAddressFeature.AltServerAddresses" /> replaces <see cref="IServerAddressFeature.ServerAddress" />.
    /// The cache then tries again to find or establish a connection to <see
    /// cref="IServerAddressFeature.ServerAddress" />, and repeats this process until it succeeds or has tried all the
    /// addresses. When all the attempts fail, this method throws the exception from the last attempt.</para></remarks>
    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken)
    {
        if (request.Features.Get<IServerAddressFeature>() is not IServerAddressFeature serverAddressFeature)
        {
            // No feature yet: derive one from the service address and attach it to the request.
            if (request.ServiceAddress.ServerAddress is null)
            {
                throw new InvalidOperationException("Cannot send a request to a service without a server address.");
            }

            serverAddressFeature = new ServerAddressFeature(request.ServiceAddress);
            request.Features = request.Features.With(serverAddressFeature);
        }
        else if (serverAddressFeature.ServerAddress is null)
        {
            // The feature is set but has no address left to try.
            throw new IceRpcException(
                IceRpcError.NoConnection,
                $"Could not invoke '{request.Operation}' on '{request.ServiceAddress}': tried all server addresses without success.");
        }

        lock (_mutex)
        {
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);

            if (_shutdownTask is not null)
            {
                throw new IceRpcException(IceRpcError.InvocationRefused, "The connection cache was shut down.");
            }
        }

        return PerformInvokeAsync();

        async Task<IncomingResponse> PerformInvokeAsync()
        {
            Debug.Assert(serverAddressFeature.ServerAddress is not null);

            // When InvokeAsync (or ConnectAsync) throws an IceRpcException(InvocationRefused) we retry unless the
            // cache is being shutdown.
            for (; ; )
            {
                // Use an existing connection when preferred and available; otherwise find or establish a connection
                // to the current server address, falling back to the alternate addresses.
                if (!_preferExistingConnection ||
                    !TryGetActiveConnection(serverAddressFeature, out IProtocolConnection? connection))
                {
                    connection = await GetActiveConnectionAsync(serverAddressFeature, cancellationToken)
                        .ConfigureAwait(false);
                }

                try
                {
                    return await connection.InvokeAsync(request, cancellationToken).ConfigureAwait(false);
                }
                catch (ObjectDisposedException)
                {
                    // This can occasionally happen if we find a connection that was just closed and then automatically
                    // disposed by this connection cache.
                }
                catch (IceRpcException exception) when (exception.IceRpcError is IceRpcError.InvocationRefused)
                {
                    // The connection is refusing new invocations.
                }
                catch (IceRpcException exception) when (exception.IceRpcError is IceRpcError.OperationAborted)
                {
                    bool cacheDisposed;
                    lock (_mutex)
                    {
                        cacheDisposed = _disposeTask is not null;
                    }

                    if (cacheDisposed)
                    {
                        // The abortion is the result of the disposal of this cache: report it as is.
                        throw;
                    }

                    throw new IceRpcException(
                        IceRpcError.ConnectionAborted,
                        "The underlying connection was disposed while the invocation was in progress.");
                }

                // Make sure connection is no longer in _activeConnection before we retry.
                _ = RemoveFromActiveAsync(serverAddressFeature.ServerAddress.Value, connection);
            }
        }
    }
246

247
    /// <summary>Gracefully shuts down all the connections created by this cache.</summary>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>A task that completes successfully once the shutdown of all connections created by this cache has
    /// completed. This includes connections that were active when this method is called and connections whose shutdown
    /// was initiated prior to this call. This task can also complete with one of the following exceptions:
    /// <list type="bullet">
    /// <item><description><see cref="IceRpcException" /> with error <see cref="IceRpcError.OperationAborted" /> if the
    /// connection cache is disposed while being shut down.</description></item>
    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
    /// cancellation token.</description></item>
    /// <item><description><see cref="TimeoutException" /> if the shutdown timed out.</description></item>
    /// </list>
    /// </returns>
    /// <exception cref="InvalidOperationException">Thrown if this method is called more than once.</exception>
    /// <exception cref="ObjectDisposedException">Thrown if the connection cache is disposed.</exception>
    public Task ShutdownAsync(CancellationToken cancellationToken = default)
    {
        Task shutdownTask;

        lock (_mutex)
        {
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);

            if (_shutdownTask is not null)
            {
                throw new InvalidOperationException("The connection cache is already shut down or shutting down.");
            }

            // With no detached connection left, nothing else will complete the TCS.
            if (_detachedConnectionCount == 0)
            {
                _detachedConnectionsTcs.SetResult();
            }

            shutdownTask = PerformShutdownAsync();
            _shutdownTask = shutdownTask;
        }

        return shutdownTask;

        async Task PerformShutdownAsync()
        {
            // Leave the mutex lock held by ShutdownAsync before doing any work.
            await Task.Yield();

            // This token is canceled by the caller, by the disposal of the cache or by the shutdown timeout.
            using var shutdownCts =
                CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
            shutdownCts.CancelAfter(_shutdownTimeout);

            try
            {
                // Since a pending connection is "detached", it's shutdown and disposed via the connectTask, not
                // directly by this method.
                IEnumerable<Task> pendingShutdowns = _activeConnections.Values
                    .Select(activeConnection => activeConnection.ShutdownAsync(shutdownCts.Token))
                    .Append(_detachedConnectionsTcs.Task.WaitAsync(shutdownCts.Token));

                await Task.WhenAll(pendingShutdowns).ConfigureAwait(false);
            }
            catch (Exception exception) when (
                exception is OperationCanceledException || shutdownCts.IsCancellationRequested)
            {
                // A cancellation was observed, either directly or hidden behind another failure reported by WhenAll.
                // Figure out who requested it: the caller first, then the disposal of the cache, then the timeout.
                cancellationToken.ThrowIfCancellationRequested();

                if (!_disposedCts.IsCancellationRequested)
                {
                    throw new TimeoutException(
                        $"The connection cache shut down timed out after {_shutdownTimeout.TotalSeconds} s.");
                }

                throw new IceRpcException(
                    IceRpcError.OperationAborted,
                    "The shutdown was aborted because the connection cache was disposed.");
            }
            catch
            {
                // Ignore other connection shutdown failures.
            }
        }
    }
330

331
    /// <summary>Establishes a pending connection, then either registers it in <c>_activeConnections</c> or hands it
    /// over to a background task that shuts it down and disposes it.</summary>
    /// <param name="connection">The connection to connect.</param>
    /// <param name="serverAddress">The server address of the connection; it's the key of this connection in
    /// <c>_pendingConnections</c> and later in <c>_activeConnections</c>.</param>
    /// <returns>A task that completes successfully when the connection establishment succeeds. It completes with a
    /// <see cref="TimeoutException" /> when the connection establishment is canceled without the cache being disposed,
    /// with an <see cref="IceRpcException" /> with error <see cref="IceRpcError.OperationAborted" /> when it is
    /// canceled after the disposal of the cache, or with the exception thrown by the connection establishment.
    /// </returns>
    private async Task CreateConnectTask(IProtocolConnection connection, ServerAddress serverAddress)
    {
        // The caller holds the mutex lock: yield so that the remainder of this method runs outside of it.
        await Task.Yield();

        // This task "owns" a detachedConnectionCount and as a result _disposedCts can't be disposed.
        using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
        connectCts.CancelAfter(_connectTimeout);

        Task shutdownRequested;

        // The task executing this method, as registered in _pendingConnections. It's set only when the connection
        // must be disposed instead of becoming active.
        Task? thisConnectTask = null;

        try
        {
            try
            {
                (_, shutdownRequested) = await connection.ConnectAsync(connectCts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // The disposal of the cache takes precedence; any other cancellation is reported as a timeout.
                if (!_disposedCts.IsCancellationRequested)
                {
                    throw new TimeoutException(
                        $"The connection establishment timed out after {_connectTimeout.TotalSeconds} s.");
                }

                throw new IceRpcException(
                    IceRpcError.OperationAborted,
                    "The connection establishment was aborted because the connection cache was disposed.");
            }
        }
        catch
        {
            lock (_mutex)
            {
                // The registered connect task is executing this method and about to throw.
                thisConnectTask = _pendingConnections[serverAddress].ConnectTask;
                _pendingConnections.Remove(serverAddress);
            }

            _ = DisposePendingConnectionAsync(connection, thisConnectTask);
            throw;
        }

        lock (_mutex)
        {
            if (_shutdownTask is not null)
            {
                // Too late to activate this connection: it remains detached and gets disposed below.
                thisConnectTask = _pendingConnections[serverAddress].ConnectTask;
            }
            else
            {
                // The connection is now "attached" in _activeConnections.
                _activeConnections.Add(serverAddress, connection);
                _detachedConnectionCount--;
            }

            bool wasPending = _pendingConnections.Remove(serverAddress);
            Debug.Assert(wasPending);
        }

        if (thisConnectTask is not null)
        {
            // As soon as this method completes successfully, we shut down then dispose the connection.
            _ = DisposePendingConnectionAsync(connection, thisConnectTask);
        }
        else
        {
            _ = ShutdownWhenRequestedAsync(connection, serverAddress, shutdownRequested);
        }

        // Waits for the connect task to complete, shuts down the connection when the connect task succeeded, then
        // disposes the connection and releases its detached connection count.
        async Task DisposePendingConnectionAsync(IProtocolConnection pendingConnection, Task pendingConnectTask)
        {
            try
            {
                await pendingConnectTask.ConfigureAwait(false);

                // Since we own a detachedConnectionCount, _disposedCts is not disposed.
                using var shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
                shutdownCts.CancelAfter(_shutdownTimeout);
                await pendingConnection.ShutdownAsync(shutdownCts.Token).ConfigureAwait(false);
            }
            catch
            {
                // Observe and ignore exceptions.
            }

            await pendingConnection.DisposeAsync().ConfigureAwait(false);

            lock (_mutex)
            {
                _detachedConnectionCount--;

                // ShutdownAsync and DisposeAsync wait for the last detached connection.
                if (_detachedConnectionCount == 0 && _shutdownTask is not null)
                {
                    _detachedConnectionsTcs.SetResult();
                }
            }
        }

        // Removes the connection from the active connections, then shuts it down and disposes it, once its
        // shutdown requested task completes.
        async Task ShutdownWhenRequestedAsync(
            IProtocolConnection activeConnection,
            ServerAddress activeServerAddress,
            Task shutdownRequestedTask)
        {
            await shutdownRequestedTask.ConfigureAwait(false);
            await RemoveFromActiveAsync(activeServerAddress, activeConnection).ConfigureAwait(false);
        }
    }
438

439
    /// <summary>Gets an active connection to one of the addresses of the server address feature, creating and
    /// connecting a new protocol connection when necessary.</summary>
    /// <param name="serverAddressFeature">The server address feature. Its addresses are rotated before each attempt
    /// that follows the initial attempt.</param>
    /// <param name="cancellationToken">The cancellation token of the invocation calling this method.</param>
    /// <returns>An active connection. When all the attempts fail, the returned task completes with the exception from
    /// the last attempt.</returns>
    private async Task<IProtocolConnection> GetActiveConnectionAsync(
        IServerAddressFeature serverAddressFeature,
        CancellationToken cancellationToken)
    {
        Debug.Assert(serverAddressFeature.ServerAddress is not null);

        Exception? lastFailure = null;

        for (var addresses = new ServerAddressEnumerator(serverAddressFeature); addresses.MoveNext();)
        {
            ServerAddress serverAddress = addresses.Current;
            if (addresses.CurrentIndex > 0)
            {
                // Rotate the server addresses before each new connection attempt after the initial attempt
                serverAddressFeature.RotateAddresses();
            }

            try
            {
                (IProtocolConnection connection, Task? connectTask) = FindOrStartConnection(serverAddress);
                if (connectTask is not null)
                {
                    // ConnectTask itself takes care of scheduling its exception observation when it fails.
                    await connectTask.WaitAsync(cancellationToken).ConfigureAwait(false);
                }
                return connection;
            }
            catch (TimeoutException exception)
            {
                lastFailure = exception;
            }
            catch (IceRpcException exception) when (exception.IceRpcError is
                IceRpcError.ConnectionAborted or
                IceRpcError.ConnectionRefused or
                IceRpcError.ServerBusy or
                IceRpcError.ServerUnreachable)
            {
                // keep going unless the connection cache was disposed or shut down
                lastFailure = exception;

                bool shutDown;
                lock (_mutex)
                {
                    shutDown = _shutdownTask is not null;
                }

                if (shutDown)
                {
                    throw;
                }
            }
        }

        // All the attempts failed: rethrow the last failure. The final throw statement is never reached; it only
        // satisfies the compiler's flow analysis.
        Debug.Assert(lastFailure is not null);
        ExceptionDispatchInfo.Throw(lastFailure);
        Debug.Assert(false);
        throw lastFailure;

        // Returns the active connection for the address with a null connect task, or the pending connection for the
        // address (created and registered when there is none) with the task that connects it.
        (IProtocolConnection Connection, Task? ConnectTask) FindOrStartConnection(ServerAddress address)
        {
            lock (_mutex)
            {
                if (_disposeTask is not null)
                {
                    throw new IceRpcException(IceRpcError.OperationAborted, "The connection cache was disposed.");
                }

                if (_shutdownTask is not null)
                {
                    throw new IceRpcException(IceRpcError.InvocationRefused, "The connection cache is shut down.");
                }

                if (_activeConnections.TryGetValue(address, out IProtocolConnection? activeConnection))
                {
                    return (activeConnection, null);
                }

                if (!_pendingConnections.TryGetValue(
                    address,
                    out (IProtocolConnection Connection, Task ConnectTask) pending))
                {
                    // A new connection starts its life detached; CreateConnectTask yields before doing any work
                    // and completes its work outside of this lock.
                    IProtocolConnection newConnection = _connectionFactory.CreateConnection(address);
                    _detachedConnectionCount++;
                    pending = (newConnection, CreateConnectTask(newConnection, address));
                    _pendingConnections.Add(address, pending);
                }

                return (pending.Connection, pending.ConnectTask);
            }
        }
    }
517

518
    /// <summary>Removes the connection from _activeConnections, and when successful, shuts down and disposes this
    /// connection.</summary>
    /// <param name="serverAddress">The server address key in _activeConnections.</param>
    /// <param name="connection">The connection to shutdown and dispose after removal.</param>
    /// <returns>A task that completes once the connection is shut down and disposed, or a completed task when this
    /// call does not take ownership of the connection: the cache is shut down or disposed, or the entry registered
    /// for <paramref name="serverAddress" /> is missing or is a different connection.</returns>
    private Task RemoveFromActiveAsync(ServerAddress serverAddress, IProtocolConnection connection)
    {
        lock (_mutex)
        {
            // The removal must be conditional on the identity of the registered connection. A caller can hold a stale
            // connection (for example the retry path of InvokeAsync) while a newer connection is already registered
            // for the same server address. Removing by key alone would evict the newer connection without ever
            // shutting it down or disposing it, and would shut down and dispose the stale connection a second time.
            if (_shutdownTask is not null ||
                !_activeConnections.TryGetValue(serverAddress, out IProtocolConnection? activeConnection) ||
                !ReferenceEquals(activeConnection, connection))
            {
                // Another task owns this connection
                return Task.CompletedTask;
            }

            bool removed = _activeConnections.Remove(serverAddress);
            Debug.Assert(removed);

            // it's now our connection.
            _detachedConnectionCount++;
        }

        return ShutdownAndDisposeConnectionAsync();

        async Task ShutdownAndDisposeConnectionAsync()
        {
            // _disposedCts is not disposed since we own a detachedConnectionCount
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
            cts.CancelAfter(_shutdownTimeout);

            try
            {
                await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
            }
            catch
            {
                // Ignore connection shutdown failures
            }

            await connection.DisposeAsync().ConfigureAwait(false);

            lock (_mutex)
            {
                // ShutdownAsync and DisposeAsync wait for the last detached connection.
                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
                {
                    _detachedConnectionsTcs.SetResult();
                }
            }
        }
    }
566

567
    /// <summary>Looks for an active connection matching any of the addresses of the server address feature.
    /// </summary>
    /// <param name="serverAddressFeature">The server address feature. When the match is an alternate server address,
    /// this address becomes the main server address and the former main server address becomes the first alternate
    /// server address.</param>
    /// <param name="connection">When this method returns <see langword="true" />, this argument contains an active
    /// connection; otherwise, it is set to <see langword="null" />.</param>
    /// <returns><see langword="true" /> when an active connection matching any of the addresses of the server address
    /// feature is found; otherwise, <see langword="false"/>.</returns>
    /// <exception cref="IceRpcException">Thrown with error <see cref="IceRpcError.OperationAborted" /> when the cache
    /// is disposed, or with error <see cref="IceRpcError.InvocationRefused" /> when the cache is shut down.
    /// </exception>
    private bool TryGetActiveConnection(
        IServerAddressFeature serverAddressFeature,
        [NotNullWhen(true)] out IProtocolConnection? connection)
    {
        lock (_mutex)
        {
            connection = null;

            if (_disposeTask is not null)
            {
                throw new IceRpcException(IceRpcError.OperationAborted, "The connection cache was disposed.");
            }

            if (_shutdownTask is not null)
            {
                throw new IceRpcException(IceRpcError.InvocationRefused, "The connection cache was shut down.");
            }

            for (var candidates = new ServerAddressEnumerator(serverAddressFeature); candidates.MoveNext();)
            {
                ServerAddress candidate = candidates.Current;
                if (!_activeConnections.TryGetValue(candidate, out connection))
                {
                    continue;
                }

                int matchIndex = candidates.CurrentIndex;
                if (matchIndex > 0)
                {
                    // This altServerAddress becomes the main server address, and the existing main
                    // server address becomes the first alt server address.
                    ServerAddress formerMain = serverAddressFeature.ServerAddress!.Value;
                    serverAddressFeature.AltServerAddresses =
                        serverAddressFeature.AltServerAddresses.RemoveAt(matchIndex - 1).Insert(0, formerMain);
                    serverAddressFeature.ServerAddress = candidate;
                }

                return true;
            }

            return false;
        }
    }
612

613
    /// <summary>A helper struct that enumerates the addresses of an <see cref="IServerAddressFeature" />, the main
    /// server address first and the alternate server addresses next, without allocations.</summary>
    private struct ServerAddressEnumerator
    {
        /// <summary>Gets the address at <see cref="CurrentIndex" />: the main server address at index 0, and the
        /// alternate server address at position <c>CurrentIndex - 1</c> for the other indexes.</summary>
        internal readonly ServerAddress Current
        {
            get
            {
                Debug.Assert(CurrentIndex >= 0 && CurrentIndex <= _altServerAddresses.Count);

                if (CurrentIndex != 0)
                {
                    return _altServerAddresses[CurrentIndex - 1];
                }

                Debug.Assert(_mainServerAddress is not null);
                return _mainServerAddress.Value;
            }
        }

        /// <summary>Gets the total number of addresses: 0 when there is no main server address, otherwise the number
        /// of alternate server addresses plus one.</summary>
        internal int Count { get; }

        /// <summary>Gets the index of the current address; it is -1 until the first successful call to
        /// <see cref="MoveNext" />.</summary>
        internal int CurrentIndex { get; private set; }

        private readonly ServerAddress? _mainServerAddress;
        private readonly IList<ServerAddress> _altServerAddresses;

        /// <summary>Advances the enumerator to the next address.</summary>
        /// <returns><see langword="true" /> when the enumerator moved to a new address; <see langword="false" /> when
        /// there is no main server address or when all the addresses have been enumerated.</returns>
        internal bool MoveNext()
        {
            if (CurrentIndex == -1)
            {
                // First call: there is nothing to enumerate without a main server address.
                if (_mainServerAddress is null)
                {
                    return false;
                }

                CurrentIndex = 0;
                return true;
            }

            if (CurrentIndex >= _altServerAddresses.Count)
            {
                return false;
            }

            CurrentIndex++;
            return true;
        }

        /// <summary>Constructs an enumerator over the main and alternate server addresses read from the feature at
        /// construction time.</summary>
        /// <param name="serverAddressFeature">The server address feature to enumerate.</param>
        internal ServerAddressEnumerator(IServerAddressFeature serverAddressFeature)
        {
            CurrentIndex = -1;
            _mainServerAddress = serverAddressFeature.ServerAddress;
            _altServerAddresses = serverAddressFeature.AltServerAddresses;
            Count = _mainServerAddress is null ? 0 : _altServerAddresses.Count + 1;
        }
    }
677
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc