• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

icerpc / icerpc-csharp / 21217263926

21 Jan 2026 04:23PM UTC coverage: 83.477% (+0.1%) from 83.353%
21217263926

Pull #4241

github

web-flow
Merge 2a6d2cbae into 97cb31511
Pull Request #4241: Move build telemetry reporters to src

12100 of 14495 relevant lines covered (83.48%)

2935.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.21
src/IceRpc/Server.cs
1
// Copyright (c) ZeroC, Inc.
2

3
using IceRpc.Internal;
4
using IceRpc.Transports;
5
using Microsoft.Extensions.Logging;
6
using Microsoft.Extensions.Logging.Abstractions;
7
using System.Diagnostics;
8
using System.Net;
9
using System.Net.Security;
10
using System.Security.Authentication;
11

12
namespace IceRpc;
13

14
/// <summary>A server accepts connections from clients and dispatches the requests it receives over these connections.
15
/// </summary>
16
public sealed class Server : IAsyncDisposable
17
{
18
    // The connections that completed ConnectAsync successfully and whose shutdown has not been requested yet.
    private readonly LinkedList<IProtocolConnection> _connections = new();

    // The time allowed to connect the transport connection and then the protocol connection of an accepted
    // connection.
    private readonly TimeSpan _connectTimeout;

    // A detached connection is a protocol connection that we've decided to connect, or that is connecting, shutting
    // down or being disposed. It counts towards _maxConnections and both Server.ShutdownAsync and DisposeAsync wait for
    // detached connections to reach 0 using _detachedConnectionsTcs. Such a connection is "detached" because it's not
    // in _connections.
    private int _detachedConnectionCount;

    // Completed once there are no more detached connections after ShutdownAsync or DisposeAsync was called. Its result
    // is set while holding _mutex, so continuations must run asynchronously: otherwise the code awaiting this task
    // could resume inline on the thread that still holds the lock.
    private readonly TaskCompletionSource _detachedConnectionsTcs =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    // A cancellation token source that is canceled by DisposeAsync.
    private readonly CancellationTokenSource _disposedCts = new();

    // Set by the first call to DisposeAsync; a non-null value means the server is disposed or being disposed.
    private Task? _disposeTask;

    // Creates the (decorated) listener when Listen is called.
    private readonly Func<IConnectorListener> _listenerFactory;

    // Set by Listen; the task that accepts connections and owns the listener.
    private Task? _listenTask;

    // The maximum number of connections (attached + detached). 0 means no limit.
    private readonly int _maxConnections;

    // The maximum number of accepted connections that can be connecting at the same time.
    private readonly int _maxPendingConnections;

    // Protects _connections, _detachedConnectionCount, _disposeTask, _listenTask and _shutdownTask.
    private readonly Lock _mutex = new();

    // The configured server address, with its Transport property always set.
    private readonly ServerAddress _serverAddress;

    // A cancellation token source canceled by ShutdownAsync and DisposeAsync.
    private readonly CancellationTokenSource _shutdownCts;

    // Set by ShutdownAsync, or by DisposeAsync when ShutdownAsync was not called. Once non-null, no new connection is
    // added to _connections.
    private Task? _shutdownTask;

    // The time allowed for the graceful shutdown of the server and of its connections.
    private readonly TimeSpan _shutdownTimeout;
53

54
    /// <summary>Constructs a server.</summary>
    /// <param name="options">The server options.</param>
    /// <param name="duplexServerTransport">The transport used to create ice protocol connections. The <see
    /// langword="null" /> value is equivalent to <see cref="IDuplexServerTransport.Default" />.</param>
    /// <param name="multiplexedServerTransport">The transport used to create icerpc protocol connections. The <see
    /// langword="null" /> value is equivalent to <see cref="IMultiplexedServerTransport.Default" />.</param>
    /// <param name="logger">The logger. <see langword="null" /> is equivalent to <see cref="NullLogger.Instance"
    /// />.</param>
    /// <exception cref="ArgumentException">Thrown when the dispatcher configured in the connection options of
    /// <paramref name="options" /> is <see langword="null" />.</exception>
    public Server(
        ServerOptions options,
        IDuplexServerTransport? duplexServerTransport = null,
        IMultiplexedServerTransport? multiplexedServerTransport = null,
        ILogger? logger = null)
    {
        if (options.ConnectionOptions.Dispatcher is null)
        {
            // Name the offending parameter so that the exception identifies which argument is invalid.
            throw new ArgumentException(
                $"{nameof(ServerOptions.ConnectionOptions.Dispatcher)} cannot be null",
                nameof(options));
        }

        logger ??= NullLogger.Instance;

        // Canceling _disposedCts also cancels _shutdownCts.
        _shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);

        duplexServerTransport ??= IDuplexServerTransport.Default;
        multiplexedServerTransport ??= IMultiplexedServerTransport.Default;
        _maxConnections = options.MaxConnections;
        _maxPendingConnections = options.MaxPendingConnections;

        _connectTimeout = options.ConnectTimeout;
        _shutdownTimeout = options.ShutdownTimeout;

        _serverAddress = options.ServerAddress;
        if (_serverAddress.Transport is null)
        {
            // Fill in the transport name from the transport that matches the protocol of the server address.
            _serverAddress = _serverAddress with
            {
                Transport = _serverAddress.Protocol == Protocol.Ice ?
                    duplexServerTransport.Name : multiplexedServerTransport.Name
            };
        }

        // The listener is created lazily, when Listen is called.
        _listenerFactory = () =>
        {
            IConnectorListener listener;
            if (_serverAddress.Protocol == Protocol.Ice)
            {
                IListener<IDuplexConnection> transportListener = duplexServerTransport.Listen(
                    _serverAddress,
                    new DuplexConnectionOptions
                    {
                        MinSegmentSize = options.ConnectionOptions.MinSegmentSize,
                        Pool = options.ConnectionOptions.Pool,
                    },
                    options.ServerAuthenticationOptions);

                listener = new IceConnectorListener(
                    transportListener,
                    options.ConnectionOptions);
            }
            else
            {
                IListener<IMultiplexedConnection> transportListener = multiplexedServerTransport.Listen(
                    _serverAddress,
                    new MultiplexedConnectionOptions
                    {
                        HandshakeTimeout = options.ConnectTimeout,
                        MaxBidirectionalStreams = options.ConnectionOptions.MaxIceRpcBidirectionalStreams,
                        // Add an additional stream for the icerpc protocol control stream.
                        MaxUnidirectionalStreams = options.ConnectionOptions.MaxIceRpcUnidirectionalStreams + 1,
                        MinSegmentSize = options.ConnectionOptions.MinSegmentSize,
                        Pool = options.ConnectionOptions.Pool
                    },
                    options.ServerAuthenticationOptions);

                // No task exception observer is installed when logging is disabled.
                listener = new IceRpcConnectorListener(
                    transportListener,
                    options.ConnectionOptions,
                    logger == NullLogger.Instance ? null : new LogTaskExceptionObserver(logger));
            }

            // The metrics decorator is always installed; the log decorator only when a real logger is provided.
            listener = new MetricsConnectorListenerDecorator(listener);
            if (logger != NullLogger.Instance)
            {
                listener = new LogConnectorListenerDecorator(listener, logger);
            }
            return listener;
        };
    }
142

143
    /// <summary>Constructs a server from a dispatcher and optional SSL authentication options. Every other setting
    /// keeps its <see cref="ServerOptions" /> default value.</summary>
    /// <param name="dispatcher">The dispatcher used by this server.</param>
    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When this value is not <see
    /// langword="null" />, the server accepts only secure connections.</param>
    /// <param name="duplexServerTransport">The transport that creates ice protocol connections; <see langword="null"
    /// /> selects <see cref="IDuplexServerTransport.Default" />.</param>
    /// <param name="multiplexedServerTransport">The transport that creates icerpc protocol connections; <see
    /// langword="null" /> selects <see cref="IMultiplexedServerTransport.Default" />.</param>
    /// <param name="logger">The logger; <see langword="null" /> selects <see cref="NullLogger.Instance" />.</param>
    public Server(
        IDispatcher dispatcher,
        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
        IDuplexServerTransport? duplexServerTransport = null,
        IMultiplexedServerTransport? multiplexedServerTransport = null,
        ILogger? logger = null)
        : this(
            options: new ServerOptions
            {
                ServerAuthenticationOptions = serverAuthenticationOptions,
                ConnectionOptions = new() { Dispatcher = dispatcher },
            },
            duplexServerTransport: duplexServerTransport,
            multiplexedServerTransport: multiplexedServerTransport,
            logger: logger)
    {
    }
174

175
    /// <summary>Constructs a server from a dispatcher, a server address and optional SSL authentication options. Every
    /// other setting keeps its <see cref="ServerOptions" /> default value.</summary>
    /// <param name="dispatcher">The dispatcher used by this server.</param>
    /// <param name="serverAddress">The server address of this server.</param>
    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When this value is not <see
    /// langword="null" />, the server accepts only secure connections.</param>
    /// <param name="duplexServerTransport">The transport that creates ice protocol connections; <see langword="null"
    /// /> selects <see cref="IDuplexServerTransport.Default" />.</param>
    /// <param name="multiplexedServerTransport">The transport that creates icerpc protocol connections; <see
    /// langword="null" /> selects <see cref="IMultiplexedServerTransport.Default" />.</param>
    /// <param name="logger">The logger; <see langword="null" /> selects <see cref="NullLogger.Instance" />.</param>
    public Server(
        IDispatcher dispatcher,
        ServerAddress serverAddress,
        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
        IDuplexServerTransport? duplexServerTransport = null,
        IMultiplexedServerTransport? multiplexedServerTransport = null,
        ILogger? logger = null)
        : this(
            options: new ServerOptions
            {
                ServerAuthenticationOptions = serverAuthenticationOptions,
                ConnectionOptions = new() { Dispatcher = dispatcher },
                ServerAddress = serverAddress,
            },
            duplexServerTransport: duplexServerTransport,
            multiplexedServerTransport: multiplexedServerTransport,
            logger: logger)
    {
    }
209

210
    /// <summary>Constructs a server from a dispatcher, a server address given as a URI and optional SSL authentication
    /// options. Every other setting keeps its <see cref="ServerOptions" /> default value.</summary>
    /// <param name="dispatcher">The dispatcher used by this server.</param>
    /// <param name="serverAddressUri">A URI that represents the server address of this server.</param>
    /// <param name="serverAuthenticationOptions">The SSL server authentication options. When this value is not <see
    /// langword="null" />, the server accepts only secure connections.</param>
    /// <param name="duplexServerTransport">The transport that creates ice protocol connections; <see langword="null"
    /// /> selects <see cref="IDuplexServerTransport.Default" />.</param>
    /// <param name="multiplexedServerTransport">The transport that creates icerpc protocol connections; <see
    /// langword="null" /> selects <see cref="IMultiplexedServerTransport.Default" />.</param>
    /// <param name="logger">The logger; <see langword="null" /> selects <see cref="NullLogger.Instance" />.</param>
    public Server(
        IDispatcher dispatcher,
        Uri serverAddressUri,
        SslServerAuthenticationOptions? serverAuthenticationOptions = null,
        IDuplexServerTransport? duplexServerTransport = null,
        IMultiplexedServerTransport? multiplexedServerTransport = null,
        ILogger? logger = null)
        : this(
            dispatcher: dispatcher,
            serverAddress: new ServerAddress(serverAddressUri),
            serverAuthenticationOptions: serverAuthenticationOptions,
            duplexServerTransport: duplexServerTransport,
            multiplexedServerTransport: multiplexedServerTransport,
            logger: logger)
    {
    }
238

239
    /// <summary>Releases all resources allocated by this server: the server stops listening for new connections and
    /// disposes the connections it accepted from clients.</summary>
    /// <returns>A value task that completes once all the connections accepted by the server are disposed. This covers
    /// the connections that were active when this method is called as well as the connections whose disposal was
    /// initiated before this call.</returns>
    /// <remarks>Disposing an underlying connection of the server aborts invocations, cancels dispatches and disposes
    /// the underlying transport connection without waiting for the peer. Call <see cref="ShutdownAsync" /> first to
    /// wait for invocations and dispatches to complete. The disposal can hang when the configured dispatcher does not
    /// complete promptly after its cancellation token is canceled.</remarks>
    public ValueTask DisposeAsync()
    {
        lock (_mutex)
        {
            // Only the first call starts the disposal; later calls return the same task.
            if (_disposeTask is not null)
            {
                return new ValueTask(_disposeTask);
            }

            // Mark the server as shut down when ShutdownAsync was never called.
            _shutdownTask ??= Task.CompletedTask;

            if (_detachedConnectionCount == 0)
            {
                // Nothing will decrement the count to 0 later; ShutdownAsync may have completed the source already.
                _ = _detachedConnectionsTcs.TrySetResult();
            }

            Task disposeTask = PerformDisposeAsync();
            _disposeTask = disposeTask;
            return new ValueTask(disposeTask);
        }

        async Task PerformDisposeAsync()
        {
            await Task.Yield(); // leave the mutex lock held by DisposeAsync

            _disposedCts.Cancel();

            // _listenTask, _shutdownTask and _connections no longer change once _disposeTask is set.

            if (_listenTask is not null)
            {
                // The listen task and the shutdown must be over before we dispose the connections.
                try
                {
                    await Task.WhenAll(_listenTask, _shutdownTask).ConfigureAwait(false);
                }
                catch
                {
                    // Failures of the listen task or of the shutdown are not reported by the disposal.
                }

                // Dispose all the attached connections and wait for the detached connections to be gone.
                List<Task> disposals = new(_connections.Count + 1);
                foreach (IProtocolConnection connection in _connections)
                {
                    disposals.Add(connection.DisposeAsync().AsTask());
                }
                disposals.Add(_detachedConnectionsTcs.Task);

                await Task.WhenAll(disposals).ConfigureAwait(false);
            }

            _disposedCts.Dispose();
            _shutdownCts.Dispose();
        }
    }
295

296
    /// <summary>Starts accepting connections on the configured server address. Requests received over these connections
    /// are then dispatched by the configured dispatcher.</summary>
    /// <returns>The server address this server is listening on and that a client would connect to. This address is the
    /// same as the <see cref="ServerOptions.ServerAddress" /> of <see cref="ServerOptions" /> except its
    /// <see cref="ServerAddress.Transport" /> property is always non-null and its port number is never 0 when the host
    /// is an IP address.</returns>
    /// <exception cref="IceRpcException">Thrown when the server transport fails to listen on the configured <see
    /// cref="ServerOptions.ServerAddress" />.</exception>
    /// <exception cref="InvalidOperationException">Thrown when the server is already listening, shut down or shutting
    /// down.</exception>
    /// <exception cref="ObjectDisposedException">Thrown when the server is disposed.</exception>
    /// <remarks><see cref="Listen" /> can also throw exceptions from the transport; for example, the transport can
    /// reject the server address.</remarks>
    public ServerAddress Listen()
    {
        lock (_mutex)
        {
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);

            if (_shutdownTask is not null)
            {
                throw new InvalidOperationException($"Server '{this}' is shut down or shutting down.");
            }
            if (_listenTask is not null)
            {
                throw new InvalidOperationException($"Server '{this}' is already listening.");
            }

            IConnectorListener listener = _listenerFactory();
            _listenTask = ListenAsync(listener); // _listenTask owns listener and must dispose it
            return listener.ServerAddress;
        }

        // Accepts connections until the server is shut down or disposed, then disposes the listener. Each accepted
        // connection is connected by its own background task; at most _maxPendingConnections such tasks run at once.
        async Task ListenAsync(IConnectorListener listener)
        {
            await Task.Yield(); // exit mutex lock

            try
            {
                using var pendingConnectionSemaphore = new SemaphoreSlim(
                    _maxPendingConnections,
                    _maxPendingConnections);

                while (!_shutdownCts.IsCancellationRequested)
                {
                    await pendingConnectionSemaphore.WaitAsync(_shutdownCts.Token).ConfigureAwait(false);

                    // Keep accepting until we get a connector; retryable failures are skipped, any other exception
                    // (including the cancellation by _shutdownCts) terminates the listen task.
                    IConnector? connector = null;
                    do
                    {
                        try
                        {
                            (connector, _) = await listener.AcceptAsync(_shutdownCts.Token).ConfigureAwait(false);
                        }
                        catch (Exception exception) when (IsRetryableAcceptException(exception))
                        {
                            // continue
                        }
                    }
                    while (connector is null);

                    // We don't wait for the connection to be activated or shutdown. This could take a while for some
                    // transports such as TLS based transports where the handshake requires few round trips between the
                    // client and server. Waiting could also cause a security issue if the client doesn't respond to the
                    // connection initialization as we wouldn't be able to accept new connections in the meantime. The
                    // call will eventually timeout if the ConnectTimeout expires.
                    CancellationToken cancellationToken = _disposedCts.Token;
                    _ = Task.Run(
                        async () =>
                        {
                            try
                            {
                                await ConnectAsync(connector, cancellationToken).ConfigureAwait(false);
                            }
                            catch
                            {
                                // Ignore connection establishment failure. These failures are logged by the
                                // LogConnectorDecorator
                            }
                            finally
                            {
                                // The connection dispose will dispose the transport connection if it has not been
                                // adopted by the protocol connection.
                                await connector.DisposeAsync().ConfigureAwait(false);

                                // The pending connection semaphore is disposed by the listen task completion once
                                // shutdown / dispose is initiated.
                                lock (_mutex)
                                {
                                    if (_shutdownTask is null)
                                    {
                                        try
                                        {
                                            pendingConnectionSemaphore.Release();
                                        }
                                        catch (ObjectDisposedException)
                                        {
                                            // The listen task can also terminate, and dispose the semaphore, without
                                            // a shutdown: AcceptAsync failed with a non-retryable exception. There is
                                            // nothing left to release in this case, and letting the exception escape
                                            // would only fault this fire-and-forget task.
                                        }
                                    }
                                }
                            }
                        },
                        CancellationToken.None); // the task must run to dispose the connector.
                }
            }
            catch
            {
                // Ignore. Exceptions thrown by listener.AcceptAsync are logged by the log decorator when appropriate.
            }
            finally
            {
                await listener.DisposeAsync().ConfigureAwait(false);
            }

            // Connects the transport connection, then either refuses it (server busy or shutting down) or creates and
            // connects a protocol connection that gets added to _connections.
            async Task ConnectAsync(IConnector connector, CancellationToken cancellationToken)
            {
                using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                connectCts.CancelAfter(_connectTimeout);

                // Connect the transport connection first. This connection establishment can be interrupted by the
                // connect timeout or the server ShutdownAsync/DisposeAsync.
                TransportConnectionInformation transportConnectionInformation =
                    await connector.ConnectTransportConnectionAsync(connectCts.Token).ConfigureAwait(false);

                IProtocolConnection? protocolConnection = null;
                bool serverBusy = false;

                lock (_mutex)
                {
                    Debug.Assert(
                        _maxConnections == 0 || _connections.Count + _detachedConnectionCount <= _maxConnections);

                    if (_shutdownTask is null)
                    {
                        if (_maxConnections > 0 && (_connections.Count + _detachedConnectionCount) == _maxConnections)
                        {
                            serverBusy = true;
                        }
                        else
                        {
                            // The protocol connection adopts the transport connection from the connector and it's
                            // now responsible for disposing of it.
                            protocolConnection = connector.CreateProtocolConnection(transportConnectionInformation);
                            _detachedConnectionCount++;
                        }
                    }
                }

                if (protocolConnection is null)
                {
                    // The server is busy (serverBusy is true) or shutting down (serverBusy is false).
                    try
                    {
                        await connector.RefuseTransportConnectionAsync(serverBusy, connectCts.Token)
                            .ConfigureAwait(false);
                    }
                    catch
                    {
                        // ignore and continue
                    }
                    // The transport connection is disposed by the disposal of the connector.
                }
                else
                {
                    Task shutdownRequested;
                    try
                    {
                        (_, shutdownRequested) = await protocolConnection.ConnectAsync(connectCts.Token)
                            .ConfigureAwait(false);
                    }
                    catch
                    {
                        // The connection never got connected: dispose it without a graceful shutdown.
                        await DisposeDetachedConnectionAsync(protocolConnection, withShutdown: false)
                            .ConfigureAwait(false);
                        throw;
                    }

                    LinkedListNode<IProtocolConnection>? listNode = null;

                    lock (_mutex)
                    {
                        if (_shutdownTask is null)
                        {
                            listNode = _connections.AddLast(protocolConnection);

                            // protocolConnection is no longer a detached connection since it's now "attached" in
                            // _connections.
                            _detachedConnectionCount--;
                        }
                    }

                    if (listNode is null)
                    {
                        // The server started shutting down while this connection was connecting; the connection
                        // remains detached and we shut it down and dispose it here.
                        await DisposeDetachedConnectionAsync(protocolConnection, withShutdown: true)
                            .ConfigureAwait(false);
                    }
                    else
                    {
                        // Schedule removal after successful ConnectAsync.
                        _ = ShutdownWhenRequestedAsync(protocolConnection, shutdownRequested, listNode);
                    }
                }
            }
        }

        // Disposes a detached connection, after an optional graceful shutdown, and then decrements
        // _detachedConnectionCount.
        async Task DisposeDetachedConnectionAsync(IProtocolConnection connection, bool withShutdown)
        {
            if (withShutdown)
            {
                // _disposedCts is not disposed at this point: this connection still counts towards
                // _detachedConnectionCount, and DisposeAsync waits for _detachedConnectionsTcs before it disposes
                // _disposedCts.
                using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
                cts.CancelAfter(_shutdownTimeout);

                try
                {
                    // Can be canceled by DisposeAsync or the shutdown timeout.
                    await connection.ShutdownAsync(cts.Token).ConfigureAwait(false);
                }
                catch
                {
                    // Ignore connection shutdown failures. connection.ShutdownAsync makes sure it's an "expected"
                    // exception.
                }
            }

            await connection.DisposeAsync().ConfigureAwait(false);
            lock (_mutex)
            {
                // Once the server is shutting down or disposed, the last detached connection completes
                // _detachedConnectionsTcs.
                if (--_detachedConnectionCount == 0 && _shutdownTask is not null)
                {
                    _detachedConnectionsTcs.SetResult();
                }
            }
        }

        // Remove the connection from _connections after a successful ConnectAsync.
        async Task ShutdownWhenRequestedAsync(
            IProtocolConnection connection,
            Task shutdownRequested,
            LinkedListNode<IProtocolConnection> listNode)
        {
            await shutdownRequested.ConfigureAwait(false);

            lock (_mutex)
            {
                if (_shutdownTask is null)
                {
                    // The connection becomes a detached connection until its disposal completes.
                    _connections.Remove(listNode);
                    _detachedConnectionCount++;
                }
                else
                {
                    // _connections is immutable and ShutdownAsync/DisposeAsync is responsible to shutdown/dispose
                    // this connection.
                    return;
                }
            }

            await DisposeDetachedConnectionAsync(connection, withShutdown: true).ConfigureAwait(false);
        }
    }
550

551
    /// <summary>Shuts down this server gracefully: the server stops accepting new connections and gracefully shuts
    /// down all its connections.</summary>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>A task that completes successfully once all the connections accepted by the server are shut down. This
    /// covers the connections that were active when this method is called as well as the connections whose shutdown
    /// was initiated before this call. This task can also complete with one of the following exceptions:
    /// <list type="bullet">
    /// <item><description><see cref="IceRpcException" /> with error <see cref="IceRpcError.OperationAborted" /> if the
    /// server is disposed while being shut down.</description></item>
    /// <item><description><see cref="OperationCanceledException" /> if cancellation was requested through the
    /// cancellation token.</description></item>
    /// <item><description><see cref="TimeoutException" /> if the shutdown timed out.</description></item>
    /// </list>
    /// </returns>
    /// <exception cref="InvalidOperationException">Thrown if this method is called more than once.</exception>
    /// <exception cref="ObjectDisposedException">Thrown if the server is disposed.</exception>
    public Task ShutdownAsync(CancellationToken cancellationToken = default)
    {
        Task shutdownTask;

        lock (_mutex)
        {
            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);

            if (_shutdownTask is not null)
            {
                throw new InvalidOperationException($"Server '{this}' is shut down or shutting down.");
            }

            if (_detachedConnectionCount == 0)
            {
                // No detached connection is left to complete the source later on.
                _detachedConnectionsTcs.SetResult();
            }

            shutdownTask = PerformShutdownAsync();
            _shutdownTask = shutdownTask;
        }

        return shutdownTask;

        async Task PerformShutdownAsync()
        {
            await Task.Yield(); // leave the mutex lock held by ShutdownAsync

            _shutdownCts.Cancel();

            // _listenTask no longer changes once _shutdownTask is set.
            if (_listenTask is null)
            {
                // The server never listened: there is nothing to wait for.
                return;
            }

            try
            {
                // The wait ends early on caller cancellation, server disposal or shutdown timeout.
                using var shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(
                    cancellationToken,
                    _disposedCts.Token);
                shutdownCts.CancelAfter(_shutdownTimeout);
                CancellationToken shutdownToken = shutdownCts.Token;

                try
                {
                    Task listenCompleted = _listenTask.WaitAsync(shutdownToken);
                    Task detachedConnectionsCompleted = _detachedConnectionsTcs.Task.WaitAsync(shutdownToken);

                    // Shut down every attached connection, then wait for the listen task and for the detached
                    // connections as well.
                    List<Task> pendingTasks = new(_connections.Count + 2);
                    foreach (IProtocolConnection connection in _connections)
                    {
                        pendingTasks.Add(connection.ShutdownAsync(shutdownToken));
                    }
                    pendingTasks.Add(listenCompleted);
                    pendingTasks.Add(detachedConnectionsCompleted);

                    await Task.WhenAll(pendingTasks).ConfigureAwait(false);
                }
                catch (Exception exception) when (exception is not OperationCanceledException)
                {
                    // Failures of the listen task and of the connection shutdowns are ignored, except that such a
                    // failure must not hide a cancellation.
                    shutdownToken.ThrowIfCancellationRequested();
                }
            }
            catch (OperationCanceledException)
            {
                // Report the cause of the cancellation: the caller's token first, then the disposal, and otherwise
                // the shutdown timeout.
                cancellationToken.ThrowIfCancellationRequested();

                if (!_disposedCts.IsCancellationRequested)
                {
                    throw new TimeoutException(
                        $"The server shut down timed out after {_shutdownTimeout.TotalSeconds} s.");
                }

                throw new IceRpcException(
                    IceRpcError.OperationAborted,
                    "The shutdown was aborted because the server was disposed.");
            }
        }
    }
644

645
    /// <summary>Gets the string representation of this server, which is the string form of its server address.
    /// </summary>
    /// <returns>The string produced by the server address of this server.</returns>
    public override string ToString()
    {
        return _serverAddress.ToString();
    }
1✔
648

649
    /// <summary>Determines whether a failure of <see cref="IConnectorListener.AcceptAsync" /> can be retried.
    /// </summary>
    /// <param name="exception">The exception thrown by the accept call.</param>
    /// <returns><see langword="true" /> when <paramref name="exception" /> is an <see cref="IceRpcException" /> or
    /// an <see cref="AuthenticationException" />; otherwise, <see langword="false" />.</returns>
    private static bool IsRetryableAcceptException(Exception exception)
    {
        return exception switch
        {
            IceRpcException => true,

            // Transports such as QUIC perform the SSL handshake while accepting the connection; a failed handshake
            // surfaces as an AuthenticationException.
            AuthenticationException => true,

            _ => false,
        };
    }
266✔
654

655
    /// <summary>Decorates an <see cref="IConnectorListener" /> to log when the listener starts and stops accepting
    /// connections, and to log the outcome of each accept call.</summary>
    private class LogConnectorListenerDecorator : IConnectorListener
    {
        public ServerAddress ServerAddress => _decoratee.ServerAddress;

        private readonly IConnectorListener _decoratee;
        private readonly ILogger _logger;

        /// <summary>Accepts a connection through the decorated listener and logs the result.</summary>
        /// <param name="cancellationToken">The token forwarded to the decorated listener.</param>
        /// <returns>A logging connector wrapping the accepted connector, and the remote network address.</returns>
        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancellationToken)
        {
            try
            {
                (IConnector, EndPoint) accepted =
                    await _decoratee.AcceptAsync(cancellationToken).ConfigureAwait(false);
                EndPoint remoteNetworkAddress = accepted.Item2;

                _logger.LogConnectionAccepted(ServerAddress, remoteNetworkAddress);

                // Wrap the accepted connector so that a failure to connect the transport connection is logged too.
                IConnector loggingConnector = new LogConnectorDecorator(
                    accepted.Item1,
                    ServerAddress,
                    remoteNetworkAddress,
                    _logger);
                return (loggingConnector, remoteNetworkAddress);
            }
            catch (Exception exception) when (
                exception is ObjectDisposedException ||
                (exception is OperationCanceledException canceled &&
                    canceled.CancellationToken == cancellationToken))
            {
                // These exceptions are not logged. During shutdown, AcceptAsync can fail with
                // OperationCanceledException once the shutdown cancellation token is canceled, or with
                // ObjectDisposedException once the listener is disposed (including while it is accepting a
                // connection).
                throw;
            }
            catch (Exception exception)
            {
                if (IsRetryableAcceptException(exception))
                {
                    _logger.LogConnectionAcceptFailedWithRetryableException(ServerAddress, exception);
                }
                else
                {
                    _logger.LogConnectionAcceptFailed(ServerAddress, exception);
                }
                throw;
            }
        }

        /// <summary>Logs that the listener stops accepting connections, then disposes the decorated listener.
        /// </summary>
        public ValueTask DisposeAsync()
        {
            _logger.LogStopAcceptingConnections(ServerAddress);
            return _decoratee.DisposeAsync();
        }

        /// <summary>Constructs the decorator and logs that the listener starts accepting connections.</summary>
        /// <param name="decoratee">The listener to decorate.</param>
        /// <param name="logger">The logger that receives the log entries.</param>
        internal LogConnectorListenerDecorator(IConnectorListener decoratee, ILogger logger)
        {
            (_decoratee, _logger) = (decoratee, logger);

            // ServerAddress reads from _decoratee, so this call must come after the assignment above.
            _logger.LogStartAcceptingConnections(ServerAddress);
        }
    }
713

714
    /// <summary>Decorates an <see cref="IConnector" /> to log failures to connect the transport connection and to
    /// wrap the protocol connections it creates in a <see cref="LogProtocolConnectionDecorator" />.</summary>
    private class LogConnectorDecorator : IConnector
    {
        private readonly IConnector _decoratee;
        private readonly ILogger _logger;
        private readonly EndPoint _remoteNetworkAddress;
        private readonly ServerAddress _serverAddress;

        /// <summary>Connects the transport connection through the decorated connector; any exception is logged
        /// and rethrown.</summary>
        public async Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
            CancellationToken cancellationToken)
        {
            TransportConnectionInformation connectionInformation;
            try
            {
                connectionInformation =
                    await _decoratee.ConnectTransportConnectionAsync(cancellationToken).ConfigureAwait(false);
            }
            catch (Exception exception)
            {
                _logger.LogConnectionConnectFailed(_serverAddress, _remoteNetworkAddress, exception);
                throw;
            }
            return connectionInformation;
        }

        /// <summary>Creates the protocol connection with the decorated connector and wraps it in a logging
        /// decorator.</summary>
        public IProtocolConnection CreateProtocolConnection(
            TransportConnectionInformation transportConnectionInformation)
        {
            IProtocolConnection protocolConnection =
                _decoratee.CreateProtocolConnection(transportConnectionInformation);

            return new LogProtocolConnectionDecorator(
                protocolConnection,
                _serverAddress,
                _remoteNetworkAddress,
                _logger);
        }

        public ValueTask DisposeAsync()
        {
            return _decoratee.DisposeAsync();
        }

        /// <summary>Forwards the refusal to the decorated connector without logging.</summary>
        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel)
        {
            return _decoratee.RefuseTransportConnectionAsync(serverBusy, cancel);
        }

        internal LogConnectorDecorator(
            IConnector decoratee,
            ServerAddress serverAddress,
            EndPoint remoteNetworkAddress,
            ILogger logger)
        {
            (_decoratee, _logger) = (decoratee, logger);
            (_serverAddress, _remoteNetworkAddress) = (serverAddress, remoteNetworkAddress);
        }
    }
760

761
    /// <summary>Decorates an <see cref="IConnectorListener" /> so that every connector it accepts is wrapped in a
    /// <see cref="MetricsConnectorDecorator" />.</summary>
    private class MetricsConnectorListenerDecorator : IConnectorListener
    {
        public ServerAddress ServerAddress => _decoratee.ServerAddress;

        private readonly IConnectorListener _decoratee;

        /// <summary>Accepts a connection through the decorated listener and wraps the resulting connector.
        /// </summary>
        /// <param name="cancellationToken">The token forwarded to the decorated listener.</param>
        /// <returns>The metrics connector and the remote network address of the accepted connection.</returns>
        public async Task<(IConnector, EndPoint)> AcceptAsync(
            CancellationToken cancellationToken)
        {
            (IConnector, EndPoint) accepted =
                await _decoratee.AcceptAsync(cancellationToken).ConfigureAwait(false);

            IConnector metricsConnector = new MetricsConnectorDecorator(accepted.Item1);
            return (metricsConnector, accepted.Item2);
        }

        public ValueTask DisposeAsync()
        {
            return _decoratee.DisposeAsync();
        }

        internal MetricsConnectorListenerDecorator(IConnectorListener decoratee)
        {
            _decoratee = decoratee;
        }
    }
781

782
    /// <summary>Decorates an <see cref="IConnector" /> to report connect starts, connect stops and connection
    /// failures to <c>Metrics.ServerMetrics</c>, and to wrap the protocol connections it creates in a
    /// <see cref="MetricsProtocolConnectionDecorator" />.</summary>
    private class MetricsConnectorDecorator : IConnector
    {
        private readonly IConnector _decoratee;

        /// <summary>Records a connect start, then connects the transport connection. When the connect attempt
        /// throws, records a connect stop followed by a connection failure and rethrows.</summary>
        public async Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
            CancellationToken cancellationToken)
        {
            Metrics.ServerMetrics.ConnectStart();

            TransportConnectionInformation connectionInformation;
            try
            {
                connectionInformation =
                    await _decoratee.ConnectTransportConnectionAsync(cancellationToken).ConfigureAwait(false);
            }
            catch
            {
                Metrics.ServerMetrics.ConnectStop();
                Metrics.ServerMetrics.ConnectionFailure();
                throw;
            }
            return connectionInformation;
        }

        /// <summary>Creates the protocol connection with the decorated connector and wraps it in a metrics
        /// decorator constructed with <c>connectStarted: true</c>.</summary>
        public IProtocolConnection CreateProtocolConnection(
            TransportConnectionInformation transportConnectionInformation)
        {
            IProtocolConnection protocolConnection =
                _decoratee.CreateProtocolConnection(transportConnectionInformation);

            return new MetricsProtocolConnectionDecorator(
                protocolConnection,
                Metrics.ServerMetrics,
                connectStarted: true);
        }

        public ValueTask DisposeAsync()
        {
            return _decoratee.DisposeAsync();
        }

        /// <summary>Refuses the transport connection through the decorated connector. Whether or not the refusal
        /// throws, records a connection failure followed by a connect stop.</summary>
        public async Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel)
        {
            Task refusal = _decoratee.RefuseTransportConnectionAsync(serverBusy, cancel);
            try
            {
                await refusal.ConfigureAwait(false);
            }
            finally
            {
                Metrics.ServerMetrics.ConnectionFailure();
                Metrics.ServerMetrics.ConnectStop();
            }
        }

        internal MetricsConnectorDecorator(IConnector decoratee)
        {
            _decoratee = decoratee;
        }
    }
826

827
    /// <summary>Represents a listener that accepts transport connections. Each accepted transport connection is
    /// returned as an <see cref="IConnector" />, which is then used either to refuse the transport connection or to
    /// obtain a protocol connection once the transport connection is connected.</summary>
    private interface IConnectorListener : IAsyncDisposable
    {
        /// <summary>Gets the server address of this listener.</summary>
        ServerAddress ServerAddress { get; }

        /// <summary>Accepts a transport connection.</summary>
        /// <param name="cancel">A cancellation token for the accept operation.</param>
        /// <returns>The connector for the accepted transport connection together with the remote network
        /// address.</returns>
        Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel);
    }
836

837
    /// <summary>Represents the connector returned by an <see cref="IConnectorListener" />. A connector connects the
    /// transport connection; once connected, the transport connection is either refused or accepted, where
    /// accepting means creating the protocol connection out of it.</summary>
    private interface IConnector : IAsyncDisposable
    {
        /// <summary>Connects the transport connection.</summary>
        /// <param name="cancellationToken">A cancellation token for the connect operation.</param>
        /// <returns>The information about the connected transport connection.</returns>
        Task<TransportConnectionInformation> ConnectTransportConnectionAsync(CancellationToken cancellationToken);

        /// <summary>Accepts the transport connection by creating a protocol connection out of it.</summary>
        /// <param name="transportConnectionInformation">The transport connection information.</param>
        /// <returns>The new protocol connection.</returns>
        IProtocolConnection CreateProtocolConnection(TransportConnectionInformation transportConnectionInformation);

        /// <summary>Refuses the transport connection.</summary>
        /// <param name="serverBusy">Whether the refusal is reported as a busy server.</param>
        /// <param name="cancel">A cancellation token for the refuse operation.</param>
        /// <returns>A task that completes when the refusal completes.</returns>
        Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancel);
    }
848

849
    /// <summary>Implements <see cref="IConnectorListener" /> over a listener of duplex connections. Each accepted
    /// duplex connection is returned as an <see cref="IceConnector" />.</summary>
    private class IceConnectorListener : IConnectorListener
    {
        public ServerAddress ServerAddress => _listener.ServerAddress;

        private readonly IListener<IDuplexConnection> _listener;
        private readonly ConnectionOptions _options;

        /// <summary>Accepts a duplex connection from the underlying listener.</summary>
        /// <param name="cancel">The token forwarded to the underlying listener.</param>
        /// <returns>A connector for the accepted duplex connection and the remote network address.</returns>
        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel)
        {
            (IDuplexConnection, EndPoint) accepted = await _listener.AcceptAsync(cancel).ConfigureAwait(false);

            IConnector connector = new IceConnector(accepted.Item1, _options);
            return (connector, accepted.Item2);
        }

        public ValueTask DisposeAsync()
        {
            return _listener.DisposeAsync();
        }

        internal IceConnectorListener(IListener<IDuplexConnection> listener, ConnectionOptions options)
        {
            (_listener, _options) = (listener, options);
        }
    }
871

872
    /// <summary>Implements <see cref="IConnector" /> for a duplex transport connection. The connector holds the
    /// transport connection until a protocol connection is created out of it.</summary>
    private class IceConnector : IConnector
    {
        private readonly ConnectionOptions _options;

        // Set to null once CreateProtocolConnection hands the transport connection to a protocol connection.
        private IDuplexConnection? _transportConnection;

        public Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
            CancellationToken cancellationToken)
        {
            return _transportConnection!.ConnectAsync(cancellationToken);
        }

        /// <summary>Creates an <see cref="IceProtocolConnection" /> over the transport connection.</summary>
        public IProtocolConnection CreateProtocolConnection(
            TransportConnectionInformation transportConnectionInformation)
        {
            IDuplexConnection transportConnection = _transportConnection!;

            var protocolConnection = new IceProtocolConnection(
                transportConnection,
                transportConnectionInformation,
                _options);

            // From here on the protocol connection owns the transport connection, so this connector lets go of it.
            _transportConnection = null;
            return protocolConnection;
        }

        /// <summary>Disposes the transport connection if this connector still holds it.</summary>
        public ValueTask DisposeAsync()
        {
            if (_transportConnection is { } transportConnection)
            {
                transportConnection.Dispose();
            }
            return default;
        }

        /// <summary>Refuses the transport connection by disposing it; both parameters are unused.</summary>
        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancellationToken)
        {
            IDuplexConnection transportConnection = _transportConnection!;
            transportConnection.Dispose();
            return Task.CompletedTask;
        }

        internal IceConnector(IDuplexConnection transportConnection, ConnectionOptions options)
        {
            (_transportConnection, _options) = (transportConnection, options);
        }
    }
911

912
    /// <summary>Implements <see cref="IConnectorListener" /> over a listener of multiplexed connections. Each
    /// accepted multiplexed connection is returned as an <see cref="IceRpcConnector" />.</summary>
    private class IceRpcConnectorListener : IConnectorListener
    {
        public ServerAddress ServerAddress => _listener.ServerAddress;

        private readonly IListener<IMultiplexedConnection> _listener;
        private readonly ConnectionOptions _options;
        private readonly ITaskExceptionObserver? _taskExceptionObserver;

        /// <summary>Accepts a multiplexed connection from the underlying listener.</summary>
        /// <param name="cancel">The token forwarded to the underlying listener.</param>
        /// <returns>A connector for the accepted multiplexed connection and the remote network address.</returns>
        public async Task<(IConnector, EndPoint)> AcceptAsync(CancellationToken cancel)
        {
            (IMultiplexedConnection, EndPoint) accepted =
                await _listener.AcceptAsync(cancel).ConfigureAwait(false);

            IConnector connector = new IceRpcConnector(accepted.Item1, _options, _taskExceptionObserver);
            return (connector, accepted.Item2);
        }

        public ValueTask DisposeAsync()
        {
            return _listener.DisposeAsync();
        }

        internal IceRpcConnectorListener(
            IListener<IMultiplexedConnection> listener,
            ConnectionOptions options,
            ITaskExceptionObserver? taskExceptionObserver)
        {
            (_listener, _options) = (listener, options);
            _taskExceptionObserver = taskExceptionObserver;
        }
    }
939

940
    /// <summary>Implements <see cref="IConnector" /> for a multiplexed transport connection. The connector holds
    /// the transport connection until a protocol connection is created out of it.</summary>
    private class IceRpcConnector : IConnector
    {
        private readonly ConnectionOptions _options;
        private readonly ITaskExceptionObserver? _taskExceptionObserver;

        // Set to null once CreateProtocolConnection hands the transport connection to a protocol connection.
        private IMultiplexedConnection? _transportConnection;

        public Task<TransportConnectionInformation> ConnectTransportConnectionAsync(
            CancellationToken cancellationToken)
        {
            return _transportConnection!.ConnectAsync(cancellationToken);
        }

        /// <summary>Creates an <see cref="IceRpcProtocolConnection" /> over the transport connection.</summary>
        public IProtocolConnection CreateProtocolConnection(
            TransportConnectionInformation transportConnectionInformation)
        {
            IMultiplexedConnection transportConnection = _transportConnection!;

            var protocolConnection = new IceRpcProtocolConnection(
                transportConnection,
                transportConnectionInformation,
                _options,
                _taskExceptionObserver);

            // From here on the protocol connection owns the transport connection, so this connector lets go of it.
            _transportConnection = null;
            return protocolConnection;
        }

        /// <summary>Disposes the transport connection if this connector still holds it.</summary>
        public ValueTask DisposeAsync()
        {
            return _transportConnection is { } transportConnection ? transportConnection.DisposeAsync() : default;
        }

        /// <summary>Refuses the transport connection by closing it with the <c>ServerBusy</c> error when
        /// <paramref name="serverBusy" /> is <see langword="true" />, and with the <c>Refused</c> error otherwise.
        /// </summary>
        public Task RefuseTransportConnectionAsync(bool serverBusy, CancellationToken cancellationToken)
        {
            MultiplexedConnectionCloseError closeError = serverBusy
                ? MultiplexedConnectionCloseError.ServerBusy
                : MultiplexedConnectionCloseError.Refused;

            return _transportConnection!.CloseAsync(closeError, cancellationToken);
        }

        internal IceRpcConnector(
            IMultiplexedConnection transportConnection,
            ConnectionOptions options,
            ITaskExceptionObserver? taskExceptionObserver)
        {
            (_transportConnection, _options) = (transportConnection, options);
            _taskExceptionObserver = taskExceptionObserver;
        }
    }
980
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc