• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

icerpc / icerpc-csharp / 19579725753

21 Nov 2025 06:22PM UTC coverage: 83.577% (-0.1%) from 83.694%
19579725753

push

github

web-flow
Fix ShutdownRequested_completes_when_inactive_and_inactive_timeout_de… (#4142)

12086 of 14461 relevant lines covered (83.58%)

2964.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.11
src/IceRpc/Server.cs
1
// Copyright (c) ZeroC, Inc.
2

3
using IceRpc.Internal;
4
using IceRpc.Transports;
5
using Microsoft.Extensions.Logging;
6
using Microsoft.Extensions.Logging.Abstractions;
7
using System.Diagnostics;
8
using System.Net;
9
using System.Net.Security;
10
using System.Security.Authentication;
11

12
namespace IceRpc;
13

14
/// <summary>A server accepts connections from clients and dispatches the requests it receives over these connections.
15
/// </summary>
16
public sealed class Server : IAsyncDisposable
17
{
18
    private readonly LinkedList<IProtocolConnection> _connections = new();
78✔
19

20
    private readonly TimeSpan _connectTimeout;
21

22
    // A detached connection is a protocol connection that we've decided to connect, or that is connecting, shutting
23
    // down or being disposed. It counts towards _maxConnections and both Server.ShutdownAsync and DisposeAsync wait for
24
    // detached connections to reach 0 using _detachedConnectionsTcs. Such a connection is "detached" because it's not
25
    // in _connections.
26
    private int _detachedConnectionCount;
27

28
    private readonly TaskCompletionSource _detachedConnectionsTcs = new();
78✔
29

30
    // A cancellation token source that is canceled by DisposeAsync.
31
    private readonly CancellationTokenSource _disposedCts = new();
78✔
32

33
    private Task? _disposeTask;
34

35
    private readonly Func<IConnectorListener> _listenerFactory;
36

37
    private Task? _listenTask;
38

39
    private readonly int _maxConnections;
40

41
    private readonly int _maxPendingConnections;
42

43
    private readonly object _mutex = new();
78✔
44

45
    private readonly ServerAddress _serverAddress;
46

47
    // A cancellation token source canceled by ShutdownAsync and DisposeAsync.
48
    private readonly CancellationTokenSource _shutdownCts;
49

50
    private Task? _shutdownTask;
51

52
    private readonly TimeSpan _shutdownTimeout;
53

54
    /// <summary>Constructs a server.</summary>
55
    /// <param name="options">The server options.</param>
56
    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. The <see
57
    /// langword="null" /> value is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
58
    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. The <see
59
    /// langword="null" /> value is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
60
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
61
    /// />.</param>
62
    public Server(
78✔
63
        ServerOptions options,
78✔
64
        IDuplexServerTransport? duplexServerTransport = null,
78✔
65
        IMultiplexedServerTransport? multiplexedServerTransport = null,
78✔
66
        ILogger? logger = null)
78✔
67
    {
78✔
68
        if (options.ConnectionOptions.Dispatcher is null)
78✔
69
        {
×
70
            throw new ArgumentException($"{nameof(ServerOptions.ConnectionOptions.Dispatcher)} cannot be null");
×
71
        }
72

73
        logger ??= NullLogger.Instance;
78✔
74

75
        _shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
78✔
76

77
        duplexServerTransport ??= IDuplexServerTransport.Default;
78✔
78
        multiplexedServerTransport ??= IMultiplexedServerTransport.Default;
78✔
79
        _maxConnections = options.MaxConnections;
78✔
80
        _maxPendingConnections = options.MaxPendingConnections;
78✔
81

82
        _connectTimeout = options.ConnectTimeout;
78✔
83
        _shutdownTimeout = options.ShutdownTimeout;
78✔
84

85
        _serverAddress = options.ServerAddress;
78✔
86
        if (_serverAddress.Transport is null)
78✔
87
        {
72✔
88
            _serverAddress = _serverAddress with
72✔
89
            {
72✔
90
                Transport = _serverAddress.Protocol == Protocol.Ice ?
72✔
91
                    duplexServerTransport.Name : multiplexedServerTransport.Name
72✔
92
            };
72✔
93
        }
72✔
94

95
        _listenerFactory = () =>
78✔
96
        {
77✔
97
            IConnectorListener listener;
78✔
98
            if (_serverAddress.Protocol == Protocol.Ice)
77✔
99
            {
21✔
100
                IListener<IDuplexConnection> transportListener = duplexServerTransport.Listen(
21✔
101
                    _serverAddress,
21✔
102
                    new DuplexConnectionOptions
21✔
103
                    {
21✔
104
                        MinSegmentSize = options.ConnectionOptions.MinSegmentSize,
21✔
105
                        Pool = options.ConnectionOptions.Pool,
21✔
106
                    },
21✔
107
                    options.ServerAuthenticationOptions);
21✔
108

78✔
109
                listener = new IceConnectorListener(
21✔
110
                    transportListener,
21✔
111
                    options.ConnectionOptions);
21✔
112
            }
21✔
113
            else
78✔
114
            {
56✔
115
                IListener<IMultiplexedConnection> transportListener = multiplexedServerTransport.Listen(
56✔
116
                    _serverAddress,
56✔
117
                    new MultiplexedConnectionOptions
56✔
118
                    {
56✔
119
                        MaxBidirectionalStreams = options.ConnectionOptions.MaxIceRpcBidirectionalStreams,
56✔
120
                        // Add an additional stream for the icerpc protocol control stream.
56✔
121
                        MaxUnidirectionalStreams = options.ConnectionOptions.MaxIceRpcUnidirectionalStreams + 1,
56✔
122
                        MinSegmentSize = options.ConnectionOptions.MinSegmentSize,
56✔
123
                        Pool = options.ConnectionOptions.Pool
56✔
124
                    },
56✔
125
                    options.ServerAuthenticationOptions);
56✔
126

78✔
127
                listener = new IceRpcConnectorListener(
56✔
128
                    transportListener,
56✔
129
                    options.ConnectionOptions,
56✔
130
                    logger == NullLogger.Instance ? null : new LogTaskExceptionObserver(logger));
56✔
131
            }
56✔
132

78✔
133
            listener = new MetricsConnectorListenerDecorator(listener);
77✔
134
            if (logger != NullLogger.Instance)
77✔
135
            {
10✔
136
                listener = new LogConnectorListenerDecorator(listener, logger);
10✔
137
            }
10✔
138
            return listener;
77✔
139
        };
155✔
140
    }
78✔
141

142
    /// <summary>Constructs a server with the specified dispatcher and authentication options. All other properties
143
    /// use the <see cref="ServerOptions" /> defaults.</summary>
144
    /// <param name="dispatcher">The dispatcher of the server.</param>
145
    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When not <see langword="null"
146
    /// />, the server will accept only secure connections.</param>
147
    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. <see langword="null"
148
    /// /> is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
149
    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. <see
150
    /// langword="null" /> is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
151
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
152
    /// />.</param>
153
    public Server(
154
        IDispatcher dispatcher,
155
        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
156
        IDuplexServerTransport? duplexServerTransport = null,
157
        IMultiplexedServerTransport? multiplexedServerTransport = null,
158
        ILogger? logger = null)
159
        : this(
1✔
160
            new ServerOptions
1✔
161
            {
1✔
162
                ServerAuthenticationOptions = serverAuthenticationOptions,
1✔
163
                ConnectionOptions = new()
1✔
164
                {
1✔
165
                    Dispatcher = dispatcher,
1✔
166
                }
1✔
167
            },
1✔
168
            duplexServerTransport,
1✔
169
            multiplexedServerTransport,
1✔
170
            logger)
1✔
171
    {
1✔
172
    }
1✔
173

174
    /// <summary>Constructs a server with the specified dispatcher, server address and authentication options. All
175
    /// other properties use the <see cref="ServerOptions" /> defaults.</summary>
176
    /// <param name="dispatcher">The dispatcher of the server.</param>
177
    /// <param name="serverAddress">The server address of the server.</param>
178
    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When not <see langword="null"
179
    /// />, the server will accept only secure connections.</param>
180
    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. <see langword="null"
181
    /// /> is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
182
    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. <see
183
    /// langword="null" /> is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
184
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
185
    /// />.</param>
186
    public Server(
187
        IDispatcher dispatcher,
188
        ServerAddress serverAddress,
189
        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
190
        IDuplexServerTransport? duplexServerTransport = null,
191
        IMultiplexedServerTransport? multiplexedServerTransport = null,
192
        ILogger? logger = null)
193
        : this(
25✔
194
            new ServerOptions
25✔
195
            {
25✔
196
                ServerAuthenticationOptions = serverAuthenticationOptions,
25✔
197
                ConnectionOptions = new()
25✔
198
                {
25✔
199
                    Dispatcher = dispatcher,
25✔
200
                },
25✔
201
                ServerAddress = serverAddress
25✔
202
            },
25✔
203
            duplexServerTransport,
25✔
204
            multiplexedServerTransport,
25✔
205
            logger)
25✔
206
    {
25✔
207
    }
25✔
208

209
    /// <summary>Constructs a server with the specified dispatcher, server address URI and authentication options. All
210
    /// other properties use the <see cref="ServerOptions" /> defaults.</summary>
211
    /// <param name="dispatcher">The dispatcher of the server.</param>
212
    /// <param name="serverAddressUri">A URI that represents the server address of the server.</param>
213
    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When not <see langword="null"
214
    /// />, the server will accept only secure connections.</param>
215
    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. <see langword="null"
216
    /// /> is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
217
    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. <see
218
    /// langword="null" /> is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
219
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
220
    /// />.</param>
221
    public Server(
222
        IDispatcher dispatcher,
223
        Uri serverAddressUri,
224
        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
225
        IDuplexServerTransport? duplexServerTransport = null,
226
        IMultiplexedServerTransport? multiplexedServerTransport = null,
227
        ILogger? logger = null)
228
        : this(
5✔
229
            dispatcher,
5✔
230
            new ServerAddress(serverAddressUri),
5✔
231
            serverAuthenticationOptions,
5✔
232
            duplexServerTransport,
5✔
233
            multiplexedServerTransport,
5✔
234
            logger)
5✔
235
    {
5✔
236
    }
5✔
237

238
    /// <summary>Releases all resources allocated by this server. The server stops listening for new connections and
239
    /// disposes the connections it accepted from clients.</summary>
240
    /// <returns>A value task that completes when the disposal of all connections accepted by the server has completed.
241
    /// This includes connections that were active when this method is called and connections whose disposal was
242
    /// initiated prior to this call.</returns>
243
    /// <remarks>The disposal of an underlying connection of the server aborts invocations, cancels dispatches and
244
    /// disposes the underlying transport connection without waiting for the peer. To wait for invocations and
245
    /// dispatches to complete, call <see cref="ShutdownAsync" /> first. If the configured dispatcher does not complete
246
    /// promptly when its cancellation token is canceled, the disposal can hang.</remarks>
247
    public ValueTask DisposeAsync()
248
    {
79✔
249
        lock (_mutex)
79✔
250
        {
79✔
251
            if (_disposeTask is null)
79✔
252
            {
78✔
253
                _shutdownTask ??= Task.CompletedTask;
78✔
254
                if (_detachedConnectionCount == 0)
78✔
255
                {
68✔
256
                    _ = _detachedConnectionsTcs.TrySetResult();
68✔
257
                }
68✔
258

259
                _disposeTask = PerformDisposeAsync();
78✔
260
            }
78✔
261
            return new(_disposeTask);
79✔
262
        }
263

264
        async Task PerformDisposeAsync()
265
        {
78✔
266
            await Task.Yield(); // exit mutex lock
78✔
267

268
            _disposedCts.Cancel();
78✔
269

270
            // _listenTask etc are immutable when _disposeTask is not null.
271

272
            if (_listenTask is not null)
78✔
273
            {
77✔
274
                // Wait for shutdown before disposing connections.
275
                try
276
                {
77✔
277
                    await Task.WhenAll(_listenTask, _shutdownTask).ConfigureAwait(false);
77✔
278
                }
77✔
279
                catch
×
280
                {
×
281
                    // Ignore exceptions.
282
                }
×
283

284
                await Task.WhenAll(
77✔
285
                    _connections
77✔
286
                        .Select(connection => connection.DisposeAsync().AsTask())
39✔
287
                        .Append(_detachedConnectionsTcs.Task)).ConfigureAwait(false);
77✔
288
            }
77✔
289

290
            _disposedCts.Dispose();
78✔
291
            _shutdownCts.Dispose();
78✔
292
        }
78✔
293
    }
79✔
294

295
    /// <summary>Starts accepting connections on the configured server address. Requests received over these connections
296
    /// are then dispatched by the configured dispatcher.</summary>
297
    /// <returns>The server address this server is listening on and that a client would connect to. This address is the
298
    /// same as the <see cref="ServerOptions.ServerAddress" /> of <see cref="ServerOptions" /> except its
299
    /// <see cref="ServerAddress.Transport" /> property is always non-null and its port number is never 0 when the host
300
    /// is an IP address.</returns>
301
    /// <exception cref="IceRpcException">Thrown when the server transport fails to listen on the configured <see
302
    /// cref="ServerOptions.ServerAddress" />.</exception>
303
    /// <exception cref="InvalidOperationException">Thrown when the server is already listening, shut down or shutting
304
    /// down.</exception>
305
    /// <exception cref="ObjectDisposedException">Throw when the server is disposed.</exception>
306
    /// <remarks><see cref="Listen" /> can also throw exceptions from the transport; for example, the transport can
307
    /// reject the server address.</remarks>
308
    public ServerAddress Listen()
309
    {
79✔
310
        lock (_mutex)
79✔
311
        {
79✔
312
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
79✔
313

314
            if (_shutdownTask is not null)
78✔
315
            {
×
316
                throw new InvalidOperationException($"Server '{this}' is shut down or shutting down.");
×
317
            }
318
            if (_listenTask is not null)
78✔
319
            {
1✔
320
                throw new InvalidOperationException($"Server '{this}' is already listening.");
1✔
321
            }
322

323
            IConnectorListener listener = _listenerFactory();
77✔
324
            _listenTask = ListenAsync(listener); // _listenTask owns listener and must dispose it
77✔
325
            return listener.ServerAddress;
77✔
326
        }
327

328
        async Task ListenAsync(IConnectorListener listener)
329
        {
77✔
330
            await Task.Yield(); // exit mutex lock
77✔
331

332
            try
333
            {
77✔
334
                using var pendingConnectionSemaphore = new SemaphoreSlim(
77✔
335
                    _maxPendingConnections,
77✔
336
                    _maxPendingConnections);
77✔
337

338
                while (!_shutdownCts.IsCancellationRequested)
154✔
339
                {
154✔
340
                    await pendingConnectionSemaphore.WaitAsync(_shutdownCts.Token).ConfigureAwait(false);
154✔
341

342
                    IConnector? connector = null;
153✔
343
                    do
344
                    {
183✔
345
                        try
346
                        {
183✔
347
                            (connector, _) = await listener.AcceptAsync(_shutdownCts.Token).ConfigureAwait(false);
183✔
348
                        }
77✔
349
                        catch (Exception exception) when (IsRetryableAcceptException(exception))
106✔
350
                        {
30✔
351
                            // continue
352
                        }
30✔
353
                    }
107✔
354
                    while (connector is null);
107✔
355

356
                    // We don't wait for the connection to be activated or shutdown. This could take a while for some
357
                    // transports such as TLS based transports where the handshake requires few round trips between the
358
                    // client and server. Waiting could also cause a security issue if the client doesn't respond to the
359
                    // connection initialization as we wouldn't be able to accept new connections in the meantime. The
360
                    // call will eventually timeout if the ConnectTimeout expires.
361
                    CancellationToken cancellationToken = _disposedCts.Token;
77✔
362
                    _ = Task.Run(
77✔
363
                        async () =>
77✔
364
                        {
77✔
365
                            try
77✔
366
                            {
77✔
367
                                await ConnectAsync(connector, cancellationToken).ConfigureAwait(false);
77✔
368
                            }
69✔
369
                            catch
8✔
370
                            {
8✔
371
                                // Ignore connection establishment failure. This failures are logged by the
77✔
372
                                // LogConnectorDecorator
77✔
373
                            }
8✔
374
                            finally
77✔
375
                            {
77✔
376
                                // The connection dispose will dispose the transport connection if it has not been
77✔
377
                                // adopted by the protocol connection.
77✔
378
                                await connector.DisposeAsync().ConfigureAwait(false);
77✔
379

77✔
380
                                // The pending connection semaphore is disposed by the listen task completion once
77✔
381
                                // shutdown / dispose is initiated.
77✔
382
                                lock (_mutex)
77✔
383
                                {
77✔
384
                                    if (_shutdownTask is null)
77✔
385
                                    {
70✔
386
                                        pendingConnectionSemaphore.Release();
70✔
387
                                    }
70✔
388
                                }
77✔
389
                            }
77✔
390
                        },
77✔
391
                        CancellationToken.None); // the task must run to dispose the connector.
77✔
392
                }
77✔
393
            }
×
394
            catch
77✔
395
            {
77✔
396
                // Ignore. Exceptions thrown by listener.AcceptAsync are logged by the log decorator when appropriate.
397
            }
77✔
398
            finally
399
            {
77✔
400
                await listener.DisposeAsync().ConfigureAwait(false);
77✔
401
            }
77✔
402

403
            async Task ConnectAsync(IConnector connector, CancellationToken cancellationToken)
404
            {
77✔
405
                using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
77✔
406
                connectCts.CancelAfter(_connectTimeout);
77✔
407

408
                // Connect the transport connection first. This connection establishment can be interrupted by the
409
                // connect timeout or the server ShutdownAsync/DisposeAsync.
410
                TransportConnectionInformation transportConnectionInformation =
77✔
411
                    await connector.ConnectTransportConnectionAsync(connectCts.Token).ConfigureAwait(false);
77✔
412

413
                IProtocolConnection? protocolConnection = null;
72✔
414
                bool serverBusy = false;
72✔
415

416
                lock (_mutex)
72✔
417
                {
72✔
418
                    Debug.Assert(
72✔
419
                        _maxConnections == 0 || _connections.Count + _detachedConnectionCount <= _maxConnections);
72✔
420

421
                    if (_shutdownTask is null)
72✔
422
                    {
72✔
423
                        if (_maxConnections > 0 && (_connections.Count + _detachedConnectionCount) == _maxConnections)
72✔
424
                        {
7✔
425
                            serverBusy = true;
7✔
426
                        }
7✔
427
                        else
428
                        {
65✔
429
                            // The protocol connection adopts the transport connection from the connector and it's
430
                            // now responsible for disposing of it.
431
                            protocolConnection = connector.CreateProtocolConnection(transportConnectionInformation);
65✔
432
                            _detachedConnectionCount++;
65✔
433
                        }
65✔
434
                    }
72✔
435
                }
72✔
436

437
                if (protocolConnection is null)
72✔
438
                {
7✔
439
                    try
440
                    {
7✔
441
                        await connector.RefuseTransportConnectionAsync(serverBusy, connectCts.Token)
7✔
442
                            .ConfigureAwait(false);
7✔
443
                    }
5✔
444
                    catch
2✔
445
                    {
2✔
446
                        // ignore and continue
447
                    }
2✔
448
                    // The transport connection is disposed by the disposal of the connector.
449
                }
7✔
450
                else
451
                {
65✔
452
                    Task shutdownRequested;
453
                    try
454
                    {
65✔
455
                        (_, shutdownRequested) = await protocolConnection.ConnectAsync(connectCts.Token)
65✔
456
                            .ConfigureAwait(false);
65✔
457
                    }
62✔
458
                    catch
3✔
459
                    {
3✔
460
                        await DisposeDetachedConnectionAsync(protocolConnection, withShutdown: false)
3✔
461
                            .ConfigureAwait(false);
3✔
462
                        throw;
3✔
463
                    }
464

465
                    LinkedListNode<IProtocolConnection>? listNode = null;
62✔
466

467
                    lock (_mutex)
62✔
468
                    {
469
                        if (_shutdownTask is null)
62✔
470
                        {
471
                            listNode = _connections.AddLast(protocolConnection);
62✔
472

473
                            // protocolConnection is no longer a detached connection since it's now "attached" in
474
                            // _connections.
475
                            _detachedConnectionCount--;
62✔
476
                        }
477
                    }
62✔
478

479
                    if (listNode is null)
62✔
480
                    {
481
                        await DisposeDetachedConnectionAsync(protocolConnection, withShutdown: true)
×
482
                            .ConfigureAwait(false);
×
483
                    }
484
                    else
485
                    {
62✔
486
                        // Schedule removal after successful ConnectAsync.
487
                        _ = ShutdownWhenRequestedAsync(protocolConnection, shutdownRequested, listNode);
62✔
488
                    }
489
                }
62✔
490
            }
491
        }
492

493
        async Task DisposeDetachedConnectionAsync(IProtocolConnection connection, bool withShutdown)
494
        {
26✔
495
            if (withShutdown)
26✔
496
            {
23✔
497
                // _disposedCts is not disposed since we own a _backgroundConnectionDisposeCount.
498
                using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
23✔
499
                cts.CancelAfter(_shutdownTimeout);
23✔
500

501
                try
502
                {
23✔
503
                    // Can be canceled by DisposeAsync or the shutdown timeout.
504
                    await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
23✔
505
                }
12✔
506
                catch
11✔
507
                {
11✔
508
                    // Ignore connection shutdown failures. connection.ShutdownAsync makes sure it's an "expected"
509
                    // exception.
510
                }
11✔
511
            }
23✔
512

513
            await connection.DisposeAsync().ConfigureAwait(false);
26✔
514
            lock (_mutex)
26✔
515
            {
26✔
516
                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
26✔
517
                {
11✔
518
                    _detachedConnectionsTcs.SetResult();
11✔
519
                }
11✔
520
            }
26✔
521
        }
26✔
522

523
        // Remove the connection from _connections after a successful ConnectAsync.
524
        async Task ShutdownWhenRequestedAsync(
525
            IProtocolConnection connection,
526
            Task shutdownRequested,
527
            LinkedListNode<IProtocolConnection> listNode)
528
        {
62✔
529
            await shutdownRequested.ConfigureAwait(false);
62✔
530

531
            lock (_mutex)
48✔
532
            {
48✔
533
                if (_shutdownTask is null)
48✔
534
                {
23✔
535
                    _connections.Remove(listNode);
23✔
536
                    _detachedConnectionCount++;
23✔
537
                }
23✔
538
                else
539
                {
25✔
540
                    // _connections is immutable and ShutdownAsync/DisposeAsync is responsible to shutdown/dispose
541
                    // this connection.
542
                    return;
25✔
543
                }
544
            }
23✔
545

546
            await DisposeDetachedConnectionAsync(connection, withShutdown: true).ConfigureAwait(false);
23✔
547
        }
48✔
548
    }
223✔
549

550
    /// <summary>Gracefully shuts down this server: the server stops accepting new connections and shuts down gracefully
551
    /// all its connections.</summary>
552
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
553
    /// <returns>A task that completes successfully once the shutdown of all connections accepted by the server has
554
    /// completed. This includes connections that were active when this method is called and connections whose shutdown
555
    /// was initiated prior to this call. This task can also complete with one of the following exceptions:
556
    /// <list type="bullet">
557
    /// <item><description><see cref="IceRpcException" /> with error <see cref="IceRpcError.OperationAborted" /> if the
558
    /// server is disposed while being shut down.</description></item>
559
    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
560
    /// cancellation token.</description></item>
561
    /// <item><description><see cref="TimeoutException" /> if the shutdown timed out.</description></item>
562
    /// </list>
563
    /// </returns>
564
    /// <exception cref="InvalidOperationException">Thrown if this method is called more than once.</exception>
565
    /// <exception cref="ObjectDisposedException">Thrown if the server is disposed.</exception>
566
    public Task ShutdownAsync(CancellationToken cancellationToken = default)
567
    {
22✔
568
        lock (_mutex)
22✔
569
        {
22✔
570
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
22✔
571

572
            if (_shutdownTask is not null)
22✔
573
            {
×
574
                throw new InvalidOperationException($"Server '{this}' is shut down or shutting down.");
×
575
            }
576

577
            if (_detachedConnectionCount == 0)
22✔
578
            {
21✔
579
                _detachedConnectionsTcs.SetResult();
21✔
580
            }
21✔
581

582
            _shutdownTask = PerformShutdownAsync();
22✔
583
        }
22✔
584
        return _shutdownTask;
22✔
585

586
        async Task PerformShutdownAsync()
587
        {
22✔
588
            await Task.Yield(); // exit mutex lock
22✔
589

590
            _shutdownCts.Cancel();
22✔
591

592
            // _listenTask is immutable once _shutdownTask is not null.
593
            if (_listenTask is not null)
22✔
594
            {
22✔
595
                try
596
                {
22✔
597
                    using var cts = CancellationTokenSource.CreateLinkedTokenSource(
22✔
598
                        cancellationToken,
22✔
599
                        _disposedCts.Token);
22✔
600

601
                    cts.CancelAfter(_shutdownTimeout);
22✔
602

603
                    try
604
                    {
22✔
605
                        await Task.WhenAll(
22✔
606
                            _connections
22✔
607
                                .Select(connection => connection.ShutdownAsync(cts.Token))
12✔
608
                                .Append(_listenTask.WaitAsync(cts.Token))
22✔
609
                                .Append(_detachedConnectionsTcs.Task.WaitAsync(cts.Token)))
22✔
610
                            .ConfigureAwait(false);
22✔
611
                    }
22✔
612
                    catch (OperationCanceledException)
×
613
                    {
×
614
                        throw;
×
615
                    }
616
                    catch
×
617
                    {
×
618
                        // Ignore _listenTask and connection shutdown exceptions
619

620
                        // Throw OperationCanceledException if this WhenAll exception is hiding an OCE.
621
                        cts.Token.ThrowIfCancellationRequested();
×
622
                    }
×
623
                }
22✔
624
                catch (OperationCanceledException)
×
625
                {
×
626
                    cancellationToken.ThrowIfCancellationRequested();
×
627

628
                    if (_disposedCts.IsCancellationRequested)
×
629
                    {
×
630
                        throw new IceRpcException(
×
631
                            IceRpcError.OperationAborted,
×
632
                            "The shutdown was aborted because the server was disposed.");
×
633
                    }
634
                    else
635
                    {
×
636
                        throw new TimeoutException(
×
637
                            $"The server shut down timed out after {_shutdownTimeout.TotalSeconds} s.");
×
638
                    }
639
                }
640
            }
22✔
641
        }
22✔
642
    }
22✔
643

644
    /// <summary>Returns a string that represents this server.</summary>
645
    /// <returns>A string that represents this server.</returns>
646
    public override string ToString() => _serverAddress.ToString();
1✔
647

648
    /// <summary>Returns true if the <see cref="IConnectorListener.AcceptAsync" /> failure can be retried.</summary>
649
    private static bool IsRetryableAcceptException(Exception exception) =>
650
        // Transports such as Quic do the SSL handshake when the connection is accepted, this can throw
651
        // AuthenticationException if it fails.
652
        exception is IceRpcException or AuthenticationException;
106✔
653

654
    /// <summary>Provides a decorator that adds logging to a <see cref="IConnectorListener" />.</summary>
655
    private class LogConnectorListenerDecorator : IConnectorListener
656
    {
657
        public ServerAddress ServerAddress => _decoratee.ServerAddress;
46✔
658

659
        private readonly IConnectorListener _decoratee;
660
        private readonly ILogger _logger;
661

662
        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancellationToken)
663
        {
18✔
664
            try
665
            {
18✔
666
                (IConnector connector, EndPoint remoteNetworkAddress) =
18✔
667
                    await _decoratee.AcceptAsync(cancellationToken).ConfigureAwait(false);
18✔
668

669
                _logger.LogConnectionAccepted(ServerAddress, remoteNetworkAddress);
8✔
670
                return (
8✔
671
                    new LogConnectorDecorator(connector, ServerAddress, remoteNetworkAddress, _logger),
8✔
672
                    remoteNetworkAddress);
8✔
673
            }
674
            catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
10✔
675
            {
10✔
676
                // Do not log this exception. The AcceptAsync call can fail with OperationCanceledException during
677
                // shutdown once the shutdown cancellation token is canceled.
678
                throw;
10✔
679
            }
680
            catch (ObjectDisposedException)
×
681
            {
×
682
                // Do not log this exception. The AcceptAsync call can fail with ObjectDisposedException during
683
                // shutdown once the listener is disposed or if it is accepting a connection while the listener is
684
                // disposed.
685
                throw;
×
686
            }
687
            catch (Exception exception) when (IsRetryableAcceptException(exception))
×
688
            {
×
689
                _logger.LogConnectionAcceptFailedWithRetryableException(ServerAddress, exception);
×
690
                throw;
×
691
            }
692
            catch (Exception exception)
×
693
            {
×
694
                _logger.LogConnectionAcceptFailed(ServerAddress, exception);
×
695
                throw;
×
696
            }
697
        }
8✔
698

699
        public ValueTask DisposeAsync()
700
        {
10✔
701
            _logger.LogStopAcceptingConnections(ServerAddress);
10✔
702
            return _decoratee.DisposeAsync();
10✔
703
        }
10✔
704

705
        internal LogConnectorListenerDecorator(IConnectorListener decoratee, ILogger logger)
10✔
706
        {
10✔
707
            _decoratee = decoratee;
10✔
708
            _logger = logger;
10✔
709
            _logger.LogStartAcceptingConnections(ServerAddress);
10✔
710
        }
10✔
711
    }
712

713
    private class LogConnectorDecorator : IConnector
714
    {
715
        private readonly IConnector _decoratee;
716
        private readonly ILogger _logger;
717
        private readonly EndPoint _remoteNetworkAddress;
718
        private readonly ServerAddress _serverAddress;
719

720
        public async Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
721
            CancellationToken cancellationToken)
722
        {
8✔
723
            try
724
            {
8✔
725
                return await _decoratee.ConnectTransportConnectionAsync(cancellationToken).ConfigureAwait(false);
8✔
726
            }
727
            catch (Exception exception)
2✔
728
            {
2✔
729
                _logger.LogConnectionConnectFailed(_serverAddress, _remoteNetworkAddress, exception);
2✔
730
                throw;
2✔
731
            }
732
        }
6✔
733

734
        public IProtocolConnection CreateProtocolConnection(
735
            TransportConnectionInformation transportConnectionInformation) =>
736
            new LogProtocolConnectionDecorator(
6✔
737
                _decoratee.CreateProtocolConnection(transportConnectionInformation),
6✔
738
                _serverAddress,
6✔
739
                _remoteNetworkAddress,
6✔
740
                _logger);
6✔
741

742
        public ValueTask DisposeAsync() => _decoratee.DisposeAsync();
8✔
743

744
        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel) =>
745
            _decoratee.RefuseTransportConnectionAsync(serverBusy, cancel);
×
746

747
        internal LogConnectorDecorator(
8✔
748
            IConnector decoratee,
8✔
749
            ServerAddress serverAddress,
8✔
750
            EndPoint remoteNetworkAddress,
8✔
751
            ILogger logger)
8✔
752
        {
8✔
753
            _decoratee = decoratee;
8✔
754
            _logger = logger;
8✔
755
            _serverAddress = serverAddress;
8✔
756
            _remoteNetworkAddress = remoteNetworkAddress;
8✔
757
        }
8✔
758
    }
759

760
    /// <summary>Provides a decorator that adds metrics to a <see cref="IConnectorListener" />.</summary>
761
    private class MetricsConnectorListenerDecorator : IConnectorListener
762
    {
763
        public ServerAddress ServerAddress => _decoratee.ServerAddress;
113✔
764

765
        private readonly IConnectorListener _decoratee;
766

767
        public async Task<(IConnector, EndPoint)> AcceptAsync(
768
            CancellationToken cancellationToken)
769
        {
183✔
770
            (IConnector connector, EndPoint remoteNetworkAddress) =
183✔
771
                await _decoratee.AcceptAsync(cancellationToken).ConfigureAwait(false);
183✔
772
            return (new MetricsConnectorDecorator(connector), remoteNetworkAddress);
77✔
773
        }
77✔
774

775
        public ValueTask DisposeAsync() => _decoratee.DisposeAsync();
77✔
776

777
        internal MetricsConnectorListenerDecorator(IConnectorListener decoratee) =>
77✔
778
            _decoratee = decoratee;
77✔
779
    }
780

781
    private class MetricsConnectorDecorator : IConnector
782
    {
783
        private readonly IConnector _decoratee;
784

785
        public async Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
786
            CancellationToken cancellationToken)
787
        {
77✔
788
            Metrics.ServerMetrics.ConnectStart();
77✔
789
            try
790
            {
77✔
791
                return await _decoratee.ConnectTransportConnectionAsync(cancellationToken).ConfigureAwait(false);
77✔
792
            }
793
            catch
5✔
794
            {
5✔
795
                Metrics.ServerMetrics.ConnectStop();
5✔
796
                Metrics.ServerMetrics.ConnectionFailure();
5✔
797
                throw;
5✔
798
            }
799
        }
72✔
800

801
        public IProtocolConnection CreateProtocolConnection(
802
            TransportConnectionInformation transportConnectionInformation) =>
803
                new MetricsProtocolConnectionDecorator(
65✔
804
                    _decoratee.CreateProtocolConnection(transportConnectionInformation),
65✔
805
                    Metrics.ServerMetrics,
65✔
806
                    connectStarted: true);
65✔
807

808
        public ValueTask DisposeAsync() => _decoratee.DisposeAsync();
77✔
809

810
        public async Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel)
811
        {
7✔
812
            try
813
            {
7✔
814
                await _decoratee.RefuseTransportConnectionAsync(serverBusy, cancel).ConfigureAwait(false);
7✔
815
            }
5✔
816
            finally
817
            {
7✔
818
                Metrics.ServerMetrics.ConnectionFailure();
7✔
819
                Metrics.ServerMetrics.ConnectStop();
7✔
820
            }
7✔
821
        }
5✔
822

823
        internal MetricsConnectorDecorator(IConnector decoratee) => _decoratee = decoratee;
154✔
824
    }
825

826
    /// <summary>A connector listener accepts a transport connection and returns a <see cref="IConnector" />. The
827
    /// connector is used to refuse the transport connection or obtain a protocol connection once the transport
828
    /// connection is connected.</summary>
829
    private interface IConnectorListener : IAsyncDisposable
830
    {
831
        ServerAddress ServerAddress { get; }
832

833
        Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel);
834
    }
835

836
    /// <summary>A connector is returned by <see cref="IConnectorListener" />. The connector allows to connect the
837
    /// transport connection. If successful, the transport connection can either be refused or accepted by creating the
838
    /// protocol connection out of it.</summary>
839
    private interface IConnector : IAsyncDisposable
840
    {
841
        Task<TransportConnectionInformation> ConnectTransportConnectionAsync(CancellationToken cancellationToken);
842

843
        IProtocolConnection CreateProtocolConnection(TransportConnectionInformation transportConnectionInformation);
844

845
        Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel);
846
    }
847

848
    private class IceConnectorListener : IConnectorListener
849
    {
850
        public ServerAddress ServerAddress => _listener.ServerAddress;
39✔
851

852
        private readonly IListener<IDuplexConnection> _listener;
853
        private readonly ConnectionOptions _options;
854

855
        public ValueTask DisposeAsync() => _listener.DisposeAsync();
21✔
856

857
        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel)
858
        {
43✔
859
            (IDuplexConnection transportConnection, EndPoint remoteNetworkAddress) = await _listener.AcceptAsync(
43✔
860
                cancel).ConfigureAwait(false);
43✔
861
            return (new IceConnector(transportConnection, _options), remoteNetworkAddress);
22✔
862
        }
22✔
863

864
        internal IceConnectorListener(IListener<IDuplexConnection> listener, ConnectionOptions options)
21✔
865
        {
21✔
866
            _listener = listener;
21✔
867
            _options = options;
21✔
868
        }
21✔
869
    }
870

871
    private class IceConnector : IConnector
872
    {
873
        private readonly ConnectionOptions _options;
874
        private IDuplexConnection? _transportConnection;
875

876
        public Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
877
            CancellationToken cancellationToken) =>
878
            _transportConnection!.ConnectAsync(cancellationToken);
22✔
879

880
        public IProtocolConnection CreateProtocolConnection(
881
            TransportConnectionInformation transportConnectionInformation)
882
        {
19✔
883
            // The protocol connection takes ownership of the transport connection.
884
            var protocolConnection = new IceProtocolConnection(
19✔
885
                _transportConnection!,
19✔
886
                transportConnectionInformation,
19✔
887
                _options);
19✔
888
            _transportConnection = null;
19✔
889
            return protocolConnection;
19✔
890
        }
19✔
891

892
        public ValueTask DisposeAsync()
893
        {
22✔
894
            _transportConnection?.Dispose();
22✔
895
            return new();
22✔
896
        }
22✔
897

898
        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancellationToken)
899
        {
2✔
900
            _transportConnection!.Dispose();
2✔
901
            return Task.CompletedTask;
2✔
902
        }
2✔
903

904
        internal IceConnector(IDuplexConnection transportConnection, ConnectionOptions options)
22✔
905
        {
22✔
906
            _transportConnection = transportConnection;
22✔
907
            _options = options;
22✔
908
        }
22✔
909
    }
910

911
    private class IceRpcConnectorListener : IConnectorListener
912
    {
913
        public ServerAddress ServerAddress => _listener.ServerAddress;
74✔
914

915
        private readonly IListener<IMultiplexedConnection> _listener;
916
        private readonly ConnectionOptions _options;
917
        private readonly ITaskExceptionObserver? _taskExceptionObserver;
918

919
        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel)
920
        {
140✔
921
            (IMultiplexedConnection transportConnection, EndPoint remoteNetworkAddress) = await _listener.AcceptAsync(
140✔
922
                cancel).ConfigureAwait(false);
140✔
923
            return (new IceRpcConnector(transportConnection, _options, _taskExceptionObserver), remoteNetworkAddress);
55✔
924
        }
55✔
925

926
        public ValueTask DisposeAsync() => _listener.DisposeAsync();
56✔
927

928
        internal IceRpcConnectorListener(
56✔
929
            IListener<IMultiplexedConnection> listener,
56✔
930
            ConnectionOptions options,
56✔
931
            ITaskExceptionObserver? taskExceptionObserver)
56✔
932
        {
56✔
933
            _listener = listener;
56✔
934
            _options = options;
56✔
935
            _taskExceptionObserver = taskExceptionObserver;
56✔
936
        }
56✔
937
    }
938

939
    private class IceRpcConnector : IConnector
940
    {
941
        private readonly ConnectionOptions _options;
942
        private readonly ITaskExceptionObserver? _taskExceptionObserver;
943
        private IMultiplexedConnection? _transportConnection;
944

945
        public Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
946
            CancellationToken cancellationToken) =>
947
            _transportConnection!.ConnectAsync(cancellationToken);
55✔
948

949
        public IProtocolConnection CreateProtocolConnection(
950
            TransportConnectionInformation transportConnectionInformation)
951
        {
46✔
952
            // The protocol connection takes ownership of the transport connection.
953
            var protocolConnection = new IceRpcProtocolConnection(
46✔
954
                _transportConnection!,
46✔
955
                transportConnectionInformation,
46✔
956
                _options,
46✔
957
                _taskExceptionObserver);
46✔
958
            _transportConnection = null;
46✔
959
            return protocolConnection;
46✔
960
        }
46✔
961

962
        public ValueTask DisposeAsync() => _transportConnection?.DisposeAsync() ?? new();
55✔
963

964
        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancellationToken) =>
965
            _transportConnection!.CloseAsync(
5✔
966
                serverBusy ? MultiplexedConnectionCloseError.ServerBusy : MultiplexedConnectionCloseError.Refused,
5✔
967
                cancellationToken);
5✔
968

969
        internal IceRpcConnector(
55✔
970
            IMultiplexedConnection transportConnection,
55✔
971
            ConnectionOptions options,
55✔
972
            ITaskExceptionObserver? taskExceptionObserver)
55✔
973
        {
55✔
974
            _transportConnection = transportConnection;
55✔
975
            _options = options;
55✔
976
            _taskExceptionObserver = taskExceptionObserver;
55✔
977
        }
55✔
978
    }
979
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc