• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

net-daemon / netdaemon / 27790362746

18 Jun 2026 09:27PM UTC coverage: 83.817% (-0.01%) from 83.827%
27790362746

Pull #1389

github

web-flow
Merge a6ceed159 into 0116c9815
Pull Request #1389: [codex] Improve high-event hot paths

915 of 1213 branches covered (75.43%)

Branch coverage included in aggregate %.

50 of 56 new or added lines in 2 files covered. (89.29%)

3 existing lines in 1 file now uncovered.

3420 of 3959 relevant lines covered (86.39%)

1490.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.18
/src/Client/NetDaemon.HassClient/Internal/HomeAssistantConnection.cs
1
using System.Collections.Concurrent;
2

3
namespace NetDaemon.Client.Internal;
4

5
internal class HomeAssistantConnection : IHomeAssistantConnection, IHomeAssistantHassMessages
6
{
7
    #region -- private declarations -
8

9
    private volatile bool _isDisposed;
10

11
    private readonly ILogger<IHomeAssistantConnection> _logger;
12
    private readonly IWebSocketClientTransportPipeline _transportPipeline;
13
    private readonly IHomeAssistantApiManager _apiManager;
14
    private readonly ResultMessageHandler _resultMessageHandler;
15
    private readonly CancellationTokenSource _internalCancelSource = new();
39✔
16

17
    private readonly Subject<HassMessage> _hassMessageSubject = new();
39✔
18
    private readonly ConcurrentDictionary<int, TaskCompletionSource<HassMessage>> _pendingResults = new();
39✔
19
    private readonly Task _handleNewMessagesTask;
20

21
    private const int WaitForResultTimeout = 20000;
22

23
    private readonly SemaphoreSlim _messageIdSemaphore = new(1, 1);
39✔
24
    private int _messageId = 1;
39✔
25
    private readonly AsyncLazy<IObservable<HassEvent>> _lazyAllEventsObservable;
26

27
    #endregion
28

29
    /// <summary>
    ///     Creates a connection on top of an already opened websocket pipeline and starts
    ///     the background loop that reads incoming messages.
    /// </summary>
    /// <param name="logger">A logger instance</param>
    /// <param name="pipeline">The pipeline to use for websocket communication</param>
    /// <param name="apiManager">The api manager</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="pipeline" /> is null.</exception>
    /// <exception cref="ApplicationException">Thrown when the websocket is not in the 'Open' state.</exception>
    public HomeAssistantConnection(
        ILogger<IHomeAssistantConnection> logger,
        IWebSocketClientTransportPipeline pipeline,
        IHomeAssistantApiManager apiManager
    )
    {
        _transportPipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline));
        _apiManager = apiManager;
        _logger = logger;

        _resultMessageHandler = new ResultMessageHandler(_logger, TimeProvider.System);

        // We lazily cache the same observable for all events. There is no reason to use multiple
        // subscriptions to all events. Callers that want that can provide a "*" type and get the same thing.
        _lazyAllEventsObservable = new AsyncLazy<IObservable<HassEvent>>(async Task<IObservable<HassEvent>>() =>
            await SubscribeToHomeAssistantEventsInternalAsync(null, _internalCancelSource.Token));

        if (_transportPipeline.WebSocketState != WebSocketState.Open)
            throw new ApplicationException(
                $"Expected WebSocket state 'Open' got '{_transportPipeline.WebSocketState}'");

        // StartNew with an async lambda returns Task<Task>: the outer task completes as soon as the
        // lambda hits its first await. Unwrap it so the stored task represents the whole receive loop
        // and DisposeAsync really waits for the loop to finish.
        _handleNewMessagesTask = Task.Factory
            .StartNew(async () => await HandleNewMessages().ConfigureAwait(false),
                TaskCreationOptions.LongRunning)
            .Unwrap();
    }
38✔
59

60
    /// <summary>
    ///     Returns an observable of Home Assistant events.
    /// </summary>
    /// <param name="eventType">The event type; null or empty means all events</param>
    /// <param name="cancelToken">Token used when a new subscription has to be created</param>
    public async Task<IObservable<HassEvent>> SubscribeToHomeAssistantEventsAsync(string? eventType,
        CancellationToken cancelToken)
    {
        var wantsAllEvents = string.IsNullOrEmpty(eventType);

        if (!wantsAllEvents)
        {
            return await SubscribeToHomeAssistantEventsInternalAsync(eventType, cancelToken);
        }

        // Subscribing to all events reuses the single lazily created observable
        // no matter how many times this method is called.
        return await _lazyAllEventsObservable.Value;
    }
31✔
69

70
    /// <summary>
    ///     Sends a subscribe command and returns an observable with the events that
    ///     carry the id of that subscribe command.
    /// </summary>
    /// <param name="eventType">The requested event type</param>
    /// <param name="cancelToken">Token that cancels sending the command and waiting for its result</param>
    private async Task<IObservable<HassEvent>> SubscribeToHomeAssistantEventsInternalAsync(string? eventType,
        CancellationToken cancelToken)
    {
        // The linked source registers callbacks on both parent tokens. Dispose it when the
        // command has completed so those registrations do not pile up on the long-lived internal source.
        using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
            cancelToken,
            _internalCancelSource.Token
        );

        // NOTE(review): eventType is not passed to the command here, so the subscription
        // looks unfiltered — confirm whether SubscribeEventCommand should carry it.
        var result = await SendCommandAndReturnHassMessageResponseAsync(new SubscribeEventCommand(),
            combinedTokenSource.Token).ConfigureAwait(false);

        // The id of the message we used to subscribe is the filter for the event messages.
        // Read it once instead of on every incoming message.
        var subscriptionId = result?.Id;

        var observableResult = _hassMessageSubject
            .Where(n => n.Type == "event" && n.Id == subscriptionId)
            .Select(n => n.Event!);

        return observableResult;
    }
14✔
87

88
    /// <summary>
    ///     Waits until either the provided token or the internal cancellation source of the connection is cancelled.
    /// </summary>
    /// <param name="cancelToken">Token that ends the wait</param>
    public async Task WaitForConnectionToCloseAsync(CancellationToken cancelToken)
    {
        // Dispose the linked source when the wait ends so its registrations on the
        // parent tokens are released.
        using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
            cancelToken,
            _internalCancelSource.Token
        );

        // Just wait for the token source (the internal and the provided one)
        await combinedTokenSource.Token.AsTask().ConfigureAwait(false);
    }
×
98

99
    /// <summary>
    ///     Sends a command and hands the pending result over to the result message handler
    ///     instead of waiting for it.
    /// </summary>
    /// <param name="command">The command to send</param>
    /// <param name="cancelToken">Token that cancels sending the command</param>
    public async Task SendCommandAsync<T>(T command, CancellationToken cancelToken) where T : CommandMessage
    {
        Task<HassMessage> pendingResult = await SendCommandAsyncInternal(command, cancelToken);

        _resultMessageHandler.HandleResult(pendingResult, command);
    }
18✔
104

105
    /// <summary>
    ///     Sends a command and deserializes the result element of the response.
    /// </summary>
    /// <param name="command">The command to send</param>
    /// <param name="cancelToken">Token that cancels the operation</param>
    /// <returns>The deserialized result, or default when the response has no result element</returns>
    public async Task<TResult?> SendCommandAndReturnResponseAsync<T, TResult>(T command, CancellationToken cancelToken)
        where T : CommandMessage
    {
        var rawResult = await SendCommandAndReturnResponseRawAsync(command, cancelToken).ConfigureAwait(false);

        if (rawResult is { } element)
        {
            return element.Deserialize<TResult>();
        }

        return default;
    }
117✔
112

113
    /// <summary>
    ///     Sends a command and returns the raw result element of the response.
    /// </summary>
    /// <param name="command">The command to send</param>
    /// <param name="cancelToken">Token that cancels the operation</param>
    public async Task<JsonElement?> SendCommandAndReturnResponseRawAsync<T>(T command, CancellationToken cancelToken)
        where T : CommandMessage
    {
        // SendCommandAndReturnHassMessageResponseAsync throws when the command is not
        // successful, so no error handling is needed here.
        HassMessage? response = await SendCommandAndReturnHassMessageResponseAsync(command, cancelToken)
            .ConfigureAwait(false);

        return response?.ResultElement;
    }
122✔
122

123
    /// <summary>
    ///     Sends a command and waits for the matching result message.
    /// </summary>
    /// <param name="command">The command to send</param>
    /// <param name="cancelToken">Token that cancels sending and waiting</param>
    /// <returns>The result message of a successful command</returns>
    /// <exception cref="InvalidOperationException">Thrown when the result is not marked as successful.</exception>
    /// <exception cref="TimeoutException">Thrown when no result arrives within the result timeout.</exception>
    /// <exception cref="OperationCanceledException">Thrown when the wait is cancelled.</exception>
    public async Task<HassMessage?> SendCommandAndReturnHassMessageResponseAsync<T>(T command,
        CancellationToken cancelToken)
        where T : CommandMessage
    {
        var resultMessageTask = await SendCommandAsyncInternal(command, cancelToken).ConfigureAwait(false);

        HassMessage result;
        try
        {
            result = await resultMessageTask
                .WaitAsync(TimeSpan.FromMilliseconds(WaitForResultTimeout), cancelToken)
                .ConfigureAwait(false);
        }
        catch (Exception e) when (e is TimeoutException or OperationCanceledException)
        {
            // Nobody is waiting for this result any longer. Drop the pending entry so it does not
            // stay in the dictionary until a late reply arrives or the connection shuts down.
            _pendingResults.TryRemove(command.Id, out _);
            throw;
        }

        if (result.Success ?? false)
            return result;

        // Non successful command should throw exception
        throw new InvalidOperationException(
            $"Failed command ({command.Type}) error: {result.Error}.  Sent command is {command.ToJsonElement()}");
    }
141✔
137

138
    /// <summary>
    ///     Closes the transport, cancels internal operations and pending results and
    ///     disposes the resources held by the connection. Calls after the first one return directly.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_isDisposed)
        {
            return;
        }

        _isDisposed = true;

        try
        {
            await CloseAsync().ConfigureAwait(false);
        }
        catch (Exception closeError)
        {
            // A failing close must not stop the rest of the cleanup
            _logger.LogDebug(closeError, "Failed to close HomeAssistantConnection");
        }

        var alreadyCancelled = _internalCancelSource.IsCancellationRequested;
        if (!alreadyCancelled)
        {
            await _internalCancelSource.CancelAsync();
        }

        CancelPendingResults();

        // Give the message handling task a chance to finish, but never wait longer than the timeout
        var shutdownTimeout = Task.Delay(5000);
        await Task.WhenAny(_handleNewMessagesTask, shutdownTimeout).ConfigureAwait(false);

        await _transportPipeline.DisposeAsync().ConfigureAwait(false);
        _internalCancelSource.Dispose();
        _hassMessageSubject.Dispose();
        await _resultMessageHandler.DisposeAsync();
        _messageIdSemaphore.Dispose();
    }
39✔
168

169
    /// <summary>
    ///     Forwards a GET api call to the api manager.
    /// </summary>
    /// <param name="apiPath">The api path to call</param>
    /// <param name="cancelToken">Token passed on to the api manager</param>
    /// <exception cref="ObjectDisposedException">Thrown when the connection is disposed.</exception>
    public Task<T?> GetApiCallAsync<T>(string apiPath, CancellationToken cancelToken)
    {
        ObjectDisposedException.ThrowIf(_isDisposed, this);

        var apiCall = _apiManager.GetApiCallAsync<T>(apiPath, cancelToken);
        return apiCall;
    }
174

175
    /// <summary>
    ///     Forwards a POST api call to the api manager.
    /// </summary>
    /// <param name="apiPath">The api path to call</param>
    /// <param name="cancelToken">Token passed on to the api manager</param>
    /// <param name="data">Optional data passed on to the api manager</param>
    /// <exception cref="ObjectDisposedException">Thrown when the connection is disposed.</exception>
    public Task<T?> PostApiCallAsync<T>(string apiPath, CancellationToken cancelToken, object? data = null)
    {
        ObjectDisposedException.ThrowIf(_isDisposed, this);

        var apiCall = _apiManager.PostApiCallAsync<T>(apiPath, cancelToken, data);
        return apiCall;
    }
180

181
    /// <summary>
    ///     Observable of all messages pushed to the internal message subject.
    /// </summary>
    public IObservable<HassMessage> OnHassMessage
    {
        get { return _hassMessageSubject; }
    }
2✔
182

183
    /// <summary>
    ///     Assigns the next message id to the command, registers a pending result for that id
    ///     and sends the command through the transport pipeline.
    /// </summary>
    /// <param name="command">The command to send</param>
    /// <param name="cancelToken">Token that cancels waiting for the semaphore and sending</param>
    /// <returns>A task that completes when the result for the command is set</returns>
    /// <exception cref="ObjectDisposedException">Thrown when the connection is disposed.</exception>
    /// <exception cref="InvalidOperationException">Thrown when a command with the same id is already pending.</exception>
    private async Task<Task<HassMessage>> SendCommandAsyncInternal<T>(T command, CancellationToken cancelToken) where T : CommandMessage
    {
        ObjectDisposedException.ThrowIf(_isDisposed, this);

        // Taking the semaphore can fail in rare cases, so it is done outside
        // the try/finally block to make sure it is not released when it was never taken
        await _messageIdSemaphore.WaitAsync(cancelToken).ConfigureAwait(false);
        try
        {
            // Messages to HA must be sent with increasing ids, therefore increasing
            // the message id and sending the message are synchronized together
            command.Id = ++_messageId;

            // Result waiters are completed directly from the receive loop instead of
            // creating one filtered Rx subscription per command.
            var completion = new TaskCompletionSource<HassMessage>(
                TaskCreationOptions.RunContinuationsAsynchronously);

            var registered = _pendingResults.TryAdd(command.Id, completion);
            if (!registered)
            {
                throw new InvalidOperationException($"A command with id {command.Id} is already pending");
            }

            try
            {
                await _transportPipeline.SendMessageAsync(command, cancelToken);
            }
            catch (Exception sendError)
            {
                // The command never reached the wire, so unregister the waiter and
                // complete it the same way the send failed before rethrowing
                if (_pendingResults.TryRemove(command.Id, out var abandoned))
                {
                    if (sendError is OperationCanceledException)
                    {
                        abandoned.TrySetCanceled(cancelToken);
                    }
                    else
                    {
                        abandoned.TrySetException(sendError);
                    }
                }

                throw;
            }

            return completion.Task;
        }
        finally
        {
            _messageIdSemaphore.Release();
        }
    }
161✔
227

228
    /// <summary>
    ///     Reads messages from the transport pipeline and dispatches them until the
    ///     internal cancellation source is cancelled or reading fails.
    /// </summary>
    private async Task HandleNewMessages()
    {
        try
        {
            while (true)
            {
                if (_internalCancelSource.IsCancellationRequested)
                {
                    break;
                }

                var batch = await _transportPipeline
                    .GetNextMessagesAsync<HassMessage>(_internalCancelSource.Token)
                    .ConfigureAwait(false);

                try
                {
                    foreach (var incoming in batch)
                    {
                        HandleIncomingMessage(incoming);
                    }
                }
                catch (Exception processingError)
                {
                    // A failure while processing a batch is logged and the loop keeps reading
                    _logger.LogError(processingError, "Failed processing new message from Home Assistant");
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal way to leave the loop
        }
        finally
        {
            _logger.LogTrace("Stop processing new messages");
            CancelPendingResults();

            // Always cancel the internal source so blocking operations are released
            var alreadyCancelled = _internalCancelSource.IsCancellationRequested;
            if (!alreadyCancelled)
            {
                await _internalCancelSource.CancelAsync();
            }
        }
    }
9✔
262

263
    /// <summary>
    ///     Completes the pending result that matches a "result" message, if any, and
    ///     then publishes the message to the message subject.
    /// </summary>
    /// <param name="message">The incoming message</param>
    private void HandleIncomingMessage(HassMessage message)
    {
        if (message.Type == "result")
        {
            if (_pendingResults.TryRemove(message.Id, out var waiter))
            {
                waiter.TrySetResult(message);
            }
        }

        // Every message is published, whether or not it completed a pending result
        _hassMessageSubject.OnNext(message);
    }
297✔
270

271
    /// <summary>
    ///     Removes every pending result and marks it as cancelled.
    /// </summary>
    private void CancelPendingResults()
    {
        foreach (var entry in _pendingResults)
        {
            // Only the caller that actually removes the entry cancels it
            if (!_pendingResults.TryRemove(entry.Key, out var waiter))
            {
                continue;
            }

            waiter.TrySetCanceled();
        }
    }
68✔
279

280
    /// <summary>
    ///     Closes the transport pipeline.
    /// </summary>
    private Task CloseAsync() => _transportPipeline.CloseAsync();
284
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc