• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

net-daemon / netdaemon / 5773335806

pending completion
5773335806

push

github

web-flow
Add media player to default meta (#903)

* Added media_player to default metadata and deprecated attribute base classes

* Spelling error

* Suppress obsolete warnings in our own code :)

* Fix tests to ignore obsolete warnings.

791 of 1112 branches covered (71.13%)

Branch coverage included in aggregate %.

3 of 3 new or added lines in 1 file covered. (100.0%)

3710 of 4342 relevant lines covered (85.44%)

48.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.63
/src/Client/NetDaemon.HassClient/Internal/HomeAssistantConnection.cs
1
namespace NetDaemon.Client.Internal;
2

3
/// <summary>
///     Connection to Home Assistant that sends commands through the websocket transport
///     pipeline and publishes every received <see cref="HassMessage" /> to subscribers.
/// </summary>
internal class HomeAssistantConnection : IHomeAssistantConnection, IHomeAssistantHassMessages
{
    #region -- private declarations -

    private readonly ILogger<IHomeAssistantConnection> _logger;
    private readonly IWebSocketClientTransportPipeline _transportPipeline;
    private readonly IHomeAssistantApiManager _apiManager;
    private readonly ResultMessageHandler _resultMessageHandler;

    // Cancelled when the connection is disposed or when the message loop stops
    private readonly CancellationTokenSource _internalCancelSource = new();

    // Every message read from the transport pipeline is published on this subject
    private readonly Subject<HassMessage> _hassMessageSubject = new();
    private readonly Task _handleNewMessagesTask;

    // Milliseconds to wait for the result message of a sent command
    private const int WaitForResultTimeout = 20000;

    // Milliseconds to wait for the message loop to stop when disposing
    private const int WaitForMessageLoopToStopTimeout = 5000;

    // Serializes "increase message id + send" so ids reach Home Assistant in increasing order
    private readonly SemaphoreSlim _messageIdSemaphore = new(1, 1);
    private int _messageId = 1;
    private readonly AsyncLazy<IObservable<HassEvent>> _lazyAllEventsObservable;

    #endregion

    /// <summary>
    ///     Default constructor
    /// </summary>
    /// <param name="logger">A logger instance</param>
    /// <param name="pipeline">The pipeline to use for websocket communication</param>
    /// <param name="apiManager">The api manager</param>
    /// <exception cref="ArgumentNullException">Thrown when <paramref name="pipeline" /> is null</exception>
    /// <exception cref="ApplicationException">Thrown when the websocket of the pipeline is not open</exception>
    public HomeAssistantConnection(
        ILogger<IHomeAssistantConnection> logger,
        IWebSocketClientTransportPipeline pipeline,
        IHomeAssistantApiManager apiManager
    )
    {
        _transportPipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline));
        _apiManager = apiManager;
        _logger = logger;

        _resultMessageHandler = new ResultMessageHandler(_logger);

        // We lazily cache the same observable for all events. There is no reason to use multiple
        // subscriptions to all events. If people want that they can provide a "*" type and get the same thing
        _lazyAllEventsObservable = new AsyncLazy<IObservable<HassEvent>>(async Task<IObservable<HassEvent>>() =>
            await SubscribeToHomeAssistantEventsInternalAsync(null, _internalCancelSource.Token));

        if (_transportPipeline.WebSocketState != WebSocketState.Open)
            throw new ApplicationException(
                $"Expected WebSocket state 'Open' got '{_transportPipeline.WebSocketState}'");

        // StartNew with an async delegate returns a Task<Task> whose outer task completes at the
        // first incomplete await. Unwrap it so the stored task represents the whole message loop,
        // otherwise DisposeAsync would not actually wait for the loop to stop.
        _handleNewMessagesTask = Task.Factory.StartNew(async () => await HandleNewMessages().ConfigureAwait(false),
            TaskCreationOptions.LongRunning).Unwrap();
    }

    /// <summary>
    ///     Subscribes to events from Home Assistant
    /// </summary>
    /// <param name="eventType">The event type to subscribe to, null or empty for all events</param>
    /// <param name="cancelToken">Token to cancel the subscribe command</param>
    /// <returns>An observable of the events belonging to the subscription</returns>
    public async Task<IObservable<HassEvent>> SubscribeToHomeAssistantEventsAsync(string? eventType,
        CancellationToken cancelToken)
    {
        // When subscribing to all events, optimize by using the same IObservable<HassEvent> if we subscribe multiple times
        if (string.IsNullOrEmpty(eventType))
            return await _lazyAllEventsObservable.Value;

        return await SubscribeToHomeAssistantEventsInternalAsync(eventType, cancelToken);
    }

    /// <summary>
    ///     Sends the subscribe command and returns an observable filtered on the id of that command
    /// </summary>
    /// <param name="eventType">The requested event type</param>
    /// <param name="cancelToken">Token to cancel the subscribe command</param>
    private async Task<IObservable<HassEvent>> SubscribeToHomeAssistantEventsInternalAsync(string? eventType,
        CancellationToken cancelToken)
    {
        // The linked source is only needed while the command is in flight, dispose it afterwards
        // so its registrations on the long-lived internal token do not accumulate
        using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
            cancelToken,
            _internalCancelSource.Token
        );

        // NOTE(review): eventType is not passed on to the SubscribeEventCommand, so the subscription
        // looks like it always covers all events - confirm against SubscribeEventCommand
        var result = await SendCommandAndReturnHassMessageResponseAsync(new SubscribeEventCommand(),
            combinedTokenSource.Token).ConfigureAwait(false);

        // The id of the message we used to subscribe should be used as the filter for the event messages
        var observableResult = _hassMessageSubject.Where(n => n.Type == "event" && n.Id == result?.Id)
            .Select(n => n.Event!);

        return observableResult;
    }

    /// <summary>
    ///     Waits until the provided token or the internal token of the connection is cancelled
    /// </summary>
    /// <param name="cancelToken">Token that ends the wait</param>
    public async Task WaitForConnectionToCloseAsync(CancellationToken cancelToken)
    {
        using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
            cancelToken,
            _internalCancelSource.Token
        );

        // Just wait for the token source (internal and provided one)
        await combinedTokenSource.Token.AsTask().ConfigureAwait(false);
    }

    /// <summary>
    ///     Sends a command and hands the pending result over to the result message handler
    /// </summary>
    /// <param name="command">The command to send</param>
    /// <param name="cancelToken">Token to cancel sending the command</param>
    public async Task SendCommandAsync<T>(T command, CancellationToken cancelToken) where T : CommandMessage
    {
        var returnMessageTask = await SendCommandAsyncInternal(command, cancelToken).ConfigureAwait(false);
        _resultMessageHandler.HandleResult(returnMessageTask, command);
    }

    /// <summary>
    ///     Sends a command and returns the result deserialized as <typeparamref name="TResult" />
    /// </summary>
    /// <param name="command">The command to send</param>
    /// <param name="cancelToken">Token to cancel the operation</param>
    /// <returns>The deserialized result or default when the response has no result element</returns>
    public async Task<TResult?> SendCommandAndReturnResponseAsync<T, TResult>(T command, CancellationToken cancelToken)
        where T : CommandMessage
    {
        var result = await SendCommandAndReturnResponseRawAsync(command, cancelToken).ConfigureAwait(false);

        return result is not null ? result.Value.Deserialize<TResult>() : default;
    }

    /// <summary>
    ///     Sends a command and returns the raw result element of the response
    /// </summary>
    /// <param name="command">The command to send</param>
    /// <param name="cancelToken">Token to cancel the operation</param>
    public async Task<JsonElement?> SendCommandAndReturnResponseRawAsync<T>(T command, CancellationToken cancelToken)
        where T : CommandMessage
    {
        var hassMessage =
            await SendCommandAndReturnHassMessageResponseAsync(command, cancelToken).ConfigureAwait(false);

        // SendCommandAndReturnHassMessageResponseAsync throws if not successful so just ignore errors here
        return hassMessage?.ResultElement;
    }

    /// <summary>
    ///     Sends a command and returns the result message of that command
    /// </summary>
    /// <param name="command">The command to send</param>
    /// <param name="cancelToken">Token to cancel the operation</param>
    /// <returns>The successful result message</returns>
    /// <exception cref="InvalidOperationException">
    ///     Thrown when no result arrived before the timeout (or the token was cancelled while waiting)
    ///     and when the result message is not successful
    /// </exception>
    public async Task<HassMessage?> SendCommandAndReturnHassMessageResponseAsync<T>(T command,
        CancellationToken cancelToken)
        where T : CommandMessage
    {
        var resultMessageTask = await SendCommandAsyncInternal(command, cancelToken).ConfigureAwait(false);

        // Own cancel source for the timeout so the timer can be stopped as soon as the result arrives
        using var timeoutCancelSource = CancellationTokenSource.CreateLinkedTokenSource(cancelToken);
        var timeoutTask = Task.Delay(WaitForResultTimeout, timeoutCancelSource.Token);

        var awaitedTask = await Task.WhenAny(resultMessageTask, timeoutTask).ConfigureAwait(false);

        if (awaitedTask != resultMessageTask)
            // We have a timeout
            throw new InvalidOperationException(
                $"Send command ({command.Type}) did not get response in timely fashion. Sent command is {command.ToJsonElement()}");

        // The result arrived, the timeout timer is no longer needed
        timeoutCancelSource.Cancel();

        // The task was returned by WhenAny so it is already completed and reading Result does not block
        var resultMessage = resultMessageTask.Result;

        if (resultMessage.Success ?? false)
            return resultMessage;

        // Non successful command should throw exception
        throw new InvalidOperationException(
            $"Failed command ({command.Type}) error: {resultMessage.Error}.  Sent command is {command.ToJsonElement()}");
    }

    /// <summary>
    ///     Closes the connection, stops the message loop and disposes the transport pipeline
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        try
        {
            await CloseAsync().ConfigureAwait(false);
        }
        catch (Exception e)
        {
            // Best effort, a failed close must not prevent the rest of the cleanup
            _logger.LogDebug(e, "Failed to close HomeAssistantConnection");
        }

        if (!_internalCancelSource.IsCancellationRequested)
            _internalCancelSource.Cancel();

        // Gracefully wait for task or timeout
        await Task.WhenAny(
            _handleNewMessagesTask,
            Task.Delay(WaitForMessageLoopToStopTimeout)
        ).ConfigureAwait(false);

        await _transportPipeline.DisposeAsync().ConfigureAwait(false);
        _internalCancelSource.Dispose();
    }

    /// <summary>
    ///     Forwards a GET call to the api manager
    /// </summary>
    /// <param name="apiPath">The api path</param>
    /// <param name="cancelToken">Token to cancel the call</param>
    public Task<T?> GetApiCallAsync<T>(string apiPath, CancellationToken cancelToken)
    {
        return _apiManager.GetApiCallAsync<T>(apiPath, cancelToken);
    }

    /// <summary>
    ///     Forwards a POST call to the api manager
    /// </summary>
    /// <param name="apiPath">The api path</param>
    /// <param name="cancelToken">Token to cancel the call</param>
    /// <param name="data">Optional data passed on to the api manager</param>
    public Task<T?> PostApiCallAsync<T>(string apiPath, CancellationToken cancelToken, object? data = null)
    {
        return _apiManager.PostApiCallAsync<T>(apiPath, cancelToken, data);
    }

    /// <summary>
    ///     All messages received from the transport pipeline
    /// </summary>
    public IObservable<HassMessage> OnHassMessage => _hassMessageSubject;

    /// <summary>
    ///     Assigns the next message id to the command, sends it and returns the task that completes
    ///     with the result message carrying the same id
    /// </summary>
    /// <param name="command">The command to send</param>
    /// <param name="cancelToken">Token to cancel waiting for the semaphore and sending the message</param>
    private async Task<Task<HassMessage>> SendCommandAsyncInternal<T>(T command, CancellationToken cancelToken) where T : CommandMessage
    {
        // The semaphore can fail to be taken in rare cases so we need
        // to keep this out of the try/finally block so it will not be released
        await _messageIdSemaphore.WaitAsync(cancelToken).ConfigureAwait(false);
        try
        {
            // We need to make sure messages to HA are sent with increasing ids, therefore we need to
            // synchronize increasing the messageId and sending the message
            command.Id = ++_messageId;

            // We make a task that subscribes to the return result message before sending,
            // this task will be returned and handled by the caller.
            // We don't want to pass the incoming CancellationToken here because it will throw a
            // TaskCanceledException when calling services from an app's Dispose(Async) and hide
            // possible actual exceptions
            var resultEvent = _hassMessageSubject
                .Where(n => n.Type == "result" && n.Id == command.Id)
                .FirstAsync().ToTask(CancellationToken.None);

            await _transportPipeline.SendMessageAsync(command, cancelToken).ConfigureAwait(false);

            return resultEvent;
        }
        finally
        {
            _messageIdSemaphore.Release();
        }
    }

    /// <summary>
    ///     Reads messages from the transport pipeline and publishes them to the subscribers until
    ///     the internal token is cancelled or reading fails
    /// </summary>
    private async Task HandleNewMessages()
    {
        try
        {
            while (!_internalCancelSource.IsCancellationRequested)
            {
                var msg = await _transportPipeline.GetNextMessagesAsync<HassMessage>(_internalCancelSource.Token)
                    .ConfigureAwait(false);

                foreach (var obj in msg)
                {
                    // Handle failures per message so one failing message does not
                    // drop the remaining messages of the same batch
                    try
                    {
                        _hassMessageSubject.OnNext(obj);
                    }
                    catch (Exception e)
                    {
                        _logger.LogError(e, "Failed processing new message from Home Assistant");
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Normal case just exit
        }
        finally
        {
            _logger.LogTrace("Stop processing new messages");
            // make sure we always cancel any blocking operations
            if (!_internalCancelSource.IsCancellationRequested)
                _internalCancelSource.Cancel();
        }
    }

    /// <summary>
    ///     Closes the transport pipeline
    /// </summary>
    private Task CloseAsync()
    {
        return _transportPipeline.CloseAsync();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc