• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

net-daemon / netdaemon / 6727508659

29 Oct 2023 10:48AM UTC coverage: 79.505%. Remained the same
6727508659

push

github

web-flow
On release created now activates a pre-release push to docker and nuget (#980)

* On release created now activates a pre-release push to docker and nuget

* removed bad comment

803 of 1143 branches covered (0.0%)

Branch coverage included in aggregate %.

2894 of 3507 relevant lines covered (82.52%)

50.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.77
/src/Client/NetDaemon.HassClient/Internal/HomeAssistantConnection.cs
1
namespace NetDaemon.Client.Internal;
2

3
internal class HomeAssistantConnection : IHomeAssistantConnection, IHomeAssistantHassMessages
4
{
5
    #region -- private declarations -
6

7
    private readonly ILogger<IHomeAssistantConnection> _logger;
8
    private readonly IWebSocketClientTransportPipeline _transportPipeline;
9
    private readonly IHomeAssistantApiManager _apiManager;
10
    private readonly ResultMessageHandler _resultMessageHandler;
11
    private readonly CancellationTokenSource _internalCancelSource = new();
20✔
12

13
    private readonly Subject<HassMessage> _hassMessageSubject = new();
20✔
14
    private readonly Task _handleNewMessagesTask;
15

16
    private const int WaitForResultTimeout = 20000;
17

18
    private readonly SemaphoreSlim _messageIdSemaphore = new(1, 1);
20✔
19
    private int _messageId = 1;
20✔
20
    private readonly AsyncLazy<IObservable<HassEvent>> _lazyAllEventsObservable;
21

22
    #endregion
23

24
    /// <summary>
25
    ///     Default constructor
26
    /// </summary>
27
    /// <param name="logger">A logger instance</param>
28
    /// <param name="pipeline">The pipeline to use for websocket communication</param>
29
    /// <param name="apiManager">The api manager</param>
30
    public HomeAssistantConnection(
20✔
31
        ILogger<IHomeAssistantConnection> logger,
20✔
32
        IWebSocketClientTransportPipeline pipeline,
20✔
33
        IHomeAssistantApiManager apiManager
20✔
34
    )
20✔
35
    {
36
        _transportPipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline));
20!
37
        _apiManager = apiManager;
20✔
38
        _logger = logger;
20✔
39

40
        _resultMessageHandler = new ResultMessageHandler(_logger);
20✔
41

42
        // We lazily cache same observable for all events. There are no reason we should use multiple subscriptions
43
        // to all events. If people wants that they can provide a "*" type and get the same thing
44
        _lazyAllEventsObservable = new AsyncLazy<IObservable<HassEvent>>(async Task<IObservable<HassEvent>>() =>
20✔
45
            await SubscribeToHomeAssistantEventsInternalAsync(null, _internalCancelSource.Token));
23✔
46

47
        if (_transportPipeline.WebSocketState != WebSocketState.Open)
20✔
48
            throw new ApplicationException(
1✔
49
                $"Expected WebSocket state 'Open' got '{_transportPipeline.WebSocketState}'");
1✔
50

51
        _handleNewMessagesTask = Task.Factory.StartNew(async () => await HandleNewMessages().ConfigureAwait(false),
38✔
52
            TaskCreationOptions.LongRunning);
19✔
53
    }
19✔
54

55
    public async Task<IObservable<HassEvent>> SubscribeToHomeAssistantEventsAsync(string? eventType,
56
        CancellationToken cancelToken)
57
    {
58
        // When subscribe all events, optimize using the same IObservable<HassEvent> if we subscribe multiple times
59
        if (string.IsNullOrEmpty(eventType))
7✔
60
            return await _lazyAllEventsObservable.Value;
4✔
61

62
        return await SubscribeToHomeAssistantEventsInternalAsync(eventType, cancelToken);
3✔
63
    }
7✔
64

65
    private async Task<IObservable<HassEvent>> SubscribeToHomeAssistantEventsInternalAsync(string? eventType,
66
        CancellationToken cancelToken)
67
    {
68
        var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
6✔
69
            cancelToken,
6✔
70
            _internalCancelSource.Token
6✔
71
        );
6✔
72

73
        var result = await SendCommandAndReturnHassMessageResponseAsync(new SubscribeEventCommand(),
6✔
74
            combinedTokenSource.Token).ConfigureAwait(false);
6✔
75

76
        // The id if the message we used to subscribe should be used as the filter for the event messages
77
        var observableResult = _hassMessageSubject.Where(n => n.Type == "event" && n.Id == result?.Id)
13!
78
            .Select(n => n.Event!);
12✔
79

80
        return observableResult;
6✔
81
    }
6✔
82

83
    public async Task WaitForConnectionToCloseAsync(CancellationToken cancelToken)
84
    {
85
        var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
4✔
86
            cancelToken,
4✔
87
            _internalCancelSource.Token
4✔
88
        );
4✔
89

90
        // Just wait for token source (internal och provided one)
91
        await combinedTokenSource.Token.AsTask().ConfigureAwait(false);
4✔
92
    }
×
93

94
    public async Task SendCommandAsync<T>(T command, CancellationToken cancelToken) where T : CommandMessage
95
    {
96
        var returnMessageTask = await SendCommandAsyncInternal(command, cancelToken);
2✔
97
        _resultMessageHandler.HandleResult(returnMessageTask, command);
2✔
98
    }
2✔
99

100
    public async Task<TResult?> SendCommandAndReturnResponseAsync<T, TResult>(T command, CancellationToken cancelToken)
101
        where T : CommandMessage
102
    {
103
        var result = await SendCommandAndReturnResponseRawAsync(command, cancelToken).ConfigureAwait(false);
28✔
104

105
        return result is not null ? result.Value.Deserialize<TResult>() : default;
27✔
106
    }
27✔
107

108
    public async Task<JsonElement?> SendCommandAndReturnResponseRawAsync<T>(T command, CancellationToken cancelToken)
109
        where T : CommandMessage
110
    {
111
        var hassMessage =
29✔
112
            await SendCommandAndReturnHassMessageResponseAsync(command, cancelToken).ConfigureAwait(false);
29✔
113

114
        // The SendCommmandsAndReturnHAssMessageResponse will throw if not successful so just ignore errors here
115
        return hassMessage?.ResultElement;
28!
116
    }
28✔
117

118
    public async Task<HassMessage?> SendCommandAndReturnHassMessageResponseAsync<T>(T command,
119
        CancellationToken cancelToken)
120
        where T : CommandMessage
121
    {
122
        var resultMessageTask = await SendCommandAsyncInternal(command, cancelToken);
35✔
123
        var result = await resultMessageTask.WaitAsync(TimeSpan.FromMilliseconds(WaitForResultTimeout), cancelToken);
35✔
124

125
        if (result.Success ?? false)
35✔
126
            return result;
34✔
127

128
        // Non successful command should throw exception
129
        throw new InvalidOperationException(
1✔
130
            $"Failed command ({command.Type}) error: {result.Error}.  Sent command is {command.ToJsonElement()}");
1✔
131
    }
34✔
132

133
    public async ValueTask DisposeAsync()
134
    {
135
        try
136
        {
137
            await CloseAsync().ConfigureAwait(false);
19✔
138
        }
19✔
139
        catch (Exception e)
×
140
        {
141
            _logger.LogDebug(e, "Failed to close HomeAssistantConnection");
×
142
        }
×
143

144
        if (!_internalCancelSource.IsCancellationRequested)
19✔
145
            await _internalCancelSource.CancelAsync();
13✔
146

147
        // Gracefully wait for task or timeout
148
        await Task.WhenAny(
19✔
149
            _handleNewMessagesTask,
19✔
150
            Task.Delay(5000)
19✔
151
        ).ConfigureAwait(false);
19✔
152

153
        await _transportPipeline.DisposeAsync().ConfigureAwait(false);
19✔
154
        _internalCancelSource.Dispose();
19✔
155
        _hassMessageSubject.Dispose();
19✔
156
        await _resultMessageHandler.DisposeAsync();
19✔
157
        _messageIdSemaphore.Dispose();
19✔
158
    }
19✔
159

160
    public Task<T?> GetApiCallAsync<T>(string apiPath, CancellationToken cancelToken)
161
    {
162
        return _apiManager.GetApiCallAsync<T>(apiPath, cancelToken);
1✔
163
    }
164

165
    public Task<T?> PostApiCallAsync<T>(string apiPath, CancellationToken cancelToken, object? data = null)
166
    {
167
        return _apiManager.PostApiCallAsync<T>(apiPath, cancelToken, data);
1✔
168
    }
169

170
    public IObservable<HassMessage> OnHassMessage => _hassMessageSubject;
1✔
171

172
    private async Task<Task<HassMessage>> SendCommandAsyncInternal<T>(T command, CancellationToken cancelToken) where T : CommandMessage
173
    {
174
        // The semaphore can fail to be taken in rare cases so we need
175
        // to keep this out of the try/finally block so it will not be released
176
        await _messageIdSemaphore.WaitAsync(cancelToken).ConfigureAwait(false);
37✔
177
        try
178
        {
179
            // We need to make sure messages to HA are send with increasing Ids therefore we need to synchronize
180
            // increasing the messageId and Sending the message
181
            command.Id = ++_messageId;
37✔
182

183
            // We make a task that subscribe for the return result message
184
            // this task will be returned and handled by caller
185
            var resultEvent = _hassMessageSubject
37✔
186
                .Where(n => n.Type == "result" && n.Id == command.Id)
38✔
187
                .FirstAsync().ToTask(CancellationToken.None);
37✔
188
            // We dont want to pass the incoming CancellationToken here because it will throw a TaskCanceledException
189
            // when calling services from an Apps Dispose(Async) and hide possible actual exceptions
190

191
            await _transportPipeline.SendMessageAsync(command, cancelToken);
37✔
192

193
            return resultEvent;
37✔
194
        }
195
        finally
196
        {
197
            _messageIdSemaphore.Release();
37✔
198
        }
199
    }
37✔
200

201
    private async Task HandleNewMessages()
202
    {
203
        try
204
        {
205
            while (!_internalCancelSource.IsCancellationRequested)
61✔
206
            {
207
                var msg = await _transportPipeline.GetNextMessagesAsync<HassMessage>(_internalCancelSource.Token)
55✔
208
                    .ConfigureAwait(false);
55✔
209
                try
210
                {
211
                    foreach (var obj in msg)
168✔
212
                    {
213
                        _hassMessageSubject.OnNext(obj);
42✔
214
                    }
215
                }
42✔
216
                catch (Exception e)
×
217
                {
218
                    _logger.LogError(e, "Failed processing new message from Home Assistant");
×
219
                }
×
220
            }
221
        }
6✔
222
        catch (OperationCanceledException)
×
223
        {
224
            // Normal case just exit
225
        }
×
226
        finally
227
        {
228
            _logger.LogTrace("Stop processing new messages");
19✔
229
            // make sure we always cancel any blocking operations
230
            if (!_internalCancelSource.IsCancellationRequested)
19✔
231
                await _internalCancelSource.CancelAsync();
6✔
232
        }
233
    }
6✔
234

235
    private Task CloseAsync()
236
    {
237
        return _transportPipeline.CloseAsync();
19✔
238
    }
239
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc