• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

net-daemon / netdaemon / 27791342827

18 Jun 2026 09:46PM UTC coverage: 84.068% (+0.2%) from 83.827%
27791342827

Pull #1389

github

web-flow
Merge 2d80d9aa1 into 0116c9815
Pull Request #1389: [codex] Improve high-event hot paths

923 of 1213 branches covered (76.09%)

Branch coverage included in aggregate %.

55 of 56 new or added lines in 2 files covered. (98.21%)

3 existing lines in 1 file now uncovered.

3425 of 3959 relevant lines covered (86.51%)

584.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.71
/src/Client/NetDaemon.HassClient/Internal/HomeAssistantConnection.cs
1
using System.Collections.Concurrent;
2

3
namespace NetDaemon.Client.Internal;
4

5
internal class HomeAssistantConnection : IHomeAssistantConnection, IHomeAssistantHassMessages
6
{
7
    #region -- private declarations -
8

9
    private volatile bool _isDisposed;
10

11
    private readonly ILogger<IHomeAssistantConnection> _logger;
12
    private readonly IWebSocketClientTransportPipeline _transportPipeline;
13
    private readonly IHomeAssistantApiManager _apiManager;
14
    private readonly ResultMessageHandler _resultMessageHandler;
15
    private readonly CancellationTokenSource _internalCancelSource = new();
43✔
16

17
    private readonly Subject<HassMessage> _hassMessageSubject = new();
43✔
18
    private readonly ConcurrentDictionary<int, TaskCompletionSource<HassMessage>> _pendingResults = new();
43✔
19
    private readonly Task _handleNewMessagesTask;
20

21
    private const int WaitForResultTimeout = 20000;
22

23
    private readonly SemaphoreSlim _messageIdSemaphore = new(1, 1);
43✔
24
    private int _messageId = 1;
43✔
25
    private readonly AsyncLazy<IObservable<HassEvent>> _lazyAllEventsObservable;
26

27
    #endregion
28

29
    /// <summary>
30
    ///     Default constructor
31
    /// </summary>
32
    /// <param name="logger">A logger instance</param>
33
    /// <param name="pipeline">The pipeline to use for websocket communication</param>
34
    /// <param name="apiManager">The api manager</param>
35
    public HomeAssistantConnection(
43✔
36
        ILogger<IHomeAssistantConnection> logger,
43✔
37
        IWebSocketClientTransportPipeline pipeline,
43✔
38
        IHomeAssistantApiManager apiManager
43✔
39
    )
43✔
40
    {
41
        _transportPipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline));
43!
42
        _apiManager = apiManager;
43✔
43
        _logger = logger;
43✔
44

45
        _resultMessageHandler = new ResultMessageHandler(_logger, TimeProvider.System);
43✔
46

47
        // We lazily cache same observable for all events. There are no reason we should use multiple subscriptions
48
        // to all events. If people wants that they can provide a "*" type and get the same thing
49
        _lazyAllEventsObservable = new AsyncLazy<IObservable<HassEvent>>(async Task<IObservable<HassEvent>>() =>
43✔
50
            await SubscribeToHomeAssistantEventsInternalAsync(null, _internalCancelSource.Token));
54✔
51

52
        if (_transportPipeline.WebSocketState != WebSocketState.Open)
43✔
53
            throw new ApplicationException(
1✔
54
                $"Expected WebSocket state 'Open' got '{_transportPipeline.WebSocketState}'");
1✔
55

56
        _handleNewMessagesTask = Task.Factory.StartNew(async () => await HandleNewMessages().ConfigureAwait(false),
84✔
57
            TaskCreationOptions.LongRunning);
42✔
58
    }
42✔
59

60
    public async Task<IObservable<HassEvent>> SubscribeToHomeAssistantEventsAsync(string? eventType,
61
        CancellationToken cancelToken)
62
    {
63
        // When subscribe all events, optimize using the same IObservable<HassEvent> if we subscribe multiple times
64
        if (string.IsNullOrEmpty(eventType))
31✔
65
            return await _lazyAllEventsObservable.Value;
28✔
66

67
        return await SubscribeToHomeAssistantEventsInternalAsync(eventType, cancelToken);
3✔
68
    }
31✔
69

70
    private async Task<IObservable<HassEvent>> SubscribeToHomeAssistantEventsInternalAsync(string? eventType,
71
        CancellationToken cancelToken)
72
    {
73
        var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
14✔
74
            cancelToken,
14✔
75
            _internalCancelSource.Token
14✔
76
        );
14✔
77

78
        var result = await SendCommandAndReturnHassMessageResponseAsync(new SubscribeEventCommand(),
14✔
79
            combinedTokenSource.Token).ConfigureAwait(false);
14✔
80

81
        // The id if the message we used to subscribe should be used as the filter for the event messages
82
        var observableResult = _hassMessageSubject.Where(n => n.Type == "event" && n.Id == result?.Id)
1,491!
83
            .Select(n => n.Event!);
924✔
84

85
        return observableResult;
14✔
86
    }
14✔
87

88
    public async Task WaitForConnectionToCloseAsync(CancellationToken cancelToken)
89
    {
90
        var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
12✔
91
            cancelToken,
12✔
92
            _internalCancelSource.Token
12✔
93
        );
12✔
94

95
        // Just wait for token source (internal och provided one)
96
        await combinedTokenSource.Token.AsTask().ConfigureAwait(false);
12✔
97
    }
×
98

99
    public async Task SendCommandAsync<T>(T command, CancellationToken cancelToken) where T : CommandMessage
100
    {
101
        var returnMessageTask = await SendCommandAsyncInternal(command, cancelToken);
19✔
102
        _resultMessageHandler.HandleResult(returnMessageTask, command);
18✔
103
    }
18✔
104

105
    public async Task<TResult?> SendCommandAndReturnResponseAsync<T, TResult>(T command, CancellationToken cancelToken)
106
        where T : CommandMessage
107
    {
108
        var result = await SendCommandAndReturnResponseRawAsync(command, cancelToken).ConfigureAwait(false);
118✔
109

110
        return result is not null ? result.Value.Deserialize<TResult>() : default;
117✔
111
    }
117✔
112

113
    public async Task<JsonElement?> SendCommandAndReturnResponseRawAsync<T>(T command, CancellationToken cancelToken)
114
        where T : CommandMessage
115
    {
116
        var hassMessage =
123✔
117
            await SendCommandAndReturnHassMessageResponseAsync(command, cancelToken).ConfigureAwait(false);
123✔
118

119
        // The SendCommandsAndReturnHAssMessageResponse will throw if not successful so just ignore errors here
120
        return hassMessage?.ResultElement;
122!
121
    }
122✔
122

123
    public async Task<HassMessage?> SendCommandAndReturnHassMessageResponseAsync<T>(T command,
124
        CancellationToken cancelToken)
125
        where T : CommandMessage
126
    {
127
        var resultMessageTask = await SendCommandAsyncInternal(command, cancelToken);
148✔
128
        var result = await resultMessageTask.WaitAsync(TimeSpan.FromMilliseconds(WaitForResultTimeout), cancelToken);
146✔
129

130
        if (result.Success ?? false)
145✔
131
            return result;
143✔
132

133
        // Non successful command should throw exception
134
        throw new InvalidOperationException(
2✔
135
            $"Failed command ({command.Type}) error: {result.Error}.  Sent command is {command.ToJsonElement()}");
2✔
136
    }
143✔
137

138
    public async ValueTask DisposeAsync()
139
    {
140
        if (_isDisposed) return;
44✔
141
        _isDisposed = true;
42✔
142

143
        try
144
        {
145
            await CloseAsync().ConfigureAwait(false);
42✔
146
        }
41✔
147
        catch (Exception e)
1✔
148
        {
149
            _logger.LogDebug(e, "Failed to close HomeAssistantConnection");
1✔
150
        }
1✔
151

152
        if (!_internalCancelSource.IsCancellationRequested)
42✔
153
            await _internalCancelSource.CancelAsync();
29✔
154
        CancelPendingResults();
42✔
155

156
        // Gracefully wait for task or timeout
157
        await Task.WhenAny(
42✔
158
            _handleNewMessagesTask,
42✔
159
            Task.Delay(5000)
42✔
160
        ).ConfigureAwait(false);
42✔
161

162
        await _transportPipeline.DisposeAsync().ConfigureAwait(false);
42✔
163
        _internalCancelSource.Dispose();
42✔
164
        _hassMessageSubject.Dispose();
42✔
165
        await _resultMessageHandler.DisposeAsync();
42✔
166
        _messageIdSemaphore.Dispose();
42✔
167
    }
43✔
168

169
    public Task<T?> GetApiCallAsync<T>(string apiPath, CancellationToken cancelToken)
170
    {
171
        ObjectDisposedException.ThrowIf(_isDisposed, this);
34✔
172
        return _apiManager.GetApiCallAsync<T>(apiPath, cancelToken);
33✔
173
    }
174

175
    public Task<T?> PostApiCallAsync<T>(string apiPath, CancellationToken cancelToken, object? data = null)
176
    {
177
        ObjectDisposedException.ThrowIf(_isDisposed, this);
2✔
178
        return _apiManager.PostApiCallAsync<T>(apiPath, cancelToken, data);
1✔
179
    }
180

181
    public IObservable<HassMessage> OnHassMessage => _hassMessageSubject;
4✔
182

183
    private async Task<Task<HassMessage>> SendCommandAsyncInternal<T>(T command, CancellationToken cancelToken) where T : CommandMessage
184
    {
185
        ObjectDisposedException.ThrowIf(_isDisposed, this);
167✔
186

187
        // The semaphore can fail to be taken in rare cases so we need
188
        // to keep this out of the try/finally block so it will not be released
189
        await _messageIdSemaphore.WaitAsync(cancelToken).ConfigureAwait(false);
166✔
190
        try
191
        {
192
            // We need to make sure messages to HA are send with increasing Ids therefore we need to synchronize
193
            // increasing the messageId and Sending the message
194
            command.Id = ++_messageId;
166✔
195

196
            // Complete result waiters directly from the receive loop instead of creating
197
            // one filtered Rx subscription per command.
198
            var resultEvent = new TaskCompletionSource<HassMessage>(
166✔
199
                TaskCreationOptions.RunContinuationsAsynchronously);
166✔
200
            if (!_pendingResults.TryAdd(command.Id, resultEvent))
166!
NEW
201
                throw new InvalidOperationException($"A command with id {command.Id} is already pending");
×
202

203
            try
204
            {
205
                await _transportPipeline.SendMessageAsync(command, cancelToken);
166✔
206
            }
164✔
207
            catch (OperationCanceledException)
1✔
208
            {
209
                if (_pendingResults.TryRemove(command.Id, out var pendingResult))
1✔
210
                    pendingResult.TrySetCanceled(cancelToken);
1✔
211
                throw;
1✔
212
            }
213
            catch (Exception e)
1✔
214
            {
215
                if (_pendingResults.TryRemove(command.Id, out var pendingResult))
1✔
216
                    pendingResult.TrySetException(e);
1✔
217
                throw;
1✔
218
            }
219

220
            return resultEvent.Task;
164✔
221
        }
222
        finally
223
        {
224
            _messageIdSemaphore.Release();
166✔
225
        }
226
    }
164✔
227

228
    private async Task HandleNewMessages()
229
    {
230
        try
231
        {
232
            while (!_internalCancelSource.IsCancellationRequested)
245✔
233
            {
234
                var msg = await _transportPipeline.GetNextMessagesAsync<HassMessage>(_internalCancelSource.Token)
243✔
235
                    .ConfigureAwait(false);
243✔
236
                try
237
                {
238
                    foreach (var obj in msg)
1,010✔
239
                    {
240
                        HandleIncomingMessage(obj);
302✔
241
                    }
242
                }
203✔
243
                catch (Exception e)
×
244
                {
245
                    _logger.LogError(e, "Failed processing new message from Home Assistant");
×
246
                }
×
247
            }
248
        }
2✔
249
        catch (OperationCanceledException)
8✔
250
        {
251
            // Normal case just exit
252
        }
8✔
253
        finally
254
        {
255
            _logger.LogTrace("Stop processing new messages");
31✔
256
            CancelPendingResults();
31✔
257
            // make sure we always cancel any blocking operations
258
            if (!_internalCancelSource.IsCancellationRequested)
31✔
259
                await _internalCancelSource.CancelAsync();
13✔
260
        }
261
    }
10✔
262

263
    private void HandleIncomingMessage(HassMessage message)
264
    {
265
        if (message.Type == "result" && _pendingResults.TryRemove(message.Id, out var completionSource))
302✔
266
            completionSource.TrySetResult(message);
161✔
267

268
        _hassMessageSubject.OnNext(message);
302✔
269
    }
302✔
270

271
    private void CancelPendingResults()
272
    {
273
        foreach (var pendingResult in _pendingResults)
154✔
274
        {
275
            if (_pendingResults.TryRemove(pendingResult.Key, out var completionSource))
4✔
276
                completionSource.TrySetCanceled();
3✔
277
        }
278
    }
73✔
279

280
    private Task CloseAsync()
281
    {
282
        return _transportPipeline.CloseAsync();
42✔
283
    }
284
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc