• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-dotnet / 386500

18 Mar 2024 02:55PM UTC coverage: 78.206% (-0.04%) from 78.243%
386500

push

CI-PR build

web-flow
Move SaveAllChanges method from SetProperty to OAuthInput (#6757)

26185 of 33482 relevant lines covered (78.21%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.91
/libraries/Microsoft.Bot.Connector.Streaming/Application/StreamingTransportClient.cs
1
// Copyright (c) Microsoft Corporation. All rights reserved.
2
// Licensed under the MIT License.
3

4
using System;
5
using System.Collections.Generic;
6
using System.IO.Pipelines;
7
using System.Threading;
8
using System.Threading.Tasks;
9
using Microsoft.Bot.Connector.Streaming.Session;
10
using Microsoft.Bot.Connector.Streaming.Transport;
11
using Microsoft.Bot.Streaming;
12
using Microsoft.Bot.Streaming.Transport;
13
using Microsoft.Extensions.Logging;
14
using Microsoft.Extensions.Logging.Abstractions;
15

16
namespace Microsoft.Bot.Connector.Streaming.Application
17
{
18
    /// <inheritdoc />
19
    public abstract class StreamingTransportClient : IStreamingTransportClient
20
    {
21
        // Settings captured once by the constructor.
        private readonly string _url;
        private readonly RequestHandler _requestHandler;

        // Stored by the constructor (15 seconds when not supplied); not read by any member visible in this class.
        private readonly TimeSpan _closeTimeout;

        // Interval between keep-alive pings; null means no keep-alive loop is started.
        private readonly TimeSpan? _keepAlive;

        // Per-connection objects, assigned by ConnectImplAsync on every connect attempt.
        private StreamingTransport _transport;
        private TransportHandler _application;
        private StreamingSession _session;

        // Linked to the caller's token in ConnectImplAsync and cancelled when the connection work has finished.
        private CancellationTokenSource _disconnectCts;

        // Set at the end of Dispose(bool) and checked by CheckDisposed.
        private volatile bool _disposed;
32

33
        /// <summary>
        /// Initializes a new instance of the <see cref="StreamingTransportClient"/> class.
        /// </summary>
        /// <param name="url">The server URL to connect to. Must not be null or empty.</param>
        /// <param name="requestHandler">Handler that will receive incoming requests to this client instance. Must not be null.</param>
        /// <param name="closeTimeOut">Optional time out for closing the client connection. Fifteen seconds is used when null.</param>
        /// <param name="keepAlive">Optional spacing between keep alives for proactive disconnection detection. If null is provided, no keep alives will be sent.</param>
        /// <param name="logger"><see cref="ILogger"/> for the client. <see cref="NullLogger.Instance"/> is used when null.</param>
        /// <exception cref="ArgumentNullException"><paramref name="url"/> is null or empty, or <paramref name="requestHandler"/> is null.</exception>
        protected StreamingTransportClient(string url, RequestHandler requestHandler, TimeSpan? closeTimeOut = null, TimeSpan? keepAlive = null, ILogger logger = null)
        {
            // Guard clauses first; the URL is validated before the handler.
            if (string.IsNullOrEmpty(url))
            {
                throw new ArgumentNullException(nameof(url));
            }

            if (requestHandler == null)
            {
                throw new ArgumentNullException(nameof(requestHandler));
            }

            _url = url;
            _requestHandler = requestHandler;
            _closeTimeout = closeTimeOut.GetValueOrDefault(TimeSpan.FromSeconds(15));
            _keepAlive = keepAlive;

            if (logger != null)
            {
                Logger = logger;
            }
            else
            {
                Logger = NullLogger.Instance;
            }
        }
54

55
        /// <inheritdoc />
        public event DisconnectedEventHandler Disconnected;

        /// <inheritdoc />
        public bool IsConnected { get; private set; }

        /// <summary>
        /// Gets the <see cref="ILogger"/> the streaming client writes its log messages to.
        /// </summary>
        /// <value>The <see cref="ILogger"/> chosen in the constructor.</value>
        protected ILogger Logger { get; }
66

67
        /// <inheritdoc />
        public async Task ConnectAsync()
        {
            // No headers were supplied, so connect with an empty header collection.
            var noHeaders = new Dictionary<string, string>();
            await ConnectAsync(noHeaders).ConfigureAwait(false);
        }
72

73
        /// <inheritdoc />
        public async Task ConnectAsync(IDictionary<string, string> requestHeaders)
        {
            // Delegate to the cancellable overload with a token that is never cancelled.
            var neverCancelled = CancellationToken.None;
            await ConnectAsync(requestHeaders, neverCancelled).ConfigureAwait(false);
        }
78

79
        /// <summary>
        /// Establishes a client connection to the configured URL, passing along additional headers and a cancellation token.
        /// </summary>
        /// <param name="requestHeaders">Dictionary of header name and header value to be passed during connection. Generally, you will need channelID and Authorization.</param>
        /// <param name="cancellationToken"><see cref="CancellationToken"/> for the client connection.</param>
        /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
        public async Task ConnectAsync(IDictionary<string, string> requestHeaders, CancellationToken cancellationToken)
        {
            // How the transport is started for this overload: URL + headers + token.
            Func<StreamingTransport, Action<bool>, Task> startTransport =
                (transport, onStatusChanged) => transport.ConnectAsync(_url, onStatusChanged, requestHeaders, cancellationToken);

            await ConnectImplAsync(startTransport, cancellationToken).ConfigureAwait(false);
        }
92

93
        /// <inheritdoc />
        public async Task<ReceiveResponse> SendAsync(StreamingRequest message, CancellationToken cancellationToken = default)
        {
            CheckDisposed();

            // The session is checked before the message argument, so a missing session is reported first.
            var session = _session;
            if (session == null)
            {
                throw new InvalidOperationException("Session not established. Call ConnectAsync() in order to send requests through this client.");
            }

            if (message == null)
            {
                throw new ArgumentNullException(nameof(message));
            }

            var response = await session.SendRequestAsync(message, cancellationToken).ConfigureAwait(false);
            return response;
        }
110

111
        /// <inheritdoc />
        public void Disconnect()
        {
            CheckDisposed();

            // Synchronous wrapper: block the calling thread until the asynchronous disconnect has completed.
            var awaiter = DisconnectAsync().ConfigureAwait(false).GetAwaiter();
            awaiter.GetResult();
        }
117

118
        /// <summary>
        /// Stops the application-side transport handler, if one exists, and marks the client as disconnected.
        /// </summary>
        /// <remarks>
        /// The handler is only created by a connect attempt. When none has been made there is nothing to stop,
        /// so the call simply leaves <see cref="IsConnected"/> set to false instead of failing on a null handler.
        /// </remarks>
        /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
        /// <exception cref="ObjectDisposedException">The client has already been disposed.</exception>
        public async Task DisconnectAsync()
        {
            CheckDisposed();

            // Read the field once so the null check and the call observe the same handler instance.
            var application = _application;
            if (application != null)
            {
                await application.StopAsync().ConfigureAwait(false);
            }

            IsConnected = false;
        }
128

129
        /// <inheritdoc />
        public void Dispose()
        {
            // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method
            // Deterministic path: run the shared cleanup with disposing = true ...
            Dispose(true);

            // ... then tell the GC that a finalizer run is no longer needed for this instance.
            GC.SuppressFinalize(this);
        }
136

137
        /// <summary>
        /// Runs the shared connect routine, starting the transport through its overload that takes only
        /// the connection-status callback and a cancellation token (no URL or headers).
        /// </summary>
        /// <param name="cancellationToken"><see cref="CancellationToken"/> for the client connection.</param>
        /// <returns>A <see cref="Task"/> representing the asynchronous operation.</returns>
        internal async Task ConnectInternalAsync(CancellationToken cancellationToken)
        {
            Func<StreamingTransport, Action<bool>, Task> startTransport =
                (transport, onStatusChanged) => transport.ConnectAsync(onStatusChanged, cancellationToken);

            await ConnectImplAsync(startTransport, cancellationToken).ConfigureAwait(false);
        }
144

145
        /// <summary>
        /// Creates the <see cref="StreamingTransport"/> used for a connection.
        /// </summary>
        /// <remarks>
        /// Called by <c>ConnectImplAsync</c> on every connect attempt with the <c>Application</c> end of a
        /// newly created duplex pipe pair; the returned transport is stored and later disposed by this class.
        /// </remarks>
        /// <param name="application">The pipe end handed to the transport.</param>
        /// <returns>The transport instance to connect with.</returns>
        internal abstract StreamingTransport CreateStreamingTransport(IDuplexPipe application);
146

147
        /// <summary>
        /// Disposes objects used by the <see cref="StreamingTransportClient"/>.
        /// </summary>
        /// <param name="disposing">Whether called from a Dispose method (its value is true), or, from a finalizer (its value is false).</param>
        /// <remarks>
        /// The disposing parameter should be false when called from a finalizer, and true when called from the IDisposable.Dispose method.
        /// In other words, it is true when deterministically called, and false when non-deterministically called.
        /// The transport, handler and cancellation source only exist after a connect attempt, so each is
        /// treated as optional here; disposing a client that never connected is a no-op rather than a
        /// <see cref="NullReferenceException"/>.
        /// </remarks>
        protected virtual void Dispose(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            if (disposing)
            {
                try
                {
                    // Disconnect() stops the handler; skip it when no connect attempt ever created one.
                    if (_application != null)
                    {
                        Disconnect();
                    }

                    _disconnectCts?.Cancel();
                }
                finally
                {
                    // Any of these may still be null if connecting never started or failed part-way through.
                    _transport?.Dispose();
                    _application?.Dispose();
                    _disconnectCts?.Dispose();
                }
            }

            // Set last: Disconnect() above calls CheckDisposed and must still see the instance as live.
            _disposed = true;
        }
179

180
        /// <summary>
        /// Tells whether a response is present and carries a status code in the 200-299 range.
        /// </summary>
        /// <param name="response">The response to inspect; may be null.</param>
        /// <returns>False for a null response or a status code outside 200-299; otherwise true.</returns>
        private static bool IsSuccessResponse(ReceiveResponse response)
        {
            if (response == null)
            {
                return false;
            }

            var status = response.StatusCode;
            return status >= 200 && status <= 299;
        }
184

185
        /// <summary>
        /// Shared connect routine: creates the pipe pair, transport, handler and session, starts the transport
        /// and the handler's listen loop, optionally starts the keep-alive loop, and completes once both the
        /// transport task and the listen task have finished.
        /// </summary>
        /// <param name="connectFunc">Starts the transport. Receives the newly created transport and a callback that updates <see cref="IsConnected"/>.</param>
        /// <param name="cancellationToken">Passed to the session and linked into the token used for listening and keep-alive pings.</param>
        /// <returns>A <see cref="Task"/> that completes when the connection work has ended.</returns>
        /// <exception cref="ObjectDisposedException">The client has already been disposed.</exception>
        private async Task ConnectImplAsync(Func<StreamingTransport, Action<bool>, Task> connectFunc, CancellationToken cancellationToken)
        {
            CheckDisposed();

            // Release whatever an earlier connect attempt left behind.
            _transport?.Dispose();
            _application?.Dispose();

            TimerAwaitable timer = null;
            Task timerTask = null;

            try
            {
                // Pipes
                var duplexPipePair = DuplexPipe.CreateConnectionPair(PipeOptions.Default, PipeOptions.Default);

                // Transport
                _transport = CreateStreamingTransport(duplexPipePair.Application);

                // Application
                _application = new TransportHandler(duplexPipePair.Transport, Logger);

                // Session
                _session = new StreamingSession(_requestHandler, _application, Logger, cancellationToken);

                // Set up cancellation.
                // A linked source stays registered on the caller's token until it is disposed, so the source
                // from an earlier connect must be disposed when it is replaced. Swap first, dispose second:
                // the field then never refers to a disposed source.
                var previousDisconnectCts = _disconnectCts;
                _disconnectCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                previousDisconnectCts?.Dispose();

                // Start transport and application
                var transportTask = connectFunc(_transport, connect => IsConnected = connect);
                var applicationTask = _application.ListenAsync(_disconnectCts.Token);
                var combinedTask = Task.WhenAll(transportTask, applicationTask);

                Log.ClientStarted(Logger, _url);

                // Periodic task: keep alive
                // Disposed with `timer.Stop()` in the finally block below
                if (_keepAlive.HasValue)
                {
                    timer = new TimerAwaitable(_keepAlive.Value, _keepAlive.Value);
                    timerTask = TimerLoopAsync(timer);
                }

                // Block until transport or application ends.
                await combinedTask.ConfigureAwait(false);

                // Signal that we're done
                _disconnectCts.Cancel();
                Log.ClientTransportApplicationCompleted(Logger, _url);
            }
            finally
            {
                // Stop the keep-alive timer (if any) and wait for its loop to finish before returning.
                timer?.Stop();

                if (timerTask != null)
                {
                    await timerTask.ConfigureAwait(false);
                }
            }

            Log.ClientCompleted(Logger, _url);
        }
246

247
        /// <summary>
        /// Keep-alive loop: on every timer tick sends a GET "/api/version" request through this client and,
        /// when the ping fails, clears <see cref="IsConnected"/> and raises <see cref="Disconnected"/>.
        /// </summary>
        /// <param name="timer">The timer driving the loop; started here and disposed when the loop exits.</param>
        /// <returns>A <see cref="Task"/> that completes when the timer stops producing ticks.</returns>
        private async Task TimerLoopAsync(TimerAwaitable timer)
        {
            timer.Start();

            using (timer)
            {
                // await returns True until `timer.Stop()` is called in the `finally` block of `ConnectImplAsync`
                while (await timer)
                {
                    try
                    {
                        // Ping server
                        var response = await SendAsync(StreamingRequest.CreateGet("/api/version"), _disconnectCts.Token).ConfigureAwait(false);

                        if (!IsSuccessResponse(response))
                        {
                            // IsSuccessResponse is also false for a null response, so the status code must be
                            // read null-safely; 0 matches the value the exception path below reports.
                            var statusCode = response?.StatusCode ?? 0;

                            Log.ClientKeepAliveFail(Logger, _url, statusCode);

                            IsConnected = false;

                            Disconnected?.Invoke(this, new DisconnectedEventArgs() { Reason = $"Received failure from server heartbeat: {statusCode}." });
                        }
                        else
                        {
                            Log.ClientKeepAliveSucceed(Logger, _url);
                        }
                    }
#pragma warning disable CA1031 // Do not catch general exception types
                    catch (Exception e)
#pragma warning restore CA1031 // Do not catch general exception types
                    {
                        // Any failure while pinging is treated as a lost connection; the loop keeps running.
                        Log.ClientKeepAliveFail(Logger, _url, 0, e);
                        IsConnected = false;
                        Disconnected?.Invoke(this, new DisconnectedEventArgs() { Reason = $"Received failure from server heartbeat: {e}." });
                    }
                }
            }
        }
285

286
        /// <summary>
        /// Throws when this instance has already been disposed; otherwise does nothing.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The client has been disposed. The message names the runtime type.</exception>
        private void CheckDisposed()
        {
            if (!_disposed)
            {
                return;
            }

            throw new ObjectDisposedException(GetType().FullName);
        }
293

294
        /// <summary>
        /// Log messages for <see cref="StreamingTransportClient"/>.
        /// </summary>
        /// <remarks>
        /// Messages implemented using <see cref="LoggerMessage.Define(LogLevel, EventId, string)"/> to maximize performance.
        /// For more information, see https://docs.microsoft.com/en-us/aspnet/core/fundamentals/logging/loggermessage?view=aspnetcore-5.0.
        /// Every helper substitutes an empty string for a null URL before logging.
        /// </remarks>
        private static class Log
        {
            // Event 1 (Information): client started.
            private static readonly Action<ILogger, string, Exception> _clientStarted = LoggerMessage.Define<string>(
                LogLevel.Information,
                new EventId(1, nameof(ClientStarted)),
                "Streaming transport client connected to {string}.");

            // Event 2 (Information): client finished.
            private static readonly Action<ILogger, string, Exception> _clientCompleted = LoggerMessage.Define<string>(
                LogLevel.Information,
                new EventId(2, nameof(ClientCompleted)),
                "Streaming transport client connection to {string} closed.");

            // Event 3 (Debug): keep-alive ping succeeded.
            private static readonly Action<ILogger, string, Exception> _clientKeepAliveSucceed = LoggerMessage.Define<string>(
                LogLevel.Debug,
                new EventId(3, nameof(ClientKeepAliveSucceed)),
                "Streaming transport client heartbeat to {string} succeeded.");

            // Event 4 (Error): keep-alive ping failed; carries the status code.
            private static readonly Action<ILogger, string, int, Exception> _clientKeepAliveFail = LoggerMessage.Define<string, int>(
                LogLevel.Error,
                new EventId(4, nameof(ClientKeepAliveFail)),
                "Streaming transport client heartbeat to {string} failed with status code {int}.");

            // Event 5 (Debug): transport and application tasks both completed.
            private static readonly Action<ILogger, string, Exception> _clientTransportApplicationCompleted = LoggerMessage.Define<string>(
                LogLevel.Debug,
                new EventId(5, nameof(ClientTransportApplicationCompleted)),
                "Streaming transport client to {string} completed transport and application tasks.");

            public static void ClientStarted(ILogger logger, string url)
            {
                _clientStarted(logger, url ?? string.Empty, null);
            }

            public static void ClientCompleted(ILogger logger, string url)
            {
                _clientCompleted(logger, url ?? string.Empty, null);
            }

            public static void ClientKeepAliveSucceed(ILogger logger, string url)
            {
                _clientKeepAliveSucceed(logger, url ?? string.Empty, null);
            }

            public static void ClientKeepAliveFail(ILogger logger, string url, int statusCode = 0, Exception e = null)
            {
                _clientKeepAliveFail(logger, url ?? string.Empty, statusCode, e);
            }

            public static void ClientTransportApplicationCompleted(ILogger logger, string url)
            {
                _clientTransportApplicationCompleted(logger, url ?? string.Empty, null);
            }
        }
328
    }
329
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc