• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-dotnet / 371447

05 Oct 2023 07:41PM UTC coverage: 73.455% (+0.004%) from 73.451%
371447

Pull #6696

CI-PR build

web-flow
Merge faa7f42a2 into c2f3aea4a
Pull Request #6696: fix: [#6683] Timeout issue when using DLASE

24147 of 32873 relevant lines covered (73.46%)

0.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.2
/libraries/Microsoft.Bot.Connector.Streaming/Session/StreamingSession.cs
1
// Copyright (c) Microsoft Corporation. All rights reserved.
2
// Licensed under the MIT License.
3

4
using System;
5
using System.Buffers;
6
using System.Collections.Concurrent;
7
using System.Collections.Generic;
8
using System.IO;
9
using System.Linq;
10
using System.Net;
11
using System.Runtime.InteropServices;
12
using System.Text.Json;
13
using System.Threading;
14
using System.Threading.Tasks;
15
using Microsoft.Bot.Connector.Streaming.Payloads;
16
using Microsoft.Bot.Connector.Streaming.Transport;
17
using Microsoft.Bot.Streaming;
18
using Microsoft.Bot.Streaming.Payloads;
19
using Microsoft.Extensions.Logging;
20
using Microsoft.Extensions.Logging.Abstractions;
21
using Microsoft.Net.Http.Headers;
22

23
namespace Microsoft.Bot.Connector.Streaming.Session
24
{
25
    internal class StreamingSession
26
    {
27
        // UTF-8 byte order mark constant as defined by
        // Dotnet runtime: https://github.com/dotnet/runtime/blob/main/src/libraries/System.Text.Json/src/System/Text/Json/JsonConstants.cs#L35
        // Unicode.org spec: https://www.unicode.org/faq/utf_bom.html#bom5
        // Declared readonly so the shared reference cannot be reassigned at runtime.
        private static readonly byte[] _utf8Bom = { 0xEF, 0xBB, 0xBF };

        // Placeholder streams announced by received requests/responses, keyed by stream id.
        private readonly Dictionary<Guid, StreamDefinition> _streamDefinitions = new Dictionary<Guid, StreamDefinition>();

        // Received requests, keyed by header id.
        private readonly Dictionary<Guid, ReceiveRequest> _requests = new Dictionary<Guid, ReceiveRequest>();

        // Received responses that declare streams, keyed by header id.
        private readonly Dictionary<Guid, ReceiveResponse> _responses = new Dictionary<Guid, ReceiveResponse>();

        // Completion sources of requests sent through SendRequestAsync that still wait for their response, keyed by request id.
        private readonly ConcurrentDictionary<Guid, TaskCompletionSource<ReceiveResponse>> _pendingResponses = new ConcurrentDictionary<Guid, TaskCompletionSource<ReceiveResponse>>();

        // Handler invoked to process received requests.
        private readonly RequestHandler _receiver;

        // Transport used to send requests, responses and streams.
        private readonly TransportHandler _sender;

        private readonly ILogger _logger;

        // Token passed to the sends issued while processing received requests.
        private readonly CancellationToken _connectionCancellationToken;

        // Monitor taken by the Receive* methods while they update the bookkeeping dictionaries above.
        private readonly object _receiveSync = new object();
1✔
44

45
        /// <summary>
        /// Initializes a new instance of the <see cref="StreamingSession"/> class and subscribes it to the transport.
        /// </summary>
        /// <param name="receiver">Handler that processes received requests.</param>
        /// <param name="sender">Transport used to send payloads; the session subscribes a <see cref="ProtocolDispatcher"/> to it.</param>
        /// <param name="logger">Optional logger; <see cref="NullLogger.Instance"/> is used when null.</param>
        /// <param name="connectionCancellationToken">Token passed to the sends issued while processing received requests.</param>
        /// <exception cref="ArgumentNullException"><paramref name="receiver"/> or <paramref name="sender"/> is null.</exception>
        public StreamingSession(RequestHandler receiver, TransportHandler sender, ILogger logger, CancellationToken connectionCancellationToken = default)
        {
            _receiver = receiver ?? throw new ArgumentNullException(nameof(receiver));
            _sender = sender ?? throw new ArgumentNullException(nameof(sender));
            _logger = logger ?? NullLogger.Instance;
            _connectionCancellationToken = connectionCancellationToken;

            // Subscribe last: the dispatcher hands `this` to the transport, so every field
            // must already be assigned before the first frame can be delivered to the session.
            _sender.Subscribe(new ProtocolDispatcher(this));
        }
1✔
54

55
        /// <summary>
        /// Sends a request (and its content streams, if any) through the transport and waits for the matching response.
        /// </summary>
        /// <param name="request">The request to send.</param>
        /// <param name="cancellationToken">Token passed to the transport send operations.</param>
        /// <returns>The response received for the request.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="request"/> is null.</exception>
        public async Task<ReceiveResponse> SendRequestAsync(StreamingRequest request, CancellationToken cancellationToken)
        {
            if (request == null)
            {
                throw new ArgumentNullException(nameof(request));
            }

            var payload = new RequestPayload()
            {
                Verb = request.Verb,
                Path = request.Path,
            };

            if (request.Streams != null)
            {
                payload.Streams = new List<StreamDescription>();
                foreach (var contentStream in request.Streams)
                {
                    var description = GetStreamDescription(contentStream);

                    payload.Streams.Add(description);
                }
            }

            var requestId = Guid.NewGuid();

            // The receive path completes this source from the transport's dispatch callback; running
            // continuations asynchronously keeps the caller's continuation from executing inline there.
            var responseCompletionSource = new TaskCompletionSource<ReceiveResponse>(TaskCreationOptions.RunContinuationsAsynchronously);
            _pendingResponses.TryAdd(requestId, responseCompletionSource);

            try
            {
                // Send request
                await _sender.SendRequestAsync(requestId, payload, cancellationToken).ConfigureAwait(false);

                if (request.Streams != null)
                {
                    foreach (var stream in request.Streams)
                    {
                        await _sender.SendStreamAsync(stream.Id, await stream.Content.ReadAsStreamAsync().ConfigureAwait(false), cancellationToken).ConfigureAwait(false);
                    }
                }

                // NOTE(review): DefaultTimeOutAsync is defined outside this file; presumably it bounds the wait with a default timeout - confirm.
                return await responseCompletionSource.Task.DefaultTimeOutAsync().ConfigureAwait(false);
            }
            finally
            {
                // Drop the pending entry on every exit path (send failure, timeout, success) so that
                // abandoned requests do not accumulate for the lifetime of the session.
                _pendingResponses.TryRemove(requestId, out _);
            }
        }
1✔
97

98
        /// <summary>
        /// Sends a response payload for the given header through the transport, followed by each of its content streams.
        /// </summary>
        /// <param name="header">Header identifying the exchange being answered; its type must be <see cref="PayloadTypes.Response"/>.</param>
        /// <param name="response">The response to send.</param>
        /// <param name="cancellationToken">Token passed to the transport send operations.</param>
        /// <returns>A task that completes once the response and all of its streams have been handed to the transport.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="header"/> or <paramref name="response"/> is null.</exception>
        /// <exception cref="InvalidOperationException">The header type is not <see cref="PayloadTypes.Response"/>.</exception>
        public async Task SendResponseAsync(Header header, StreamingResponse response, CancellationToken cancellationToken)
        {
            if (header == null)
            {
                throw new ArgumentNullException(nameof(header));
            }

            if (header.Type != PayloadTypes.Response)
            {
                throw new InvalidOperationException($"StreamingSession SendResponseAsync expected Response payload, but instead received a payload of type {header.Type}");
            }

            if (response == null)
            {
                throw new ArgumentNullException(nameof(response));
            }

            var payload = new ResponsePayload()
            {
                StatusCode = response.StatusCode,
            };

            // Describe every outgoing stream in the payload so the receiver knows what will follow.
            if (response.Streams != null)
            {
                var descriptions = new List<StreamDescription>();
                foreach (var outgoing in response.Streams)
                {
                    descriptions.Add(GetStreamDescription(outgoing));
                }

                payload.Streams = descriptions;
            }

            await _sender.SendResponseAsync(header.Id, payload, cancellationToken).ConfigureAwait(false);

            if (response.Streams == null)
            {
                return;
            }

            // The stream bodies go out after the response payload, one at a time.
            foreach (var outgoing in response.Streams)
            {
                var content = await outgoing.Content.ReadAsStreamAsync().ConfigureAwait(false);
                await _sender.SendStreamAsync(outgoing.Id, content, cancellationToken).ConfigureAwait(false);
            }
        }
1✔
141

142
        /// <summary>
        /// Registers a received request. A request without streams is dispatched for processing immediately;
        /// a request that declares streams is parked until <see cref="ReceiveStream"/> has seen all of them complete.
        /// </summary>
        /// <param name="header">Header of the received payload; its type must be <see cref="PayloadTypes.Request"/>.</param>
        /// <param name="request">The received request with its placeholder streams.</param>
        /// <exception cref="ArgumentNullException"><paramref name="header"/> or <paramref name="request"/> is null.</exception>
        /// <exception cref="InvalidOperationException">The header type is not <see cref="PayloadTypes.Request"/>.</exception>
        public virtual void ReceiveRequest(Header header, ReceiveRequest request)
        {
            if (header == null)
            {
                throw new ArgumentNullException(nameof(header));
            }

            if (header.Type != PayloadTypes.Request)
            {
                throw new InvalidOperationException($"StreamingSession cannot receive payload of type {header.Type} as request.");
            }

            if (request == null)
            {
                throw new ArgumentNullException(nameof(request));
            }

            Log.PayloadReceived(_logger, header);

            lock (_receiveSync)
            {
                if (request.Streams.Any())
                {
                    // Only requests that still wait for stream data are tracked: ReceiveStream looks them up
                    // by id and removes them once every stream is complete.
                    _requests.Add(header.Id, request);

                    foreach (var streamDefinition in request.Streams)
                    {
                        _streamDefinitions.Add(streamDefinition.Id, streamDefinition as StreamDefinition);
                    }
                }
                else
                {
                    // Nothing to wait for. The request is deliberately not stored in _requests: no stream
                    // will ever arrive to trigger its removal, so storing it would leak one entry per request.
                    ProcessRequest(header.Id, request);
                }
            }
        }
1✔
178

179
        /// <summary>
        /// Registers a received response. A response without streams completes the matching pending request right away;
        /// a response that declares streams is parked until <see cref="ReceiveStream"/> has seen all of them complete.
        /// </summary>
        /// <param name="header">Header of the received payload; its type must be <see cref="PayloadTypes.Response"/>.</param>
        /// <param name="response">The received response with its placeholder streams.</param>
        /// <exception cref="ArgumentNullException"><paramref name="header"/> or <paramref name="response"/> is null.</exception>
        /// <exception cref="InvalidOperationException">The header type is not <see cref="PayloadTypes.Response"/>.</exception>
        public virtual void ReceiveResponse(Header header, ReceiveResponse response)
        {
            if (header == null)
            {
                throw new ArgumentNullException(nameof(header));
            }

            if (header.Type != PayloadTypes.Response)
            {
                throw new InvalidOperationException($"StreamingSession cannot receive payload of type {header.Type} as response");
            }

            if (response == null)
            {
                throw new ArgumentNullException(nameof(response));
            }

            Log.PayloadReceived(_logger, header);

            // A 202 (Accepted) is ignored here so the pending request keeps waiting; this session itself sends
            // such a 202 before the real response when it processes a request (see ProcessRequest).
            if (response.StatusCode == (int)HttpStatusCode.Accepted)
            {
                return;
            }

            TaskCompletionSource<ReceiveResponse> pendingResponse = null;

            lock (_receiveSync)
            {
                if (response.Streams.Any())
                {
                    _responses.Add(header.Id, response);

                    foreach (var streamDefinition in response.Streams)
                    {
                        _streamDefinitions.Add(streamDefinition.Id, streamDefinition as StreamDefinition);
                    }
                }
                else
                {
                    // Claim the pending entry atomically; pendingResponse stays null when nobody waits for this id.
                    _pendingResponses.TryRemove(header.Id, out pendingResponse);
                }
            }

            // Complete the waiter only after releasing the lock so that its continuations can never run
            // while _receiveSync is held. TrySetResult tolerates a source that was already completed.
            pendingResponse?.TrySetResult(response);
        }
1✔
224

225
        /// <summary>
        /// Appends a received stream frame to its placeholder stream. When the frame ends the stream and every stream of
        /// the owning request/response is complete, the request is dispatched for processing or the pending response is completed.
        /// </summary>
        /// <param name="header">Header of the received frame; its type must be <see cref="PayloadTypes.Stream"/>.</param>
        /// <param name="payload">The bytes carried by the frame.</param>
        /// <exception cref="ArgumentNullException"><paramref name="header"/> is null, or <paramref name="payload"/> has no backing array.</exception>
        /// <exception cref="InvalidOperationException">The header type is not <see cref="PayloadTypes.Stream"/>.</exception>
        public virtual void ReceiveStream(Header header, ArraySegment<byte> payload)
        {
            if (header == null)
            {
                throw new ArgumentNullException(nameof(header));
            }

            if (header.Type != PayloadTypes.Stream)
            {
                throw new InvalidOperationException($"StreamingSession cannot receive payload of type {header.Type} as stream");
            }

            // ArraySegment<T> is a struct, so the segment itself is never null; the backing array is what must be validated.
            if (payload.Array == null)
            {
                throw new ArgumentNullException(nameof(payload));
            }

            Log.PayloadReceived(_logger, header);

            TaskCompletionSource<ReceiveResponse> pendingResponse = null;
            ReceiveResponse completedResponse = null;

            // All bookkeeping dictionaries are plain Dictionary instances that other threads mutate,
            // so every lookup below happens while holding the same lock the writers use.
            lock (_receiveSync)
            {
                // Find the placeholder announced for this stream id.
                if (!_streamDefinitions.TryGetValue(header.Id, out StreamDefinition streamDefinition) || streamDefinition == null)
                {
                    Log.OrphanedStream(_logger, header);
                    return;
                }

                streamDefinition.Stream.Write(payload.Array, payload.Offset, payload.Count);

                // More frames are expected for this stream.
                if (!header.End)
                {
                    return;
                }

                // Mark this stream as completed and rewind it for its future reader.
                streamDefinition.Complete = true;
                streamDefinition.Stream.Seek(0, SeekOrigin.Begin);

                var payloadId = streamDefinition.PayloadId;

                if (streamDefinition.PayloadType == PayloadTypes.Request)
                {
                    // If we received all the streams, then it's time to pass this request to the request handler!
                    // For example, if this request is a send activity, the request handler will deserialize the first stream
                    // into an activity and pass to the adapter.
                    if (_requests.TryGetValue(payloadId, out ReceiveRequest request) && AllStreamsComplete(request.Streams))
                    {
                        _requests.Remove(payloadId);
                        ProcessRequest(payloadId, request);
                    }
                }
                else if (streamDefinition.PayloadType == PayloadTypes.Response)
                {
                    if (_responses.TryGetValue(payloadId, out ReceiveResponse response) && AllStreamsComplete(response.Streams))
                    {
                        // The response is fully assembled: drop its bookkeeping (including its stream
                        // placeholders, which nothing else removes) whether or not a caller still waits for it.
                        _responses.Remove(payloadId);
                        response.Streams.ForEach(s => _streamDefinitions.Remove(s.Id));

                        completedResponse = response;
                        _pendingResponses.TryRemove(payloadId, out pendingResponse);
                    }
                }
            }

            // Complete the waiter only after releasing the lock so that its continuations can never run
            // while _receiveSync is held. TrySetResult tolerates a source that was already completed.
            pendingResponse?.TrySetResult(completedResponse);
        }

        // Returns true when the list exists and every entry is a StreamDefinition that has received its final frame.
        private static bool AllStreamsComplete(List<IContentStream> streams)
        {
            return streams != null && streams.All(s => s is StreamDefinition definition && definition.Complete);
        }
1✔
319

320
        /// <summary>
        /// Builds the wire description (id, content type, length) of an outgoing content stream from its content headers.
        /// </summary>
        /// <param name="stream">The outgoing stream to describe.</param>
        /// <returns>The description; ContentType and Length are left unset when the headers do not provide them.</returns>
        private static StreamDescription GetStreamDescription(ResponseMessageStream stream)
        {
            var description = new StreamDescription()
            {
                Id = stream.Id.ToString("D"),
            };

            if (stream.Content.Headers.TryGetValues(HeaderNames.ContentType, out IEnumerable<string> contentType))
            {
                description.ContentType = contentType?.FirstOrDefault();
            }

            if (stream.Content.Headers.TryGetValues(HeaderNames.ContentLength, out IEnumerable<string> contentLength))
            {
                var value = contentLength?.FirstOrDefault();

                // Header values are machine-readable text, so parse them culture-invariantly.
                if (value != null && int.TryParse(value, System.Globalization.NumberStyles.Integer, System.Globalization.CultureInfo.InvariantCulture, out int length))
                {
                    description.Length = length;
                }
            }
            else
            {
                // ContentLength is a long?; a plain (int?) cast would silently wrap for bodies larger than
                // int.MaxValue, so lengths that do not fit are reported as unknown instead.
                long? declaredLength = stream.Content.Headers.ContentLength;
                description.Length = declaredLength.HasValue && declaredLength.Value <= int.MaxValue
                    ? (int?)declaredLength.Value
                    : null;
            }

            return description;
        }
347

348
        /// <summary>
        /// Exposes the bytes of a sequence as an <see cref="ArraySegment{T}"/>.
        /// </summary>
        /// <param name="sequence">The sequence to expose.</param>
        /// <returns>
        /// The backing array segment when the sequence is a single array-backed segment; otherwise a segment over a copy of the bytes.
        /// </returns>
        private static ArraySegment<byte> GetArraySegment(ReadOnlySequence<byte> sequence)
        {
            // Fast path: a single array-backed segment can be handed out without copying.
            if (sequence.IsSingleSegment && MemoryMarshal.TryGetArray(sequence.First, out ArraySegment<byte> backingSegment))
            {
                return backingSegment;
            }

            // Can be optimized by not copying but should be uncommon. If perf data shows that we are hitting this
            // code branch, then we can optimize and avoid copies and heap allocations.
            byte[] copy = sequence.ToArray();
            return new ArraySegment<byte>(copy);
        }
362

363
        /// <summary>
        /// Processes a fully received request on a background task: acknowledges it with a 202, runs the request
        /// handler, sends the handler's response, and finally releases the request's stream placeholders.
        /// </summary>
        /// <param name="id">Id of the request; reused as the id of both responses.</param>
        /// <param name="request">The request to hand to the request handler.</param>
        private void ProcessRequest(Guid id, ReceiveRequest request)
        {
            _ = Task.Run(async () =>
            {
                try
                {
                    // Send an HTTP 202 (Accepted) response right away, otherwise, while under high streaming load, the conversation times out due to not having a response in the request/response time frame.
                    await SendResponseAsync(new Header { Id = id, Type = PayloadTypes.Response }, new StreamingResponse { StatusCode = (int)HttpStatusCode.Accepted }, _connectionCancellationToken).ConfigureAwait(false);
                    var streamingResponse = await _receiver.ProcessRequestAsync(request, null).ConfigureAwait(false);
                    await SendResponseAsync(new Header() { Id = id, Type = PayloadTypes.Response }, streamingResponse, _connectionCancellationToken).ConfigureAwait(false);
                }
#pragma warning disable CA1031 // Do not catch general exception types (nobody awaits this task, so this is the last place the failure can be observed)
                catch (Exception ex)
#pragma warning restore CA1031
                {
                    _logger.LogError(ex, "Failed to process streaming request {RequestId}.", id);
                }
                finally
                {
                    // Release the placeholders even when processing failed, and do it under the receive lock
                    // because the receive path reads and writes the same non-concurrent dictionary.
                    lock (_receiveSync)
                    {
                        request.Streams?.ForEach(s => _streamDefinitions.Remove(s.Id));
                    }
                }
            });
        }
1✔
375

376
        /// <summary>
        /// Observer subscribed to the transport: turns each received (header, payload) frame into the matching
        /// Receive* call on the owning <see cref="StreamingSession"/>.
        /// </summary>
        internal class ProtocolDispatcher : IObserver<(Header Header, ReadOnlySequence<byte> Payload)>
        {
            // One shared instance: allocating JsonSerializerOptions per call discards the serializer's
            // per-options metadata cache and makes every deserialization pay for rebuilding it.
            private static readonly JsonSerializerOptions _serializerOptions = new JsonSerializerOptions() { IgnoreNullValues = true, PropertyNameCaseInsensitive = true };

            private readonly StreamingSession _streamingSession;

            /// <summary>
            /// Initializes a new instance of the <see cref="ProtocolDispatcher"/> class.
            /// </summary>
            /// <param name="streamingSession">The session that receives the dispatched payloads.</param>
            /// <exception cref="ArgumentNullException"><paramref name="streamingSession"/> is null.</exception>
            public ProtocolDispatcher(StreamingSession streamingSession)
            {
                _streamingSession = streamingSession ?? throw new ArgumentNullException(nameof(streamingSession));
            }

            /// <summary>Not supported by this observer; always throws <see cref="NotImplementedException"/>.</summary>
            public void OnCompleted()
            {
                throw new NotImplementedException();
            }

            /// <summary>Not supported by this observer; always throws <see cref="NotImplementedException"/>.</summary>
            /// <param name="error">The error reported by the observable (unused).</param>
            public void OnError(Exception error)
            {
                throw new NotImplementedException();
            }

            /// <summary>
            /// Dispatches a received frame to the session according to the header's payload type.
            /// Cancel frames and unknown types are ignored.
            /// </summary>
            /// <param name="frame">The header and payload bytes of the received frame.</param>
            public void OnNext((Header Header, ReadOnlySequence<byte> Payload) frame)
            {
                var header = frame.Header;
                var payload = frame.Payload;

                switch (header.Type)
                {
                    case PayloadTypes.Stream:
                        _streamingSession.ReceiveStream(header, GetArraySegment(payload));

                        break;
                    case PayloadTypes.Request:

                        var requestPayload = DeserializeTo<RequestPayload>(payload);
                        var request = new ReceiveRequest()
                        {
                            Verb = requestPayload.Verb,
                            Path = requestPayload.Path,
                            Streams = new List<IContentStream>(),
                        };

                        CreatePlaceholderStreams(header, request.Streams, requestPayload.Streams);
                        _streamingSession.ReceiveRequest(header, request);

                        break;

                    case PayloadTypes.Response:

                        var responsePayload = DeserializeTo<ResponsePayload>(payload);
                        var response = new ReceiveResponse()
                        {
                            StatusCode = responsePayload.StatusCode,
                            Streams = new List<IContentStream>(),
                        };

                        CreatePlaceholderStreams(header, response.Streams, responsePayload.Streams);
                        _streamingSession.ReceiveResponse(header, response);

                        break;

                    case PayloadTypes.CancelAll:
                        break;

                    case PayloadTypes.CancelStream:
                        break;
                }
            }

            // Deserializes a JSON payload straight from the received sequence, skipping a leading UTF-8 BOM if present.
            private static T DeserializeTo<T>(ReadOnlySequence<byte> payload)
            {
                // The payload here will likely have a UTF-8 byte-order-mark (BOM).
                // The JsonSerializer and UtfJsonReader explicitly expect no BOM in this overload that takes a ReadOnlySequence<byte>.
                // With that in mind, we check for a UTF-8 BOM and remove it if present. The main reason to call this specific flow instead of
                // the stream version or using Json.Net is that the ReadOnlySequence<T> API allows us to do a no-copy deserialization.
                // The ReadOnlySequence was allocated from the memory pool by the transport layer and gets sent all the way here without copies.

                // Check for UTF-8 BOM and remove if present: https://docs.microsoft.com/en-us/dotnet/standard/serialization/system-text-json-use-dom-utf8jsonreader-utf8jsonwriter?pivots=dotnet-5-0#filter-data-using-utf8jsonreader
                // The probe is clamped to the payload length: slicing a fixed 3 bytes would throw
                // ArgumentOutOfRangeException for shorter payloads such as "{}".
                var potentialBomSequence = payload.Slice(0, Math.Min(payload.Length, _utf8Bom.Length));
                var potentialBomSpan = potentialBomSequence.IsSingleSegment
                    ? potentialBomSequence.First.Span
                    : potentialBomSequence.ToArray();

                ReadOnlySequence<byte> mainPayload = payload;

                if (potentialBomSpan.StartsWith(_utf8Bom))
                {
                    mainPayload = payload.Slice(_utf8Bom.Length);
                }

                var reader = new Utf8JsonReader(mainPayload);
                return System.Text.Json.JsonSerializer.Deserialize<T>(ref reader, _serializerOptions);
            }

            // Adds one empty in-memory placeholder per announced stream, tagged with the owning payload's type and id.
            private static void CreatePlaceholderStreams(Header header, List<IContentStream> placeholders, List<StreamDescription> streamInfo)
            {
                if (streamInfo == null)
                {
                    return;
                }

                foreach (var streamDescription in streamInfo)
                {
                    if (!Guid.TryParse(streamDescription.Id, out Guid id))
                    {
                        throw new InvalidDataException($"Stream description id '{streamDescription.Id}' is not a Guid");
                    }

                    placeholders.Add(new StreamDefinition()
                    {
                        ContentType = streamDescription.ContentType,
                        Length = streamDescription.Length,
                        Id = id, // already parsed above; no need to parse the string a second time
                        Stream = new MemoryStream(),
                        PayloadType = header.Type,
                        PayloadId = header.Id
                    });
                }
            }
        }
494

495
        /// <summary>
        /// Placeholder for a stream announced by a received request or response, filled as stream frames arrive.
        /// </summary>
        internal class StreamDefinition : IContentStream
        {
            /// <summary>Gets or sets the id of the stream.</summary>
            public Guid Id { get; set; }

            /// <summary>Gets or sets the id of the request or response payload that announced this stream.</summary>
            public Guid PayloadId { get; set; }

            /// <summary>Gets or sets the type of the payload that announced this stream.</summary>
            public char PayloadType { get; set; }

            /// <summary>Gets or sets the content type taken from the stream description.</summary>
            public string ContentType { get; set; }

            /// <summary>Gets or sets the length taken from the stream description, if any.</summary>
            public int? Length { get; set; }

            /// <summary>Gets or sets the stream that accumulates the received bytes.</summary>
            public Stream Stream { get; set; }

            /// <summary>Gets or sets a value indicating whether the final frame of the stream has been received.</summary>
            public bool Complete { get; set; }
        }
511

512
        /// <summary>
        /// Precompiled log message delegates used by the session.
        /// </summary>
        private class Log
        {
            // Event 1 (Error): a stream frame arrived for an id with no registered placeholder.
            private static readonly Action<ILogger, Guid, char, int, bool, Exception> _logOrphanedStream =
                LoggerMessage.Define<Guid, char, int, bool>(
                    LogLevel.Error,
                    new EventId(1, nameof(OrphanedStream)),
                    "Stream has no associated payload. Header: ID {Guid} Type: {char} Payload length: {int} End: {bool}");

            // Event 2 (Debug): any payload accepted by one of the session's Receive* methods.
            private static readonly Action<ILogger, Guid, char, int, bool, Exception> _logPayloadReceived =
                LoggerMessage.Define<Guid, char, int, bool>(
                    LogLevel.Debug,
                    new EventId(2, nameof(PayloadReceived)),
                    "Payload received in session. Header: ID {Guid} Type: {char} Payload length: {int} End: {bool}");

            /// <summary>Logs that a stream frame has no associated payload.</summary>
            public static void OrphanedStream(ILogger logger, Header header)
            {
                _logOrphanedStream(logger, header.Id, header.Type, header.PayloadLength, header.End, null);
            }

            /// <summary>Logs that a payload was received by the session.</summary>
            public static void PayloadReceived(ILogger logger, Header header)
            {
                _logPayloadReceived(logger, header.Id, header.Type, header.PayloadLength, header.End, null);
            }
        }
524
    }
525
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc