• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-dotnet / 363877

10 Aug 2023 08:52PM UTC coverage: 79.092% (+0.1%) from 78.979%
363877

Pull #6655

CI-PR build

web-flow
Merge 94ad1d11f into fdaed8b69
Pull Request #6655: Implementation of Teams batch APIs

26094 of 32992 relevant lines covered (79.09%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.88
/libraries/Microsoft.Bot.Builder/Streaming/BotFrameworkHttpAdapterBase.cs
1
// Copyright (c) Microsoft Corporation.
2
// Licensed under the MIT License.
3

4
using System;
5
using System.Collections.Generic;
6
using System.Linq;
7
using System.Net;
8
using System.Net.Http;
9
using System.Net.WebSockets;
10
using System.Security.Claims;
11
using System.Security.Principal;
12
using System.Threading;
13
using System.Threading.Tasks;
14
using Microsoft.Bot.Connector;
15
using Microsoft.Bot.Connector.Authentication;
16
using Microsoft.Bot.Connector.Streaming.Application;
17
using Microsoft.Bot.Schema;
18
using Microsoft.Extensions.Logging;
19
using Microsoft.Rest.TransientFaultHandling;
20

21
namespace Microsoft.Bot.Builder.Streaming
22
{
23
    /// <summary>
24
    /// An HTTP adapter base class.
25
    /// </summary>
26
    /// <remarks>
27
    /// BotFrameworkAdapter is still supported but the recommended adapter is `CloudAdapter`.
28
    /// </remarks>
29
    public class BotFrameworkHttpAdapterBase : BotFrameworkAdapter, IStreamingActivityProcessor, IDisposable
30
    {
31
        private bool _disposedValue;
32

33
        /// <summary>
        /// Initializes a new instance of the <see cref="BotFrameworkHttpAdapterBase"/> class.
        /// Every argument is forwarded, unchanged and in the same order, to the base adapter constructor.
        /// </summary>
        /// <param name="credentialProvider">The credential provider.</param>
        /// <param name="authConfig">The authentication configuration.</param>
        /// <param name="channelProvider">The channel provider.</param>
        /// <param name="connectorClientRetryPolicy">Retry policy for retrying HTTP operations.</param>
        /// <param name="customHttpClient">The HTTP client.</param>
        /// <param name="middleware">The middleware to initially add to the adapter.</param>
        /// <param name="logger">The ILogger implementation this adapter should use.</param>
        public BotFrameworkHttpAdapterBase(ICredentialProvider credentialProvider, AuthenticationConfiguration authConfig, IChannelProvider channelProvider = null, RetryPolicy connectorClientRetryPolicy = null, HttpClient customHttpClient = null, IMiddleware middleware = null, ILogger logger = null)
            : base(
                credentialProvider,
                authConfig,
                channelProvider,
                connectorClientRetryPolicy,
                customHttpClient,
                middleware,
                logger)
        {
        }
1✔
54

55
        /// <summary>
        /// Initializes a new instance of the <see cref="BotFrameworkHttpAdapterBase"/> class
        /// with a default <see cref="AuthenticationConfiguration"/> and no retry policy, HTTP client or middleware.
        /// </summary>
        /// <param name="credentialProvider">The credential provider. When null, a new <see cref="SimpleCredentialProvider"/> is used.</param>
        /// <param name="channelProvider">The channel provider.</param>
        /// <param name="logger">The ILogger implementation this adapter should use.</param>
        public BotFrameworkHttpAdapterBase(
            ICredentialProvider credentialProvider = null,
            IChannelProvider channelProvider = null,
            ILogger<BotFrameworkHttpAdapterBase> logger = null)
            : this(
                credentialProvider ?? new SimpleCredentialProvider(),
                new AuthenticationConfiguration(),
                channelProvider,
                connectorClientRetryPolicy: null,
                customHttpClient: null,
                middleware: null,
                logger: logger)
        {
        }
1✔
65

66
        /// <summary>
        /// Initializes a new instance of the <see cref="BotFrameworkHttpAdapterBase"/> class
        /// with a default <see cref="AuthenticationConfiguration"/>, a caller-supplied HTTP client,
        /// and no retry policy or middleware.
        /// </summary>
        /// <param name="credentialProvider">The credential provider. When null, a new <see cref="SimpleCredentialProvider"/> is used.</param>
        /// <param name="channelProvider">The channel provider.</param>
        /// <param name="httpClient">The HTTP client.</param>
        /// <param name="logger">The ILogger implementation this adapter should use.</param>
        public BotFrameworkHttpAdapterBase(
            ICredentialProvider credentialProvider,
            IChannelProvider channelProvider,
            HttpClient httpClient,
            ILogger<BotFrameworkHttpAdapterBase> logger)
            : this(
                credentialProvider ?? new SimpleCredentialProvider(),
                new AuthenticationConfiguration(),
                channelProvider,
                connectorClientRetryPolicy: null,
                customHttpClient: httpClient,
                middleware: null,
                logger: logger)
        {
        }
×
77

78
        /// <summary>
        /// Gets or sets the bot connected to this adapter.
        /// </summary>
        /// <value>
        /// The bot used when this adapter creates a new <see cref="StreamingRequestHandler"/>.
        /// </value>
        protected IBot ConnectedBot { get; set; }

        /// <summary>
        /// Gets or sets the claims identity for this adapter.
        /// </summary>
        /// <value>
        /// The identity that, when not null, is added to the turn state of each streaming turn.
        /// </value>
        protected ClaimsIdentity ClaimsIdentity { get; set; }

        /// <summary>
        /// Gets or sets the request handlers for this adapter.
        /// </summary>
        /// <value>
        /// The streaming request handlers tracked by this adapter; starts as an empty list.
        /// </value>
#pragma warning disable CA2227 // Collection properties should be read only (we can't change this without breaking binary compat)
        protected IList<StreamingRequestHandler> RequestHandlers { get; set; } = new List<StreamingRequestHandler>();
#pragma warning restore CA2227 // Collection properties should be read only
103

104
        /// <summary>
        /// Primary adapter method for processing activities sent from a streaming channel.
        /// Creates a turn context and runs the middleware pipeline for an incoming activity.
        /// The activity is validated with <c>BotAssert.ActivityNotNull</c> before any work is done.
        /// </summary>
        /// <param name="activity">The <see cref="Activity"/> to process.</param>
        /// <param name="callbackHandler">The <see cref="BotCallbackHandler"/> that will handle the activity.</param>
        /// <param name="cancellationToken">A cancellation token that can be used by other objects
        /// or threads to receive notice of cancellation.</param>
        /// <returns>A task that represents the work queued to execute. When the activity's
        /// <see cref="Activity.Type"/> is <see cref="ActivityTypes.Invoke"/> the result is an
        /// <see cref="InvokeResponse"/>: the one stored in turn state under <c>InvokeResponseKey</c>, or a
        /// response with status <see cref="HttpStatusCode.NotImplemented"/> when none was stored.
        /// For every other activity type the result is null.</returns>
        /// <remarks>
        /// <para>This method registers the following in the turn state:</para>
        /// <list type="bullet">
        /// <item><description>the audience derived from <see cref="Activity.CallerId"/> (possibly null), under <c>OAuthScopeKey</c>;</description></item>
        /// <item><description>the adapter's claims identity under <c>BotIdentityKey</c>, when one is set;</description></item>
        /// <item><description>a streaming connector client, which is cleared again once the pipeline has run.</description></item>
        /// </list>
        /// </remarks>
        public async Task<InvokeResponse> ProcessStreamingActivityAsync(Activity activity, BotCallbackHandler callbackHandler, CancellationToken cancellationToken = default)
        {
            BotAssert.ActivityNotNull(activity);

            Logger.LogInformation($"Received an incoming streaming activity. ActivityId: {activity.Id}");

            // A null audience means no callerId was generated for the StreamingRequestHandler, so the lookup
            // below effectively keys on $"{ServiceUrl}{Conversation.Id}" rather than
            // $"{ServiceUrl}{Audience}{Conversation.Id}", and the handler implicitly does not support skills.
            var audience = GetAudienceFromCallerId(activity);

            // A conversation may have moved between connections of the same Channel or Skill without the
            // previous handler forgetting it; the last matching handler is treated as the active connection.
            var matchingHandlers = RequestHandlers.Where(
                candidate => candidate.ServiceUrl == activity.ServiceUrl
                    && candidate.Audience == audience
                    && candidate.HasConversation(activity.Conversation.Id));
            var activeHandler = matchingHandlers.LastOrDefault();

            using (var turnContext = new TurnContext(this, activity))
            {
                // Set() is used instead of Add() because Add() null-checks the value and the audience may be null.
                // See https://github.com/microsoft/botbuilder-dotnet/issues/5110 for more information.
                turnContext.TurnState.Set<string>(OAuthScopeKey, audience);

                // Pipes are unauthenticated. Pending to check that we are in pipes right now. Do not merge to master without that.
                if (ClaimsIdentity != null)
                {
                    turnContext.TurnState.Add<IIdentity>(BotIdentityKey, ClaimsIdentity);
                }

                using (var streamingConnector = CreateStreamingConnectorClient(activity, activeHandler))
                {
                    // Make the connector client available for the duration of the turn.
                    turnContext.TurnState.Add(streamingConnector);

                    await RunPipelineAsync(turnContext, callbackHandler, cancellationToken).ConfigureAwait(false);

                    // Remove the connector client from turn state before it is disposed.
                    turnContext.TurnState.Set<IConnectorClient>(null);
                }

                if (activity.Type != ActivityTypes.Invoke)
                {
                    return null;
                }

                var invokeResponseActivity = turnContext.TurnState.Get<Activity>(InvokeResponseKey);
                return invokeResponseActivity == null
                    ? new InvokeResponse { Status = (int)HttpStatusCode.NotImplemented }
                    : (InvokeResponse)invokeResponseActivity.Value;
            }
        }
1✔
183

184
        /// <summary>
        /// Sends an activity over a streaming connection.
        /// </summary>
        /// <param name="activity">The <see cref="Activity"/> to send.</param>
        /// <param name="cancellationToken">A cancellation token that can be used by other objects
        /// or threads to receive notice of cancellation.</param>
        /// <returns>A task representing the asynchronous operation.</returns>
        /// <remarks>If the task completes successfully, the result contains the resource response object,
        /// or null when no handler knows the conversation and no bot is connected to open a new connection.</remarks>
        /// <exception cref="ArgumentNullException"><paramref name="activity"/> is null.</exception>
        public async Task<ResourceResponse> SendStreamingActivityAsync(Activity activity, CancellationToken cancellationToken = default)
        {
            if (activity == null)
            {
                throw new ArgumentNullException(nameof(activity));
            }

            // Check to see if any of this adapter's StreamingRequestHandlers is associated with this conversation.
            // RequestHandlers is settable and is set to null by Dispose(bool), so tolerate null here.
            // The query is materialized once so the handlers are not re-filtered on every use below.
            var possibleHandlers = (RequestHandlers ?? Enumerable.Empty<StreamingRequestHandler>())
                .Where(x => x.ServiceUrl == activity.ServiceUrl && x.HasConversation(activity.Conversation.Id))
                .ToList();

            if (possibleHandlers.Count > 0)
            {
                var correctHandler = possibleHandlers[0];

                if (possibleHandlers.Count > 1)
                {
                    // The conversation has moved to a new connection and the former StreamingRequestHandler needs to be told to forget about it.
                    correctHandler = possibleHandlers.OrderBy(x => x.ConversationAddedTime(activity.Conversation.Id)).Last();
                    foreach (var handler in possibleHandlers)
                    {
                        if (handler != correctHandler)
                        {
                            handler.ForgetConversation(activity.Conversation.Id);
                        }
                    }
                }

                return await correctHandler.SendActivityAsync(activity, cancellationToken).ConfigureAwait(false);
            }

            if (ConnectedBot != null)
            {
                // This is a proactive message that will need a new streaming connection opened.
                // The ServiceUrl of a streaming connection follows the pattern "urn:[ChannelName]:[Protocol]:[Host]".
#pragma warning disable CA2000 // Dispose objects before losing scope (we can't fix this without closing the socket connection, this should be addressed after we make StreamingRequestHandler disposable and we dispose the connector )
                var connection = new ClientWebSocket();
#pragma warning restore CA2000 // Dispose objects before losing scope
                try
                {
                    var uri = activity.ServiceUrl.Split(':');
                    var protocol = uri[uri.Length - 2];
                    var host = uri[uri.Length - 1];

                    // NOTE(review): the last two ':'-separated segments are joined with no separator;
                    // confirm this yields a valid absolute URI for the ServiceUrl formats actually in use.
                    await connection.ConnectAsync(new Uri(protocol + host + "/api/messages"), cancellationToken).ConfigureAwait(false);
                }
                catch
                {
                    // The socket has not been handed to a handler yet, so nothing else would ever dispose it.
                    connection.Dispose();
                    throw;
                }

#pragma warning disable CA2000 // Dispose objects before losing scope (We'll dispose this when the adapter gets disposed or when elements are removed)
                var newHandler = new StreamingRequestHandler(ConnectedBot, this, connection, Logger);
#pragma warning restore CA2000 // Dispose objects before losing scope

                if (RequestHandlers == null)
                {
                    RequestHandlers = new List<StreamingRequestHandler>();
                }

                RequestHandlers.Add(newHandler);

                return await newHandler.SendActivityAsync(activity, cancellationToken).ConfigureAwait(false);
            }

            return null;
        }
1✔
245

246
        /// <summary>
        /// Creates a new StreamingRequestHandler to listen to the specified Named Pipe
        /// and pass requests to this adapter. The supplied bot becomes the adapter's connected bot,
        /// and an empty claims identity is assigned if none has been set yet.
        /// </summary>
        /// <param name="pipeName">The name of the Named Pipe to connect to.</param>
        /// <param name="bot">The bot to use when processing activities received over the Named Pipe.</param>
        /// <param name="audience">The specified recipient of all outgoing activities.</param>
        /// <returns>A task that completes only once the StreamingRequestHandler has stopped listening
        /// for incoming requests on the Named Pipe.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="pipeName"/> is null or empty,
        /// or <paramref name="bot"/> is null.</exception>
        public async Task ConnectNamedPipeAsync(string pipeName, IBot bot, string audience = null)
        {
            if (string.IsNullOrEmpty(pipeName))
            {
                throw new ArgumentNullException(nameof(pipeName));
            }

            if (bot == null)
            {
                throw new ArgumentNullException(nameof(bot));
            }

            ConnectedBot = bot;

            if (ClaimsIdentity == null)
            {
                ClaimsIdentity = new ClaimsIdentity();
            }

            RequestHandlers = RequestHandlers ?? new List<StreamingRequestHandler>();

#pragma warning disable CA2000 // Dispose objects before losing scope (We'll dispose this when the adapter gets disposed or when elements are removed)
            var pipeConnection = new LegacyStreamingConnection(pipeName, Logger);
            var pipeHandler = new StreamingRequestHandler(bot, this, pipeConnection, audience, Logger);
#pragma warning restore CA2000 // Dispose objects before losing scope
            RequestHandlers.Add(pipeHandler);

            await pipeHandler.ListenAsync().ConfigureAwait(false);
        }
×
277

278
        /// <inheritdoc/>
        public void Dispose()
        {
            // Keep this method as-is; cleanup logic belongs in Dispose(bool disposing).
            Dispose(true);
            GC.SuppressFinalize(this);
        }
×
285

286
        /// <summary>
        /// Disposes every <see cref="StreamingRequestHandler"/> held in <see cref="RequestHandlers"/>
        /// and clears the collection. Calls after the first one do nothing.
        /// </summary>
        /// <param name="disposing">Whether we are disposing managed resources.</param>
        protected virtual void Dispose(bool disposing)
        {
            if (_disposedValue)
            {
                return;
            }

            if (disposing && RequestHandlers != null)
            {
                foreach (var streamingHandler in RequestHandlers)
                {
                    streamingHandler.Dispose();
                }
            }

            // The collection reference is dropped whether or not managed resources were disposed.
            RequestHandlers = null;
            _disposedValue = true;
        }
×
309

310
        /// <summary>
        /// Evaluates if processing an outgoing activity is possible.
        /// </summary>
        /// <remarks>If this returns true, <see cref="BotFrameworkHttpAdapterBase.ProcessOutgoingActivityAsync"/> will be responsible for sending
        /// the outgoing activity.</remarks>
        /// <param name="activity">The outgoing activity.</param>
        /// <returns>The result of <c>activity.IsFromStreamingConnection()</c>, i.e. whether
        /// ProcessOutgoingActivityAsync should be called to send the outgoing activity.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="activity"/> is null.</exception>
        protected override bool CanProcessOutgoingActivity(Activity activity)
        {
            var outgoing = activity ?? throw new ArgumentNullException(nameof(activity));
            return outgoing.IsFromStreamingConnection();
        }
326

327
        /// <summary>
        /// Sends an outgoing activity through the streaming pipeline.
        /// </summary>
        /// <param name="turnContext">The context object for the turn.</param>
        /// <param name="activity">The activity to be processed.</param>
        /// <param name="cancellationToken">The cancellation token.</param>
        /// <returns>The result of processing the activity, as returned by <see cref="SendStreamingActivityAsync"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="activity"/> is null.</exception>
        protected override async Task<ResourceResponse> ProcessOutgoingActivityAsync(ITurnContext turnContext, Activity activity, CancellationToken cancellationToken)
        {
            if (activity == null)
            {
                throw new ArgumentNullException(nameof(activity));
            }

            // Look for token responses coming from OAuth cards before the activity goes out.
            TokenResolver.CheckForOAuthCards(this, Logger, turnContext, activity, cancellationToken);

            // Streaming channels use a ServiceUrl that begins with "urn" and carries information unique to
            // streaming connections. At this point the activity is known to be a streaming one, so hand it
            // to the streaming send path.
            var streamingResponse = await SendStreamingActivityAsync(activity, cancellationToken).ConfigureAwait(false);
            return streamingResponse;
        }
1✔
350

351
        /// <summary>
        /// Creates a streaming specific connector client.
        /// </summary>
        /// <param name="activity">The activity whose <see cref="Activity.ServiceUrl"/> becomes the client's base URI.</param>
        /// <param name="requestHandler">The streaming request handler passed to the <see cref="StreamingHttpClient"/>.</param>
        /// <returns>A <see cref="ConnectorClient"/> that uses empty credentials and a <see cref="StreamingHttpClient"/>.</returns>
        private IConnectorClient CreateStreamingConnectorClient(Activity activity, StreamingRequestHandler requestHandler)
        {
            // Pick the empty-credentials instance that matches the configured cloud.
            var isGovernment = ChannelProvider != null && ChannelProvider.IsGovernment();
            var anonymousCredentials = isGovernment
                ? MicrosoftGovernmentAppCredentials.Empty
                : MicrosoftAppCredentials.Empty;

#pragma warning disable CA2000 // Dispose objects before losing scope (We need to make ConnectorClient disposable to fix this, ignoring it for now)
            var streamingHttpClient = new StreamingHttpClient(requestHandler, Logger);
#pragma warning restore CA2000 // Dispose objects before losing scope

            var serviceUri = new Uri(activity.ServiceUrl);
            return new ConnectorClient(serviceUri, anonymousCredentials, customHttpClient: streamingHttpClient, disposeHttpClient: false);
        }
365

366
        /// <summary>
        /// Attempts to get an audience from the <see cref="Activity.CallerId"/>.
        /// </summary>
        /// <param name="activity">The incoming activity to be processed by a <see cref="StreamingRequestHandler"/>.</param>
        /// <returns>
        /// The channel OAuth scope for the public Azure or US Government caller ids; the remainder of the
        /// caller id after <see cref="CallerIdConstants.BotToBotPrefix"/> for bot-to-bot callers; otherwise null
        /// (including when the caller id is null or empty).
        /// </returns>
        private string GetAudienceFromCallerId(Activity activity)
        {
            if (string.IsNullOrEmpty(activity.CallerId))
            {
                return null;
            }

            switch (activity.CallerId)
            {
                case CallerIdConstants.PublicAzureChannel:
                    return AuthenticationConstants.ToChannelFromBotOAuthScope;
                case CallerIdConstants.USGovChannel:
                    return GovernmentAuthenticationConstants.ToChannelFromBotOAuthScope;
                default:
                    // Ordinal comparison: the caller id is an identifier, not linguistic text, and the
                    // Substring below slices by the prefix's exact length. A culture-sensitive match can
                    // ignore characters, so the matched span may not be prefix.Length characters long.
                    if (activity.CallerId.StartsWith(CallerIdConstants.BotToBotPrefix, StringComparison.OrdinalIgnoreCase))
                    {
                        return activity.CallerId.Substring(CallerIdConstants.BotToBotPrefix.Length);
                    }

                    return null;
            }
        }
392
    }
393
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc