• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

microsoft / botbuilder-dotnet / 348700

21 Apr 2023 10:15PM UTC coverage: 79.096% (-0.009%) from 79.105%
348700

Pull #6621

CI-PR build

GitHub
Merge 25a3b40f3 into 45ee363e6
Pull Request #6621: Support MeetingTabIconSurface in Meeting Notification API

25828 of 32654 relevant lines covered (79.1%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.0
/libraries/Microsoft.Bot.Streaming/Payloads/StreamManager.cs
1
// Copyright (c) Microsoft Corporation. All rights reserved.
2
// Licensed under the MIT License.
3

4
using System;
5
using System.Collections.Concurrent;
6
using System.IO;
7

8
namespace Microsoft.Bot.Streaming.Payloads
9
{
10
    /// <summary>
11
    /// StreamManagers are used to provide access to the objects involved in processing incoming <see cref="PayloadStream"/>s.
12
    /// </summary>
13
    public class StreamManager : IStreamManager
    {
        // Assemblers currently being tracked, keyed by stream id. Entries are added in
        // GetPayloadAssembler and removed in CloseStream.
        private readonly ConcurrentDictionary<Guid, PayloadStreamAssembler> _activeAssemblers;

        // Invoked from CloseStream when a stream is closed before it is complete. Never null.
        private readonly Action<PayloadStreamAssembler> _onCancelStream;

        /// <summary>
        /// Initializes a new instance of the <see cref="StreamManager"/> class.
        /// </summary>
        /// <param name="onCancelStream">Optional action to trigger if the managed stream is cancelled.</param>
        public StreamManager(Action<PayloadStreamAssembler> onCancelStream = null)
        {
            // If no callback is defined, make it a noop to avoid null checking everywhere.
            _onCancelStream = onCancelStream ?? ((a) => { });
            _activeAssemblers = new ConcurrentDictionary<Guid, PayloadStreamAssembler>();
        }

        /// <inheritdoc/>
        public PayloadStreamAssembler GetPayloadAssembler(Guid id)
        {
            // Fast path: an assembler is already registered for this id.
            if (_activeAssemblers.TryGetValue(id, out var assembler))
            {
                return assembler;
            }

            // A new id has come in. GetOrAdd either registers the freshly created assembler or
            // returns the one another caller registered first, so the result is never null even
            // if the entry is added or removed concurrently. A losing candidate does not need to
            // be disposed because it was never used.
            return _activeAssemblers.GetOrAdd(id, new PayloadStreamAssembler(this, id));
        }

        /// <inheritdoc/>
        public Stream GetPayloadStream(Header header)
        {
            if (header == null)
            {
                throw new ArgumentNullException(nameof(header));
            }

            var assembler = GetPayloadAssembler(header.Id);

            return assembler.GetPayloadAsStream();
        }

        /// <inheritdoc/>
        public void OnReceive(Header header, Stream contentStream, int contentLength)
        {
            if (header == null)
            {
                throw new ArgumentNullException(nameof(header));
            }

            // Content for an id with no registered assembler is ignored.
            if (_activeAssemblers.TryGetValue(header.Id, out var assembler))
            {
                assembler.OnReceive(header, contentStream, contentLength);
            }
        }

        /// <inheritdoc/>
        public void CloseStream(Guid id)
        {
            // Removing first guarantees the cancel callback runs at most once per registered assembler.
            if (!_activeAssemblers.TryRemove(id, out var assembler))
            {
                return;
            }

            // Decide whether to cancel it or not: the stream counts as cancelled when fewer bytes
            // than the declared content length are present, or when the assembler has not
            // reached its end.
            var stream = assembler.GetPayloadAsStream();
            var isTruncated = assembler.ContentLength.HasValue && stream.Length < assembler.ContentLength.Value;

            if (isTruncated || !assembler.End)
            {
                _onCancelStream(assembler);
            }
        }
    }
79
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc