• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

net-daemon / netdaemon / 18812120513

20 Oct 2025 07:33PM UTC coverage: 84.034% (-0.04%) from 84.073%
18812120513

push

github

web-flow
Remove mqtt warning (#1333)

The warning of mqtt config is not a great thing to have. Remove it.

885 of 1175 branches covered (75.32%)

Branch coverage included in aggregate %.

3373 of 3892 relevant lines covered (86.66%)

1257.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.24
/src/Client/NetDaemon.HassClient/Internal/HomeAssistantConnection.cs
1
namespace NetDaemon.Client.Internal;
2

3
internal class HomeAssistantConnection : IHomeAssistantConnection, IHomeAssistantHassMessages
4
{
5
    #region -- private declarations -
6

7
    private volatile bool _isDisposed;
8

9
    private readonly ILogger<IHomeAssistantConnection> _logger;
10
    private readonly IWebSocketClientTransportPipeline _transportPipeline;
11
    private readonly IHomeAssistantApiManager _apiManager;
12
    private readonly ResultMessageHandler _resultMessageHandler;
13
    private readonly CancellationTokenSource _internalCancelSource = new();
35✔
14

15
    private readonly Subject<HassMessage> _hassMessageSubject = new();
35✔
16
    private readonly Task _handleNewMessagesTask;
17

18
    private const int WaitForResultTimeout = 20000;
19

20
    private readonly SemaphoreSlim _messageIdSemaphore = new(1, 1);
35✔
21
    private int _messageId = 1;
35✔
22
    private readonly AsyncLazy<IObservable<HassEvent>> _lazyAllEventsObservable;
23

24
    #endregion
25

26
    /// <summary>
27
    ///     Default constructor
28
    /// </summary>
29
    /// <param name="logger">A logger instance</param>
30
    /// <param name="pipeline">The pipeline to use for websocket communication</param>
31
    /// <param name="apiManager">The api manager</param>
32
    public HomeAssistantConnection(
35✔
33
        ILogger<IHomeAssistantConnection> logger,
35✔
34
        IWebSocketClientTransportPipeline pipeline,
35✔
35
        IHomeAssistantApiManager apiManager
35✔
36
    )
35✔
37
    {
38
        _transportPipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline));
35!
39
        _apiManager = apiManager;
35✔
40
        _logger = logger;
35✔
41

42
        _resultMessageHandler = new ResultMessageHandler(_logger, TimeProvider.System);
35✔
43

44
        // We lazily cache same observable for all events. There are no reason we should use multiple subscriptions
45
        // to all events. If people wants that they can provide a "*" type and get the same thing
46
        _lazyAllEventsObservable = new AsyncLazy<IObservable<HassEvent>>(async Task<IObservable<HassEvent>>() =>
35✔
47
            await SubscribeToHomeAssistantEventsInternalAsync(null, _internalCancelSource.Token));
46✔
48

49
        if (_transportPipeline.WebSocketState != WebSocketState.Open)
35✔
50
            throw new ApplicationException(
1✔
51
                $"Expected WebSocket state 'Open' got '{_transportPipeline.WebSocketState}'");
1✔
52

53
        _handleNewMessagesTask = Task.Factory.StartNew(async () => await HandleNewMessages().ConfigureAwait(false),
68✔
54
            TaskCreationOptions.LongRunning);
34✔
55
    }
34✔
56

57
    public async Task<IObservable<HassEvent>> SubscribeToHomeAssistantEventsAsync(string? eventType,
58
        CancellationToken cancelToken)
59
    {
60
        // When subscribe all events, optimize using the same IObservable<HassEvent> if we subscribe multiple times
61
        if (string.IsNullOrEmpty(eventType))
31✔
62
            return await _lazyAllEventsObservable.Value;
28✔
63

64
        return await SubscribeToHomeAssistantEventsInternalAsync(eventType, cancelToken);
3✔
65
    }
31✔
66

67
    private async Task<IObservable<HassEvent>> SubscribeToHomeAssistantEventsInternalAsync(string? eventType,
68
        CancellationToken cancelToken)
69
    {
70
        var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
14✔
71
            cancelToken,
14✔
72
            _internalCancelSource.Token
14✔
73
        );
14✔
74

75
        var result = await SendCommandAndReturnHassMessageResponseAsync(new SubscribeEventCommand(),
14✔
76
            combinedTokenSource.Token).ConfigureAwait(false);
14✔
77

78
        // The id if the message we used to subscribe should be used as the filter for the event messages
79
        var observableResult = _hassMessageSubject.Where(n => n.Type == "event" && n.Id == result?.Id)
1,534!
80
            .Select(n => n.Event!);
967✔
81

82
        return observableResult;
14✔
83
    }
14✔
84

85
    public async Task WaitForConnectionToCloseAsync(CancellationToken cancelToken)
86
    {
87
        var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
12✔
88
            cancelToken,
12✔
89
            _internalCancelSource.Token
12✔
90
        );
12✔
91

92
        // Just wait for token source (internal och provided one)
93
        await combinedTokenSource.Token.AsTask().ConfigureAwait(false);
12✔
94
    }
×
95

96
    public async Task SendCommandAsync<T>(T command, CancellationToken cancelToken) where T : CommandMessage
97
    {
98
        var returnMessageTask = await SendCommandAsyncInternal(command, cancelToken);
19✔
99
        _resultMessageHandler.HandleResult(returnMessageTask, command);
18✔
100
    }
18✔
101

102
    public async Task<TResult?> SendCommandAndReturnResponseAsync<T, TResult>(T command, CancellationToken cancelToken)
103
        where T : CommandMessage
104
    {
105
        var result = await SendCommandAndReturnResponseRawAsync(command, cancelToken).ConfigureAwait(false);
118✔
106

107
        return result is not null ? result.Value.Deserialize<TResult>() : default;
117✔
108
    }
117✔
109

110
    public async Task<JsonElement?> SendCommandAndReturnResponseRawAsync<T>(T command, CancellationToken cancelToken)
111
        where T : CommandMessage
112
    {
113
        var hassMessage =
123✔
114
            await SendCommandAndReturnHassMessageResponseAsync(command, cancelToken).ConfigureAwait(false);
123✔
115

116
        // The SendCommandsAndReturnHAssMessageResponse will throw if not successful so just ignore errors here
117
        return hassMessage?.ResultElement;
122!
118
    }
122✔
119

120
    public async Task<HassMessage?> SendCommandAndReturnHassMessageResponseAsync<T>(T command,
121
        CancellationToken cancelToken)
122
        where T : CommandMessage
123
    {
124
        var resultMessageTask = await SendCommandAsyncInternal(command, cancelToken);
137✔
125
        var result = await resultMessageTask.WaitAsync(TimeSpan.FromMilliseconds(WaitForResultTimeout), cancelToken);
137✔
126

127
        if (result.Success ?? false)
137✔
128
            return result;
136✔
129

130
        // Non successful command should throw exception
131
        throw new InvalidOperationException(
1✔
132
            $"Failed command ({command.Type}) error: {result.Error}.  Sent command is {command.ToJsonElement()}");
1✔
133
    }
136✔
134

135
    public async ValueTask DisposeAsync()
136
    {
137
        if (_isDisposed) return;
36✔
138
        _isDisposed = true;
34✔
139

140
        try
141
        {
142
            await CloseAsync().ConfigureAwait(false);
34✔
143
        }
34✔
144
        catch (Exception e)
×
145
        {
146
            _logger.LogDebug(e, "Failed to close HomeAssistantConnection");
×
147
        }
×
148

149
        if (!_internalCancelSource.IsCancellationRequested)
34✔
150
            await _internalCancelSource.CancelAsync();
12✔
151

152
        // Gracefully wait for task or timeout
153
        await Task.WhenAny(
34✔
154
            _handleNewMessagesTask,
34✔
155
            Task.Delay(5000)
34✔
156
        ).ConfigureAwait(false);
34✔
157

158
        await _transportPipeline.DisposeAsync().ConfigureAwait(false);
34✔
159
        _internalCancelSource.Dispose();
34✔
160
        _hassMessageSubject.Dispose();
34✔
161
        await _resultMessageHandler.DisposeAsync();
34✔
162
        _messageIdSemaphore.Dispose();
34✔
163
    }
35✔
164

165
    public Task<T?> GetApiCallAsync<T>(string apiPath, CancellationToken cancelToken)
166
    {
167
        ObjectDisposedException.ThrowIf(_isDisposed, this);
34✔
168
        return _apiManager.GetApiCallAsync<T>(apiPath, cancelToken);
33✔
169
    }
170

171
    public Task<T?> PostApiCallAsync<T>(string apiPath, CancellationToken cancelToken, object? data = null)
172
    {
173
        ObjectDisposedException.ThrowIf(_isDisposed, this);
2✔
174
        return _apiManager.PostApiCallAsync<T>(apiPath, cancelToken, data);
1✔
175
    }
176

177
    public IObservable<HassMessage> OnHassMessage => _hassMessageSubject;
1✔
178

179
    private async Task<Task<HassMessage>> SendCommandAsyncInternal<T>(T command, CancellationToken cancelToken) where T : CommandMessage
180
    {
181
        ObjectDisposedException.ThrowIf(_isDisposed, this);
156✔
182

183
        // The semaphore can fail to be taken in rare cases so we need
184
        // to keep this out of the try/finally block so it will not be released
185
        await _messageIdSemaphore.WaitAsync(cancelToken).ConfigureAwait(false);
155✔
186
        try
187
        {
188
            // We need to make sure messages to HA are send with increasing Ids therefore we need to synchronize
189
            // increasing the messageId and Sending the message
190
            command.Id = ++_messageId;
155✔
191

192
            // We make a task that subscribe for the return result message
193
            // this task will be returned and handled by caller
194
            var resultEvent = _hassMessageSubject
155✔
195
                .Where(n => n.Type == "result" && n.Id == command.Id)
249✔
196
                .FirstAsync().ToTask(CancellationToken.None);
155✔
197
            // We dont want to pass the incoming CancellationToken here because it will throw a TaskCanceledException
198
            // when calling services from an Apps Dispose(Async) and hide possible actual exceptions
199

200
            await _transportPipeline.SendMessageAsync(command, cancelToken);
155✔
201

202
            return resultEvent;
155✔
203
        }
204
        finally
205
        {
206
            _messageIdSemaphore.Release();
155✔
207
        }
208
    }
155✔
209

210
    private async Task HandleNewMessages()
211
    {
212
        try
213
        {
214
            while (!_internalCancelSource.IsCancellationRequested)
235!
215
            {
216
                var msg = await _transportPipeline.GetNextMessagesAsync<HassMessage>(_internalCancelSource.Token)
235✔
217
                    .ConfigureAwait(false);
235✔
218
                try
219
                {
220
                    foreach (var obj in msg)
1,000✔
221
                    {
222
                        _hassMessageSubject.OnNext(obj);
299✔
223
                    }
224
                }
201✔
225
                catch (Exception e)
×
226
                {
227
                    _logger.LogError(e, "Failed processing new message from Home Assistant");
×
228
                }
×
229
            }
230
        }
×
231
        catch (OperationCanceledException)
10✔
232
        {
233
            // Normal case just exit
234
        }
10✔
235
        finally
236
        {
237
            _logger.LogTrace("Stop processing new messages");
30✔
238
            // make sure we always cancel any blocking operations
239
            if (!_internalCancelSource.IsCancellationRequested)
30✔
240
                await _internalCancelSource.CancelAsync();
22✔
241
        }
242
    }
10✔
243

244
    private Task CloseAsync()
245
    {
246
        return _transportPipeline.CloseAsync();
34✔
247
    }
248
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc