• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

icerpc / icerpc-csharp / 20235282556

15 Dec 2025 02:13PM UTC coverage: 83.538% (-0.05%) from 83.587%
20235282556

push

github

web-flow
Remove Json serialize helper from JSON demo (#4189)

12037 of 14409 relevant lines covered (83.54%)

3026.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.08
src/IceRpc/ConnectionCache.cs
1
// Copyright (c) ZeroC, Inc.
2

3
using IceRpc.Features;
4
using IceRpc.Transports;
5
using Microsoft.Extensions.Logging;
6
using Microsoft.Extensions.Logging.Abstractions;
7
using System.Diagnostics;
8
using System.Diagnostics.CodeAnalysis;
9
using System.Runtime.ExceptionServices;
10

11
namespace IceRpc;
12

13
/// <summary>Represents an invoker that routes outgoing requests to connections it manages.</summary>
14
/// <remarks><para>The connection cache routes requests based on the request's <see cref="IServerAddressFeature" />
15
/// feature or the server addresses of the request's target service.</para>
16
/// <para>The connection cache keeps at most one active connection per server address.</para></remarks>
17
public sealed class ConnectionCache : IInvoker, IAsyncDisposable
18
{
19
    // Connected connections.
20
    private readonly Dictionary<ServerAddress, IProtocolConnection> _activeConnections =
12✔
21
        new(ServerAddressComparer.OptionalTransport);
12✔
22

23
    private readonly IClientProtocolConnectionFactory _connectionFactory;
24

25
    private readonly TimeSpan _connectTimeout;
26

27
    // A detached connection is a protocol connection that is connecting, shutting down or being disposed. Both
28
    // ShutdownAsync and DisposeAsync wait for detached connections to reach 0 using _detachedConnectionsTcs. Such a
29
    // connection is "detached" because it's not in _activeConnections.
30
    private int _detachedConnectionCount;
31

32
    // Completed once the number of detached connections reaches 0 after shutdown or disposal started. SetResult is
    // called while _mutex is held, so continuations are forced to run asynchronously: code awaiting this task (and
    // code awaiting the shutdown task built on top of it) must never execute inline inside the lock.
    private readonly TaskCompletionSource _detachedConnectionsTcs =
        new(TaskCreationOptions.RunContinuationsAsynchronously);
33

34
    // A cancellation token source that is canceled when DisposeAsync is called.
35
    private readonly CancellationTokenSource _disposedCts = new();
12✔
36

37
    private Task? _disposeTask;
38

39
    private readonly Lock _mutex = new();
12✔
40

41
    // New connections in the process of connecting.
42
    private readonly Dictionary<ServerAddress, (IProtocolConnection Connection, Task ConnectTask)> _pendingConnections =
12✔
43
        new(ServerAddressComparer.OptionalTransport);
12✔
44

45
    private readonly bool _preferExistingConnection;
46

47
    private Task? _shutdownTask;
48

49
    private readonly TimeSpan _shutdownTimeout;
50

51
    /// <summary>Constructs a connection cache from the specified options and transports.</summary>
    /// <param name="options">The connection cache options.</param>
    /// <param name="duplexClientTransport">The duplex client transport. <see langword="null" /> is equivalent to <see
    /// cref="IDuplexClientTransport.Default" />.</param>
    /// <param name="multiplexedClientTransport">The multiplexed client transport. <see langword="null" /> is equivalent
    /// to <see cref="IMultiplexedClientTransport.Default" />.</param>
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
    /// />.</param>
    public ConnectionCache(
        ConnectionCacheOptions options,
        IDuplexClientTransport? duplexClientTransport = null,
        IMultiplexedClientTransport? multiplexedClientTransport = null,
        ILogger? logger = null)
    {
        // Cache the settings this class reads on its own code paths.
        _preferExistingConnection = options.PreferExistingConnection;
        _shutdownTimeout = options.ShutdownTimeout;
        _connectTimeout = options.ConnectTimeout;

        // The factory creates the protocol connections managed by this cache.
        _connectionFactory = new ClientProtocolConnectionFactory(
            options.ConnectionOptions,
            options.ConnectTimeout,
            options.ClientAuthenticationOptions,
            duplexClientTransport,
            multiplexedClientTransport,
            logger);
    }
78

79
    /// <summary>Constructs a connection cache configured with a default-constructed
    /// <see cref="ConnectionCacheOptions" />.</summary>
    public ConnectionCache()
        : this(options: new ConnectionCacheOptions())
    {
    }
84

85
    /// <summary>Releases all resources allocated by the cache, including every connection it created.</summary>
    /// <returns>A value task that completes when the disposal of all connections created by this cache has completed.
    /// This includes connections that were active when this method is called and connections whose disposal was
    /// initiated prior to this call.</returns>
    /// <remarks>The disposal of an underlying connection of the cache aborts invocations, cancels dispatches and
    /// disposes the underlying transport connection without waiting for the peer. To wait for invocations and
    /// dispatches to complete, call <see cref="ShutdownAsync" /> first. If the configured dispatcher does not complete
    /// promptly when its cancellation token is canceled, the disposal can hang.</remarks>
    public ValueTask DisposeAsync()
    {
        lock (_mutex)
        {
            // Only the first call starts the disposal; later calls get the task created by the first call.
            if (_disposeTask is not null)
            {
                return new ValueTask(_disposeTask);
            }

            // Disposing a cache that was never shut down marks it as shut down.
            _shutdownTask ??= Task.CompletedTask;

            // ShutdownAsync may already have completed the TCS, hence TrySetResult.
            if (_detachedConnectionCount == 0)
            {
                _ = _detachedConnectionsTcs.TrySetResult();
            }

            Task disposeTask = PerformDisposeAsync();
            _disposeTask = disposeTask;
            return new ValueTask(disposeTask);
        }

        async Task PerformDisposeAsync()
        {
            await Task.Yield(); // exit mutex lock

            _disposedCts.Cancel();

            // The shutdown must be over before the connections are disposed; its outcome is irrelevant here.
            try
            {
                await _shutdownTask.ConfigureAwait(false);
            }
            catch
            {
                // Ignore exceptions.
            }

            // Pending connections are "detached": their connect task disposes them, and the TCS task completes once
            // no detached connection remains.
            IEnumerable<Task> disposals = _activeConnections.Values
                .Select(connection => connection.DisposeAsync().AsTask())
                .Append(_detachedConnectionsTcs.Task);
            await Task.WhenAll(disposals).ConfigureAwait(false);

            _disposedCts.Dispose();
        }
    }
135

136
    /// <summary>Sends an outgoing request over a connection managed by this cache and returns the corresponding
    /// incoming response.</summary>
    /// <param name="request">The outgoing request being sent.</param>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The corresponding <see cref="IncomingResponse" />.</returns>
    /// <exception cref="InvalidOperationException">Thrown if no <see cref="IServerAddressFeature" /> feature is set and
    /// the request's service address has no server address.</exception>
    /// <exception cref="IceRpcException">Thrown with one of the following errors:
    /// <list type="bullet"><item><term><see cref="IceRpcError.InvocationRefused" /></term><description>This error
    /// indicates that the connection cache is shut down.</description></item>
    /// <item><term><see cref="IceRpcError.NoConnection" /></term><description>This error indicates that the request
    /// <see cref="IServerAddressFeature" /> feature has no server addresses.</description></item></list>
    /// </exception>
    /// <exception cref="ObjectDisposedException">Thrown if this connection cache is disposed.</exception>
    /// <remarks><para>If the request <see cref="IServerAddressFeature" /> feature is not set, the cache sets it from
    /// the server addresses of the target service.</para>
    /// <para>It then looks for an active connection. The <see cref="ConnectionCacheOptions.PreferExistingConnection" />
    /// property influences how the cache selects this active connection. If no active connection can be found, the
    /// cache creates a new connection to one of the server addresses from the <see cref="IServerAddressFeature" />
    /// feature.</para>
    /// <para>If the connection establishment to <see cref="IServerAddressFeature.ServerAddress" /> fails, <see
    /// cref="IServerAddressFeature.ServerAddress" /> is appended at the end of <see
    /// cref="IServerAddressFeature.AltServerAddresses" /> and the first address from <see
    /// cref="IServerAddressFeature.AltServerAddresses" /> replaces <see cref="IServerAddressFeature.ServerAddress" />.
    /// The cache tries again to find or establish a connection to <see cref="IServerAddressFeature.ServerAddress" />.
    /// If unsuccessful, the cache repeats this process until success or until it tried all the addresses. If all the
    /// attempts fail, this method throws the exception from the last attempt.</para></remarks>
    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken)
    {
        IServerAddressFeature serverAddressFeature;
        if (request.Features.Get<IServerAddressFeature>() is IServerAddressFeature existingFeature)
        {
            // A feature without a main server address means every address was already tried.
            if (existingFeature.ServerAddress is null)
            {
                throw new IceRpcException(
                    IceRpcError.NoConnection,
                    $"Could not invoke '{request.Operation}' on '{request.ServiceAddress}': tried all server addresses without success.");
            }

            serverAddressFeature = existingFeature;
        }
        else
        {
            if (request.ServiceAddress.ServerAddress is null)
            {
                throw new InvalidOperationException("Cannot send a request to a service without a server address.");
            }

            // No feature yet: build one from the service address and attach it to the request.
            serverAddressFeature = new ServerAddressFeature(request.ServiceAddress);
            request.Features = request.Features.With(serverAddressFeature);
        }

        lock (_mutex)
        {
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);

            if (_shutdownTask is not null)
            {
                throw new IceRpcException(IceRpcError.InvocationRefused, "The connection cache was shut down.");
            }
        }

        return PerformInvokeAsync();

        async Task<IncomingResponse> PerformInvokeAsync()
        {
            Debug.Assert(serverAddressFeature.ServerAddress is not null);

            // Each iteration picks (or establishes) a connection and sends the request. The loop only comes around
            // again when the selected connection turned out to be disposed or refused the invocation.
            while (true)
            {
                if (!_preferExistingConnection ||
                    !TryGetActiveConnection(serverAddressFeature, out IProtocolConnection? connection))
                {
                    connection = await GetActiveConnectionAsync(serverAddressFeature, cancellationToken)
                        .ConfigureAwait(false);
                }

                try
                {
                    return await connection.InvokeAsync(request, cancellationToken).ConfigureAwait(false);
                }
                catch (Exception exception) when (
                    exception is ObjectDisposedException or
                    IceRpcException { IceRpcError: IceRpcError.InvocationRefused })
                {
                    // ObjectDisposedException: we occasionally find a connection that was just closed and then
                    // automatically disposed by this connection cache.
                    // InvocationRefused: the connection is refusing new invocations.
                    // In both cases, fall through and retry.
                }
                catch (IceRpcException exception) when (exception.IceRpcError == IceRpcError.OperationAborted)
                {
                    bool cacheDisposed;
                    lock (_mutex)
                    {
                        cacheDisposed = _disposeTask is not null;
                    }

                    if (cacheDisposed)
                    {
                        throw;
                    }

                    // The cache is not disposed, so only the underlying connection was.
                    throw new IceRpcException(
                        IceRpcError.ConnectionAborted,
                        "The underlying connection was disposed while the invocation was in progress.");
                }

                // Make sure connection is no longer in _activeConnections before we retry.
                _ = RemoveFromActiveAsync(serverAddressFeature.ServerAddress.Value, connection);
            }
        }
    }
247

248
    /// <summary>Gracefully shuts down all connections created by this cache.</summary>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>A task that completes successfully once the shutdown of all connections created by this cache has
    /// completed. This includes connections that were active when this method is called and connections whose shutdown
    /// was initiated prior to this call. This task can also complete with one of the following exceptions:
    /// <list type="bullet">
    /// <item><description><see cref="IceRpcException" /> with error <see cref="IceRpcError.OperationAborted" /> if the
    /// connection cache is disposed while being shut down.</description></item>
    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
    /// cancellation token.</description></item>
    /// <item><description><see cref="TimeoutException" /> if the shutdown timed out.</description></item>
    /// </list>
    /// </returns>
    /// <exception cref="InvalidOperationException">Thrown if this method is called more than once.</exception>
    /// <exception cref="ObjectDisposedException">Thrown if the connection cache is disposed.</exception>
    public Task ShutdownAsync(CancellationToken cancellationToken = default)
    {
        Task shutdownTask;

        lock (_mutex)
        {
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);

            if (_shutdownTask is not null)
            {
                throw new InvalidOperationException("The connection cache is already shut down or shutting down.");
            }

            // With no detached connection left, nothing else will complete the TCS.
            if (_detachedConnectionCount == 0)
            {
                _detachedConnectionsTcs.SetResult();
            }

            shutdownTask = PerformShutdownAsync();
            _shutdownTask = shutdownTask;
        }

        return shutdownTask;

        async Task PerformShutdownAsync()
        {
            await Task.Yield(); // exit mutex lock

            // The linked source is canceled by the caller's token, by the disposal of the cache, or by the timeout.
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposedCts.Token);
            cts.CancelAfter(_shutdownTimeout);

            try
            {
                try
                {
                    // Since a pending connection is "detached", it's shutdown and disposed via the connectTask, not
                    // directly by this method.
                    IEnumerable<Task> shutdowns = _activeConnections.Values
                        .Select(connection => connection.ShutdownAsync(cts.Token))
                        .Append(_detachedConnectionsTcs.Task.WaitAsync(cts.Token));

                    await Task.WhenAll(shutdowns).ConfigureAwait(false);
                }
                catch (Exception exception) when (exception is not OperationCanceledException)
                {
                    // Connection shutdown failures are ignored, but the exception surfaced by WhenAll may be hiding
                    // an OperationCanceledException: throw it in that case.
                    cts.Token.ThrowIfCancellationRequested();
                }
            }
            catch (OperationCanceledException)
            {
                // Work out which of the three cancellation sources fired, in this order of precedence.
                cancellationToken.ThrowIfCancellationRequested();

                if (_disposedCts.IsCancellationRequested)
                {
                    throw new IceRpcException(
                        IceRpcError.OperationAborted,
                        "The shutdown was aborted because the connection cache was disposed.");
                }

                throw new TimeoutException(
                    $"The connection cache shut down timed out after {_shutdownTimeout.TotalSeconds} s.");
            }
        }
    }
331

332
    /// <summary>Connects a pending connection. On success the connection moves from _pendingConnections to
    /// _activeConnections, unless the cache is shut down or disposed, in which case the connection is shut down and
    /// disposed. On failure the pending entry is removed, the connection is disposed in the background and the
    /// exception is rethrown.</summary>
    /// <param name="connection">The connection to connect.</param>
    /// <param name="serverAddress">The server address key of this connection in _pendingConnections.</param>
    private async Task CreateConnectTask(IProtocolConnection connection, ServerAddress serverAddress)
    {
        await Task.Yield(); // exit mutex lock

        // This task "owns" a detachedConnectionCount and as a result _disposedCts can't be disposed.
        using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
        connectCts.CancelAfter(_connectTimeout);

        Task shutdownRequested;

        // Remains null only when the connection gets attached to _activeConnections.
        Task? ownConnectTask = null;

        try
        {
            try
            {
                (_, shutdownRequested) = await connection.ConnectAsync(connectCts.Token).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                // Translate the cancellation: either the cache was disposed or the connect timeout expired.
                Exception translated = _disposedCts.IsCancellationRequested ?
                    new IceRpcException(
                        IceRpcError.OperationAborted,
                        "The connection establishment was aborted because the connection cache was disposed.") :
                    new TimeoutException(
                        $"The connection establishment timed out after {_connectTimeout.TotalSeconds} s.");
                throw translated;
            }
        }
        catch
        {
            lock (_mutex)
            {
                // The stored connect task is executing this method and is about to throw.
                ownConnectTask = _pendingConnections[serverAddress].ConnectTask;
                _pendingConnections.Remove(serverAddress);
            }

            _ = DisposePendingConnectionAsync(connection, ownConnectTask);
            throw;
        }

        lock (_mutex)
        {
            if (_shutdownTask is not null)
            {
                // Too late to attach the connection: it stays detached and gets disposed below.
                ownConnectTask = _pendingConnections[serverAddress].ConnectTask;
            }
            else
            {
                // the connection is now "attached" in _activeConnections
                _activeConnections.Add(serverAddress, connection);
                _detachedConnectionCount--;
            }

            bool removed = _pendingConnections.Remove(serverAddress);
            Debug.Assert(removed);
        }

        if (ownConnectTask is not null)
        {
            // As soon as this method completes successfully, we shut down then dispose the connection.
            _ = DisposePendingConnectionAsync(connection, ownConnectTask);
        }
        else
        {
            _ = ShutdownWhenRequestedAsync(connection, serverAddress, shutdownRequested);
        }

        // Waits for the connect task, shuts down the connection if the connect succeeded, disposes the connection
        // and finally releases the detachedConnectionCount owned by the connect task.
        async Task DisposePendingConnectionAsync(IProtocolConnection pendingConnection, Task pendingConnectTask)
        {
            try
            {
                await pendingConnectTask.ConfigureAwait(false);

                // Since we own a detachedConnectionCount, _disposedCts is not disposed.
                using var shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
                shutdownCts.CancelAfter(_shutdownTimeout);
                await pendingConnection.ShutdownAsync(shutdownCts.Token).ConfigureAwait(false);
            }
            catch
            {
                // Observe and ignore exceptions.
            }

            await pendingConnection.DisposeAsync().ConfigureAwait(false);

            lock (_mutex)
            {
                _detachedConnectionCount--;
                if (_detachedConnectionCount == 0 && _shutdownTask is not null)
                {
                    _detachedConnectionsTcs.SetResult();
                }
            }
        }

        // Waits until the shutdown of an attached connection is requested, then removes it from the cache.
        async Task ShutdownWhenRequestedAsync(
            IProtocolConnection activeConnection,
            ServerAddress activeServerAddress,
            Task shutdownRequestedTask)
        {
            await shutdownRequestedTask.ConfigureAwait(false);
            await RemoveFromActiveAsync(activeServerAddress, activeConnection).ConfigureAwait(false);
        }
    }
439

440
    /// <summary>Gets an active connection for one of the addresses of the server address feature, creating and
    /// connecting a new protocol connection when no active or pending connection exists for the address being tried.
    /// </summary>
    /// <param name="serverAddressFeature">The server address feature. Its addresses are rotated before each attempt
    /// that follows the initial attempt.</param>
    /// <param name="cancellationToken">The cancellation token of the invocation calling this method.</param>
    /// <returns>The connection found in _activeConnections, or the pending connection once its connect task has
    /// completed successfully.</returns>
    private async Task<IProtocolConnection> GetActiveConnectionAsync(
        IServerAddressFeature serverAddressFeature,
        CancellationToken cancellationToken)
    {
        Debug.Assert(serverAddressFeature.ServerAddress is not null);

        // The failure of the most recent attempt; rethrown when every address has been tried.
        Exception? lastFailure = null;

        var addresses = new ServerAddressEnumerator(serverAddressFeature);
        while (addresses.MoveNext())
        {
            ServerAddress serverAddress = addresses.Current;
            if (addresses.CurrentIndex > 0)
            {
                // Rotate the server addresses before each new connection attempt after the initial attempt
                serverAddressFeature.RotateAddresses();
            }

            try
            {
                IProtocolConnection pendingConnection;
                Task connectTask;

                lock (_mutex)
                {
                    if (_disposeTask is not null)
                    {
                        throw new IceRpcException(IceRpcError.OperationAborted, "The connection cache was disposed.");
                    }

                    if (_shutdownTask is not null)
                    {
                        throw new IceRpcException(IceRpcError.InvocationRefused, "The connection cache is shut down.");
                    }

                    if (_activeConnections.TryGetValue(serverAddress, out IProtocolConnection? activeConnection))
                    {
                        return activeConnection;
                    }

                    // Join the connection attempt already in progress for this address, or start a new one.
                    if (!_pendingConnections.TryGetValue(serverAddress, out var pendingEntry))
                    {
                        IProtocolConnection created = _connectionFactory.CreateConnection(serverAddress);
                        _detachedConnectionCount++;
                        pendingEntry = (created, CreateConnectTask(created, serverAddress));
                        _pendingConnections.Add(serverAddress, pendingEntry);
                    }

                    (pendingConnection, connectTask) = pendingEntry;
                }

                // ConnectTask itself takes care of scheduling its exception observation when it fails.
                await connectTask.WaitAsync(cancellationToken).ConfigureAwait(false);
                return pendingConnection;
            }
            catch (TimeoutException exception)
            {
                lastFailure = exception;
            }
            catch (IceRpcException exception) when (exception.IceRpcError is
                IceRpcError.ConnectionAborted or
                IceRpcError.ConnectionRefused or
                IceRpcError.ServerBusy or
                IceRpcError.ServerUnreachable)
            {
                lastFailure = exception;

                // keep going unless the connection cache was disposed or shut down
                bool shutdownStarted;
                lock (_mutex)
                {
                    shutdownStarted = _shutdownTask is not null;
                }

                if (shutdownStarted)
                {
                    throw;
                }
            }
        }

        Debug.Assert(lastFailure is not null);
        ExceptionDispatchInfo.Throw(lastFailure);
        Debug.Assert(false);
        throw lastFailure;
    }
518

519
    /// <summary>Removes the connection from _activeConnections, and when successful, shuts down and disposes this
    /// connection.</summary>
    /// <param name="serverAddress">The server address key in _activeConnections.</param>
    /// <param name="connection">The connection to shutdown and dispose after removal.</param>
    /// <returns>A task that completes when the connection is shut down and disposed, or a completed task when this
    /// call did not take ownership of the connection.</returns>
    /// <remarks>The entry is removed only when it still maps to <paramref name="connection" />. A caller holding a
    /// stale connection (for example an invocation retrying after its connection was replaced) must not evict a newer
    /// connection registered under the same server address: that newer connection would be dropped from the cache
    /// without ever being shut down or disposed.</remarks>
    private Task RemoveFromActiveAsync(ServerAddress serverAddress, IProtocolConnection connection)
    {
        lock (_mutex)
        {
            if (_shutdownTask is not null ||
                !_activeConnections.TryGetValue(serverAddress, out IProtocolConnection? activeConnection) ||
                !ReferenceEquals(activeConnection, connection))
            {
                // Another task owns this connection: the shutdown/dispose of the cache, or the task that removed it
                // from _activeConnections earlier.
                return Task.CompletedTask;
            }

            bool removed = _activeConnections.Remove(serverAddress);
            Debug.Assert(removed);

            // it's now our connection.
            _detachedConnectionCount++;
        }

        return ShutdownAndDisposeConnectionAsync();

        async Task ShutdownAndDisposeConnectionAsync()
        {
            // _disposedCts is not disposed since we own a detachedConnectionCount
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
            cts.CancelAfter(_shutdownTimeout);

            try
            {
                await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
            }
            catch
            {
                // Ignore connection shutdown failures
            }

            await connection.DisposeAsync().ConfigureAwait(false);

            lock (_mutex)
            {
                // Release our detachedConnectionCount; the last detached connection completes the TCS once the cache
                // is shutting down or being disposed.
                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
                {
                    _detachedConnectionsTcs.SetResult();
                }
            }
        }
    }
567

568
    /// <summary>Looks for an existing active connection matching any address of the server address feature, trying
    /// the main server address first and then the alt server addresses in order.</summary>
    /// <param name="serverAddressFeature">The server address feature. When the match is an alt server address, this
    /// address becomes the main server address and the previous main server address becomes the first alt server
    /// address.</param>
    /// <param name="connection">When this method returns <see langword="true" />, this argument contains an active
    /// connection; otherwise, it is set to <see langword="null" />.</param>
    /// <returns><see langword="true" /> when an active connection matching any of the addresses of the server address
    /// feature is found; otherwise, <see langword="false"/>.</returns>
    private bool TryGetActiveConnection(
        IServerAddressFeature serverAddressFeature,
        [NotNullWhen(true)] out IProtocolConnection? connection)
    {
        lock (_mutex)
        {
            connection = null;

            if (_disposeTask is not null)
            {
                throw new IceRpcException(IceRpcError.OperationAborted, "The connection cache was disposed.");
            }

            if (_shutdownTask is not null)
            {
                throw new IceRpcException(IceRpcError.InvocationRefused, "The connection cache was shut down.");
            }

            var addresses = new ServerAddressEnumerator(serverAddressFeature);
            while (addresses.MoveNext())
            {
                ServerAddress candidate = addresses.Current;
                if (!_activeConnections.TryGetValue(candidate, out connection))
                {
                    continue;
                }

                // Index of the candidate in AltServerAddresses; negative for the main server address.
                int altIndex = addresses.CurrentIndex - 1;
                if (altIndex >= 0)
                {
                    // This altServerAddress becomes the main server address, and the existing main
                    // server address becomes the first alt server address.
                    serverAddressFeature.AltServerAddresses = serverAddressFeature.AltServerAddresses
                        .RemoveAt(altIndex)
                        .Insert(0, serverAddressFeature.ServerAddress!.Value);
                    serverAddressFeature.ServerAddress = candidate;
                }

                return true;
            }

            return false;
        }
    }
613

614
    /// <summary>A helper struct that enumerates the addresses of an <see cref="IServerAddressFeature" /> without
    /// allocations: the main server address first (index 0), then the alt server addresses (index 1 and up).
    /// </summary>
    private struct ServerAddressEnumerator
    {
        /// <summary>Gets the address at <see cref="CurrentIndex" />. Only valid after a successful call to
        /// <see cref="MoveNext" />.</summary>
        internal readonly ServerAddress Current
        {
            get
            {
                Debug.Assert(CurrentIndex >= 0 && CurrentIndex <= _altServerAddresses.Count);

                if (CurrentIndex != 0)
                {
                    // Index i > 0 maps to alt server address i - 1.
                    return _altServerAddresses[CurrentIndex - 1];
                }

                Debug.Assert(_mainServerAddress is not null);
                return _mainServerAddress.Value;
            }
        }

        /// <summary>Gets the number of addresses: 0 without a main server address, otherwise the number of alt
        /// server addresses plus one.</summary>
        internal int Count { get; }

        /// <summary>Gets the index of the current address; -1 until the first successful call to
        /// <see cref="MoveNext" />.</summary>
        internal int CurrentIndex { get; private set; } = -1;

        private readonly ServerAddress? _mainServerAddress;
        private readonly IList<ServerAddress> _altServerAddresses;

        /// <summary>Advances to the next address.</summary>
        /// <returns><see langword="true" /> when the enumerator moved to another address; <see langword="false" />
        /// when there is no address left.</returns>
        internal bool MoveNext()
        {
            // Before the first move, the only candidate is the main server address; afterwards, the alt server
            // addresses that have not been visited yet.
            bool hasNext = CurrentIndex == -1 ?
                _mainServerAddress is not null :
                CurrentIndex < _altServerAddresses.Count;

            if (hasNext)
            {
                CurrentIndex++;
            }

            return hasNext;
        }

        /// <summary>Constructs an enumerator over the addresses held by the feature at the time of this call.
        /// </summary>
        /// <param name="serverAddressFeature">The server address feature to enumerate.</param>
        internal ServerAddressEnumerator(IServerAddressFeature serverAddressFeature)
        {
            _mainServerAddress = serverAddressFeature.ServerAddress;
            _altServerAddresses = serverAddressFeature.AltServerAddresses;
            Count = _mainServerAddress is null ? 0 : _altServerAddresses.Count + 1;
        }
    }
678
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc