• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-dotnet / 388024

11 Apr 2024 01:17PM UTC coverage: 78.163% (-0.02%) from 78.183%
388024

push

CI-PR build

web-flow
Update TimeoutException message (#6773)

26183 of 33498 relevant lines covered (78.16%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.83
/libraries/Microsoft.Bot.Connector.Streaming/Application/StreamingTransportClient.cs
1
// Copyright (c) Microsoft Corporation. All rights reserved.
2
// Licensed under the MIT License.
3

4
using System;
5
using System.Collections.Generic;
6
using System.IO.Pipelines;
7
using System.Threading;
8
using System.Threading.Tasks;
9
using Microsoft.Bot.Connector.Streaming.Session;
10
using Microsoft.Bot.Connector.Streaming.Transport;
11
using Microsoft.Bot.Streaming;
12
using Microsoft.Bot.Streaming.Transport;
13
using Microsoft.Extensions.Logging;
14
using Microsoft.Extensions.Logging.Abstractions;
15

16
namespace Microsoft.Bot.Connector.Streaming.Application
17
{
18
    /// <inheritdoc />
19
    public abstract class StreamingTransportClient : IStreamingTransportClient
20
    {
21
        private readonly string _url;
22
        private readonly RequestHandler _requestHandler;
23
        private readonly TimeSpan _closeTimeout;
24
        private readonly TimeSpan? _keepAlive;
25

26
        private StreamingTransport _transport;
27
        private TransportHandler _application;
28
        private StreamingSession _session;
29

30
        private CancellationTokenSource _disconnectCts;
31
        private volatile bool _disposed = false;
32

33
        /// <summary>
        /// Initializes a new instance of the <see cref="StreamingTransportClient"/> class.
        /// </summary>
        /// <param name="url">Address of the server this client connects to. Must not be null or empty.</param>
        /// <param name="requestHandler">Handler for requests that arrive at this client. Must not be null.</param>
        /// <param name="closeTimeOut">Optional close timeout for the connection; 15 seconds is used when omitted.</param>
        /// <param name="keepAlive">Optional interval between keep-alive requests. When null, no keep-alive requests are sent.</param>
        /// <param name="logger">Optional <see cref="ILogger"/>; <see cref="NullLogger.Instance"/> is used when omitted.</param>
        /// <exception cref="ArgumentNullException"><paramref name="url"/> is null or empty, or <paramref name="requestHandler"/> is null.</exception>
        protected StreamingTransportClient(string url, RequestHandler requestHandler, TimeSpan? closeTimeOut = null, TimeSpan? keepAlive = null, ILogger logger = null)
        {
            // Validate in the same order as before: url first, then the handler.
            if (string.IsNullOrEmpty(url))
            {
                throw new ArgumentNullException(nameof(url));
            }

            if (requestHandler is null)
            {
                throw new ArgumentNullException(nameof(requestHandler));
            }

            ILogger effectiveLogger = logger;
            if (effectiveLogger is null)
            {
                effectiveLogger = NullLogger.Instance;
            }

            _url = url;
            _requestHandler = requestHandler;
            _closeTimeout = closeTimeOut.GetValueOrDefault(TimeSpan.FromSeconds(15));
            _keepAlive = keepAlive;
            Logger = effectiveLogger;
        }
1✔
54

55
        /// <inheritdoc />
56
        public event DisconnectedEventHandler Disconnected;
57

58
        /// <inheritdoc />
59
        public bool IsConnected { get; private set; } = false;
1✔
60

61
        /// <summary>
62
        /// Gets the <see cref="ILogger"/> instance for the streaming client.
63
        /// </summary>
64
        /// <value>A <see cref="ILogger"/> for the streaming client.</value>
65
        protected ILogger Logger { get; }
1✔
66

67
        /// <inheritdoc />
        public async Task ConnectAsync()
        {
            // No caller-supplied headers: forward an empty header set to the next overload.
            var noHeaders = new Dictionary<string, string>();
            await ConnectAsync(noHeaders).ConfigureAwait(false);
        }
×
72

73
        /// <inheritdoc />
        public async Task ConnectAsync(IDictionary<string, string> requestHeaders)
        {
            // This overload offers no cancellation, so forward a token that never fires.
            var neverCancelled = CancellationToken.None;
            await ConnectAsync(requestHeaders, neverCancelled).ConfigureAwait(false);
        }
×
78

79
        /// <summary>
        /// Connects to the configured server URL, sending the supplied headers and observing a cancellation token.
        /// </summary>
        /// <param name="requestHeaders">Header name/value pairs handed to the transport when connecting. Generally, you will need channelID and Authorization.</param>
        /// <param name="cancellationToken"><see cref="CancellationToken"/> passed to the transport and to the shared connect routine.</param>
        /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
        public async Task ConnectAsync(IDictionary<string, string> requestHeaders, CancellationToken cancellationToken)
        {
            // How the transport is started for a URL-based connection; the shared routine supplies the arguments.
            Func<StreamingTransport, Action<bool>, Task> connectToUrl = (transport, connectionStatusChanged) =>
                transport.ConnectAsync(_url, connectionStatusChanged, requestHeaders, cancellationToken);

            await ConnectImplAsync(connectToUrl, cancellationToken).ConfigureAwait(false);
        }
×
92

93
        /// <inheritdoc />
        /// <remarks>
        /// A <see cref="TimeoutException"/> raised while waiting for the response is rethrown as an
        /// <see cref="OperationCanceledException"/> (with the timeout as inner exception) whose message
        /// states whether the client still considered itself connected.
        /// </remarks>
        /// <exception cref="ObjectDisposedException">The client has already been disposed.</exception>
        /// <exception cref="InvalidOperationException">No session exists yet because no connect call has been made.</exception>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        public async Task<ReceiveResponse> SendAsync(StreamingRequest message, CancellationToken cancellationToken = default)
        {
            CheckDisposed();

            if (_session == null)
            {
                throw new InvalidOperationException("Session not established. Call ConnectAsync() in order to send requests through this client.");
            }

            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            try
            {
                return await _session.SendRequestAsync(message, cancellationToken).ConfigureAwait(false);
            }
            catch (TimeoutException ex)
            {
                // TotalSeconds reports the whole timeout. TimeSpan.Seconds is only the 0-59 seconds
                // component, so a timeout of one minute or more would be misreported (e.g. 90s as "30").
                var waitedSeconds = TaskExtensions.DefaultTimeout.TotalSeconds;

                var timeoutMessage = IsConnected
                    ? $"The request sent to the underlying connection has timed out after waiting {waitedSeconds} seconds for a response."
                    : $"The underlying connection has been disconnected, and the request has timed out after waiting {waitedSeconds} seconds for a response.";

                throw new OperationCanceledException(timeoutMessage, ex, cancellationToken);
            }
        }
1✔
123

124
        /// <inheritdoc />
        public void Disconnect()
        {
            CheckDisposed();

            // Synchronous wrapper: start the asynchronous disconnect, then block until it has finished.
            var pendingDisconnect = DisconnectAsync();
            pendingDisconnect.ConfigureAwait(false).GetAwaiter().GetResult();
        }
1✔
130

131
        /// <summary>
        /// Stops the application side of the current connection and marks the client as disconnected.
        /// </summary>
        /// <remarks>
        /// When no connect call has been made yet there is nothing to stop, and the call only clears <see cref="IsConnected"/>.
        /// </remarks>
        /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
        /// <exception cref="ObjectDisposedException">The client has already been disposed.</exception>
        public async Task DisconnectAsync()
        {
            CheckDisposed();

            // _application is only assigned once a connect call has run; calling StopAsync on it
            // before that would throw a NullReferenceException.
            var application = _application;
            if (application != null)
            {
                await application.StopAsync().ConfigureAwait(false);
            }

            IsConnected = false;
        }
1✔
141

142
        /// <inheritdoc />
        public void Dispose()
        {
            // Keep this method as it is: cleanup logic belongs in the 'Dispose(bool disposing)' method.
            Dispose(true);
            GC.SuppressFinalize(this);
        }
1✔
149

150
        /// <summary>
        /// Connects through the transport's URL-less connect overload, using the shared connect routine.
        /// </summary>
        /// <param name="cancellationToken"><see cref="CancellationToken"/> passed to the transport and to the shared connect routine.</param>
        /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
        internal async Task ConnectInternalAsync(CancellationToken cancellationToken)
        {
            // How the transport is started when no URL and no headers are involved.
            Func<StreamingTransport, Action<bool>, Task> connectWithoutUrl = (transport, connectionStatusChanged) =>
                transport.ConnectAsync(connectionStatusChanged, cancellationToken);

            await ConnectImplAsync(connectWithoutUrl, cancellationToken).ConfigureAwait(false);
        }
1✔
157

158
        /// <summary>
        /// Creates the <see cref="StreamingTransport"/> this client uses for a connection.
        /// </summary>
        /// <param name="application">The <see cref="IDuplexPipe"/> handed to the new transport; the connect routine in this class passes the <c>Application</c> side of a freshly created duplex pipe pair.</param>
        /// <returns>The transport instance, which the client stores, connects through and later disposes.</returns>
        internal abstract StreamingTransport CreateStreamingTransport(IDuplexPipe application);
159

160
        /// <summary>
        /// Disposes objects used by the <see cref="StreamingTransportClient"/>.
        /// </summary>
        /// <param name="disposing">Whether called from a Dispose method (its value is true), or, from a finalizer (its value is false).</param>
        /// <remarks>
        /// The disposing parameter should be false when called from a finalizer, and true when called from the IDisposable.Dispose method.
        /// In other words, it is true when deterministically called, and false when non-deterministically called.
        /// A client that was never connected owns no transport, application or disconnect token source, so
        /// each of them is only touched when it exists. The client is marked disposed even if cleanup throws.
        /// </remarks>
        protected virtual void Dispose(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            try
            {
                if (disposing)
                {
                    try
                    {
                        // Disconnect() checks the disposed flag itself, so it must run before the flag is set.
                        // Without an application there is no connection to stop.
                        if (_application != null)
                        {
                            Disconnect();
                        }

                        _disconnectCts?.Cancel();
                    }
                    finally
                    {
                        // These fields stay null until a connect call has run.
                        _transport?.Dispose();
                        _application?.Dispose();
                        _disconnectCts?.Dispose();
                    }
                }
            }
            finally
            {
                // Set the flag even when cleanup throws, so later calls report ObjectDisposedException
                // instead of operating on partially disposed members.
                _disposed = true;
            }
        }
1✔
192

193
        /// <summary>
        /// Tells whether a response carries a status code in the 200-299 range.
        /// </summary>
        /// <param name="response">The response to inspect; may be null.</param>
        /// <returns>True for a non-null response whose status code is between 200 and 299 inclusive; otherwise false.</returns>
        private static bool IsSuccessResponse(ReceiveResponse response)
        {
            if (response == null)
            {
                return false;
            }

            var status = response.StatusCode;
            return status >= 200 && status <= 299;
        }
197

198
        /// <summary>
        /// Builds the pipe pair, transport, application handler and session for a connection, starts them,
        /// and waits until both the transport task and the application task have finished.
        /// </summary>
        /// <param name="connectFunc">Starts the transport; receives the new transport and a callback that updates <see cref="IsConnected"/>.</param>
        /// <param name="cancellationToken">Token handed to the session and linked into the client's disconnect token source.</param>
        /// <returns>A <see cref="Task"/> that completes once the transport and application tasks have ended and the keep-alive loop, if one was started, has stopped.</returns>
        /// <exception cref="ObjectDisposedException">The client has already been disposed.</exception>
        private async Task ConnectImplAsync(Func<StreamingTransport, Action<bool>, Task> connectFunc, CancellationToken cancellationToken)
        {
            CheckDisposed();

            // Release whatever a previous connect call left behind before building new instances.
            _transport?.Dispose();
            _application?.Dispose();

            TimerAwaitable timer = null;
            Task timerTask = null;

            try
            {
                // Pipes
                var duplexPipePair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);

                // Transport
                _transport = CreateStreamingTransport(duplexPipePair.Application);

                // Application
                _application = new TransportHandler(duplexPipePair.Transport, Logger);

                // Session
                _session = new StreamingSession(_requestHandler, _application, Logger, cancellationToken);

                // Set up cancellation. A linked token source stays registered with its parent token until
                // it is disposed, so the source left over from a previous connect call is released here.
                // The field is swapped first so that it never refers to a disposed source.
                var previousDisconnectCts = _disconnectCts;
                _disconnectCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                previousDisconnectCts?.Dispose();

                // Start transport and application
                var transportTask = connectFunc(_transport, connect => IsConnected = connect);
                var applicationTask = _application.ListenAsync(_disconnectCts.Token);
                var combinedTask = Task.WhenAll(transportTask, applicationTask);

                Log.ClientStarted(Logger, _url);

                // Periodic task: keep alive
                // Stopped with `timer.Stop()` in the finally block below
                if (_keepAlive.HasValue)
                {
                    timer = new TimerAwaitable(_keepAlive.Value, _keepAlive.Value);
                    timerTask = TimerLoopAsync(timer);
                }

                // Block until both transport and application have ended.
                await combinedTask.ConfigureAwait(false);

                // Signal that we're done
                _disconnectCts.Cancel();
                Log.ClientTransportApplicationCompleted(Logger, _url);
            }
            finally
            {
                // Stop the keep-alive timer and wait for its loop to finish, whether or not the connection failed.
                timer?.Stop();

                if (timerTask != null)
                {
                    await timerTask.ConfigureAwait(false);
                }
            }

            Log.ClientCompleted(Logger, _url);
        }
1✔
259

260
        /// <summary>
        /// Keep-alive loop: on every timer tick sends a GET request for <c>/api/version</c> and, when that
        /// fails, marks the client as disconnected and raises <see cref="Disconnected"/>.
        /// </summary>
        /// <param name="timer">Timer that paces the loop; it is started here and disposed when the loop exits.</param>
        /// <returns>A <see cref="Task"/> that completes once awaiting the timer yields false.</returns>
        private async Task TimerLoopAsync(TimerAwaitable timer)
        {
            timer.Start();

            using (timer)
            {
                // await returns True until `timer.Stop()` is called in the `finally` block of `ConnectImplAsync`
                while (await timer)
                {
                    try
                    {
                        // Ping server
                        var response = await SendAsync(StreamingRequest.CreateGet("/api/version"), _disconnectCts.Token).ConfigureAwait(false);

                        if (!IsSuccessResponse(response))
                        {
                            // IsSuccessResponse also rejects a null response, so the response must not be
                            // dereferenced unconditionally. A missing response is reported as status code 0,
                            // the same value the exception path below logs.
                            var statusCode = response?.StatusCode ?? 0;

                            Log.ClientKeepAliveFail(Logger, _url, statusCode);

                            IsConnected = false;

                            Disconnected?.Invoke(this, new DisconnectedEventArgs() { Reason = $"Received failure from server heartbeat: {statusCode}." });
                        }
                        else
                        {
                            Log.ClientKeepAliveSucceed(Logger, _url);
                        }
                    }
#pragma warning disable CA1031 // Do not catch general exception types
                    catch (Exception e)
#pragma warning restore CA1031 // Do not catch general exception types
                    {
                        // Any failure of the ping is treated as a lost connection; the loop itself keeps running.
                        Log.ClientKeepAliveFail(Logger, _url, 0, e);
                        IsConnected = false;
                        Disconnected?.Invoke(this, new DisconnectedEventArgs() { Reason = $"Received failure from server heartbeat: {e}." });
                    }
                }
            }
        }
×
298

299
        /// <summary>
        /// Throws when this client has already been disposed.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The client has been disposed; the exception names the runtime type.</exception>
        private void CheckDisposed()
        {
            if (!_disposed)
            {
                return;
            }

            throw new ObjectDisposedException(GetType().FullName);
        }
1✔
306

307
        /// <summary>
        /// Pre-built log message delegates used by <see cref="StreamingTransportClient"/>.
        /// </summary>
        /// <remarks>
        /// Messages implemented using <see cref="LoggerMessage.Define(LogLevel, EventId, string)"/> to maximize performance.
        /// For more information, see https://docs.microsoft.com/en-us/aspnet/core/fundamentals/logging/loggermessage?view=aspnetcore-5.0.
        /// Every public method substitutes an empty string for a null URL before logging.
        /// </remarks>
        private static class Log
        {
            // Event 1 (Information): client started.
            private static readonly Action<ILogger, string, Exception> _started = LoggerMessage.Define<string>(
                LogLevel.Information,
                new EventId(1, nameof(ClientStarted)),
                "Streaming transport client connected to {string}.");

            // Event 2 (Information): client completed.
            private static readonly Action<ILogger, string, Exception> _completed = LoggerMessage.Define<string>(
                LogLevel.Information,
                new EventId(2, nameof(ClientCompleted)),
                "Streaming transport client connection to {string} closed.");

            // Event 3 (Debug): keep-alive request succeeded.
            private static readonly Action<ILogger, string, Exception> _keepAliveSucceeded = LoggerMessage.Define<string>(
                LogLevel.Debug,
                new EventId(3, nameof(ClientKeepAliveSucceed)),
                "Streaming transport client heartbeat to {string} succeeded.");

            // Event 4 (Error): keep-alive request failed, with the status code that was seen.
            private static readonly Action<ILogger, string, int, Exception> _keepAliveFailed = LoggerMessage.Define<string, int>(
                LogLevel.Error,
                new EventId(4, nameof(ClientKeepAliveFail)),
                "Streaming transport client heartbeat to {string} failed with status code {int}.");

            // Event 5 (Debug): transport and application tasks both completed.
            private static readonly Action<ILogger, string, Exception> _transportApplicationCompleted = LoggerMessage.Define<string>(
                LogLevel.Debug,
                new EventId(5, nameof(ClientTransportApplicationCompleted)),
                "Streaming transport client to {string} completed transport and application tasks.");

            public static void ClientStarted(ILogger logger, string url)
            {
                _started(logger, url ?? string.Empty, null);
            }

            public static void ClientCompleted(ILogger logger, string url)
            {
                _completed(logger, url ?? string.Empty, null);
            }

            public static void ClientKeepAliveSucceed(ILogger logger, string url)
            {
                _keepAliveSucceeded(logger, url ?? string.Empty, null);
            }

            public static void ClientKeepAliveFail(ILogger logger, string url, int statusCode = 0, Exception e = null)
            {
                _keepAliveFailed(logger, url ?? string.Empty, statusCode, e);
            }

            public static void ClientTransportApplicationCompleted(ILogger logger, string url)
            {
                _transportApplicationCompleted(logger, url ?? string.Empty, null);
            }
        }
341
    }
342
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc