• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

net-daemon / netdaemon / 9343360009

26 May 2024 02:03PM UTC coverage: 82.867% (-0.1%) from 82.999%
9343360009

push

github

web-flow
Housecleaning  (#1108)

* Housecleaning - step 1

* More small fixes

863 of 1189 branches covered (72.58%)

Branch coverage included in aggregate %.

29 of 34 new or added lines in 23 files covered. (85.29%)

6 existing lines in 2 files now uncovered.

3224 of 3743 relevant lines covered (86.13%)

81.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/src/Extensions/NetDaemon.Extensions.MqttEntityManager/MessageSubscriber.cs
1
#region
2

3
using System.Collections.Concurrent;
4
using System.Collections.ObjectModel;
5
using System.Reactive.Subjects;
6
using Microsoft.Extensions.Logging;
7
using MQTTnet;
8
using MQTTnet.Client;
9
using MQTTnet.Extensions.ManagedClient;
10
using MQTTnet.Packets;
11
using NetDaemon.Extensions.MqttEntityManager.Helpers;
12

13
#endregion
14

15
namespace NetDaemon.Extensions.MqttEntityManager;
16

17
/// <summary>
18
///     Handle subscriptions to topics within MQTT and provide access for multiple subscribers to
19
///     receive updates
20
/// </summary>
21
internal class MessageSubscriber : IMessageSubscriber, IDisposable
{
    // Serialises the one-time attachment of the message handler to the MQTT client.
    private readonly SemaphoreSlim _subscriptionSetupLock = new SemaphoreSlim(1);
    private bool _isDisposed;
    private bool _subscriptionIsSetup;
    private readonly IAssuredMqttConnection _assuredMqttConnection;
    private readonly ILogger<MessageSubscriber> _logger;

    // One lazily-created subject per topic; every subscriber to a topic shares the same subject.
    private readonly ConcurrentDictionary<string, Lazy<Subject<string>>> _subscribers = new();

    /// <summary>
    ///     Managed subscriptions to topics within MQTT
    /// </summary>
    /// <param name="logger">Logger used for diagnostics and failures</param>
    /// <param name="assuredMqttConnection">Provider of the MQTT client used for subscribing</param>
    public MessageSubscriber(ILogger<MessageSubscriber> logger, IAssuredMqttConnection assuredMqttConnection)
    {
        _logger = logger;
        _assuredMqttConnection = assuredMqttConnection;
    }

    /// <summary>
    ///     Subscribe to the given topic
    /// </summary>
    /// <param name="topic">The MQTT topic to subscribe to</param>
    /// <returns>
    ///     An observable that emits the payload, as a string, of each message received on
    ///     <paramref name="topic"/>. Repeated calls for the same topic return the same observable.
    /// </returns>
    /// <remarks>Any failure is logged and then rethrown to the caller.</remarks>
    public async Task<IObservable<string>> SubscribeTopicAsync(string topic)
    {
        try
        {
            var mqttClient = await _assuredMqttConnection.GetClientAsync();
            await EnsureSubscriptionAsync(mqttClient);

            // Register the subject BEFORE asking the client to subscribe, so that a message
            // arriving immediately after the subscription is not dropped by
            // OnMessageReceivedAsync for want of a registered subscriber.
            // The factory overload avoids allocating a Lazy when the topic is already registered.
            var subject = _subscribers.GetOrAdd(topic, _ => new Lazy<Subject<string>>()).Value;

            var topicFilters = new Collection<MqttTopicFilter>
            {
                new MqttTopicFilterBuilder().WithTopic(topic).Build()
            };

            await mqttClient.SubscribeAsync(topicFilters);
            return subject;
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Failed to subscribe to topic");
            throw;
        }
    }

    /// <summary>
    /// If we are not already subscribed to receive messages, set up the handler
    /// </summary>
    /// <param name="mqttClient">The client whose received-message event the handler is attached to</param>
    private async Task EnsureSubscriptionAsync(IManagedMqttClient mqttClient)
    {
        await _subscriptionSetupLock.WaitAsync();
        try
        {
            if (!_subscriptionIsSetup)
            {
                _logger.LogInformation("Configuring message subscription");
                mqttClient.ApplicationMessageReceivedAsync += OnMessageReceivedAsync;
                _subscriptionIsSetup = true;
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to set up or process message subscription");
            throw;
        }
        finally
        {
            _subscriptionSetupLock.Release();
        }
    }

    /// <summary>
    /// Message received from MQTT, so find the subscription (if any) and notify them
    /// </summary>
    /// <param name="msg">The received-message event arguments carrying the topic and payload</param>
    /// <returns>A completed task; exceptions are logged here and not propagated to the MQTT client</returns>
    private Task OnMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs msg)
    {
        try
        {
            // ArraySegment.Array is the WHOLE backing buffer; only the window described by the
            // segment's Offset/Count is the payload. ToArray() copies exactly that window, but
            // throws on a default segment (null backing array), hence the guard.
            var segment = msg.ApplicationMessage.PayloadSegment;
            byte[] payloadBytes = segment.Array is null ? [] : segment.ToArray();

            var payload = ByteArrayHelper.SafeToString(payloadBytes);
            var topic = msg.ApplicationMessage.Topic;
            _logger.LogTrace("Subscription received {Payload} from {Topic}", payload, topic);

            // Single lookup instead of ContainsKey followed by the indexer.
            if (_subscribers.TryGetValue(topic, out var subscriber))
            {
                subscriber.Value.OnNext(payload);
            }
            else
            {
                _logger.LogTrace("No subscription for topic={Topic}", topic);
            }
        }
        catch (Exception e)
        {
            _logger.LogError(e, "Failed to notify subscribers");
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Completes and disposes every topic subject, then disposes the setup lock.
    /// Calls after the first are no-ops.
    /// </summary>
    public void Dispose()
    {
        if (!_isDisposed)
        {
            _isDisposed = true;
            foreach (var observer in _subscribers)
            {
                _logger.LogTrace("Disposing {Topic} subscription", observer.Key);
                observer.Value.Value.OnCompleted();
                observer.Value.Value.Dispose();
            }

            _subscriptionSetupLock.Dispose();
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc