• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

net-daemon / netdaemon / 6674202581

27 Oct 2023 02:42PM UTC coverage: 80.663% (-0.04%) from 80.706%
6674202581

push

github

web-flow
Set CurrentConnection to null before submitting OnDisconnected event (#962)

* Set CurrentConnection to null before submitting OnDisconnected event

* Mark tests a Flaky

804 of 1145 branches covered (0.0%)

Branch coverage included in aggregate %.

20 of 20 new or added lines in 1 file covered. (100.0%)

2946 of 3504 relevant lines covered (84.08%)

50.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.64
/src/Client/NetDaemon.HassClient/Internal/HomeAssistantConnection.cs
1
namespace NetDaemon.Client.Internal;
2

3
internal class HomeAssistantConnection : IHomeAssistantConnection, IHomeAssistantHassMessages
4
{
5
    #region -- private declarations -
6

7
    private readonly ILogger<IHomeAssistantConnection> _logger;
8
    private readonly IWebSocketClientTransportPipeline _transportPipeline;
9
    private readonly IHomeAssistantApiManager _apiManager;
10
    private readonly ResultMessageHandler _resultMessageHandler;
11
    private readonly CancellationTokenSource _internalCancelSource = new();
20✔
12

13
    private readonly Subject<HassMessage> _hassMessageSubject = new();
20✔
14
    private readonly Task _handleNewMessagesTask;
15

16
    private const int WaitForResultTimeout = 20000;
17

18
    private readonly SemaphoreSlim _messageIdSemaphore = new(1, 1);
20✔
19
    private int _messageId = 1;
20✔
20
    private readonly AsyncLazy<IObservable<HassEvent>> _lazyAllEventsObservable;
21

22
    #endregion
23

24
    /// <summary>
25
    ///     Default constructor
26
    /// </summary>
27
    /// <param name="logger">A logger instance</param>
28
    /// <param name="pipeline">The pipeline to use for websocket communication</param>
29
    /// <param name="apiManager">The api manager</param>
30
    public HomeAssistantConnection(
20✔
31
        ILogger<IHomeAssistantConnection> logger,
20✔
32
        IWebSocketClientTransportPipeline pipeline,
20✔
33
        IHomeAssistantApiManager apiManager
20✔
34
    )
20✔
35
    {
36
        _transportPipeline = pipeline ?? throw new ArgumentNullException(nameof(pipeline));
20!
37
        _apiManager = apiManager;
20✔
38
        _logger = logger;
20✔
39

40
        _resultMessageHandler = new ResultMessageHandler(_logger);
20✔
41

42
        // We lazily cache same observable for all events. There are no reason we should use multiple subscriptions
43
        // to all events. If people wants that they can provide a "*" type and get the same thing
44
        _lazyAllEventsObservable = new AsyncLazy<IObservable<HassEvent>>(async Task<IObservable<HassEvent>>() =>
20✔
45
            await SubscribeToHomeAssistantEventsInternalAsync(null, _internalCancelSource.Token));
23✔
46

47
        if (_transportPipeline.WebSocketState != WebSocketState.Open)
20✔
48
            throw new ApplicationException(
1✔
49
                $"Expected WebSocket state 'Open' got '{_transportPipeline.WebSocketState}'");
1✔
50

51
        _handleNewMessagesTask = Task.Factory.StartNew(async () => await HandleNewMessages().ConfigureAwait(false),
38✔
52
            TaskCreationOptions.LongRunning);
19✔
53
    }
19✔
54

55
    public async Task<IObservable<HassEvent>> SubscribeToHomeAssistantEventsAsync(string? eventType,
56
        CancellationToken cancelToken)
57
    {
58
        // When subscribe all events, optimize using the same IObservable<HassEvent> if we subscribe multiple times
59
        if (string.IsNullOrEmpty(eventType))
7✔
60
            return await _lazyAllEventsObservable.Value;
4✔
61

62
        return await SubscribeToHomeAssistantEventsInternalAsync(eventType, cancelToken);
3✔
63
    }
7✔
64

65
    private async Task<IObservable<HassEvent>> SubscribeToHomeAssistantEventsInternalAsync(string? eventType,
66
        CancellationToken cancelToken)
67
    {
68
        var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
6✔
69
            cancelToken,
6✔
70
            _internalCancelSource.Token
6✔
71
        );
6✔
72

73
        var result = await SendCommandAndReturnHassMessageResponseAsync(new SubscribeEventCommand(),
6✔
74
            combinedTokenSource.Token).ConfigureAwait(false);
6✔
75

76
        // The id if the message we used to subscribe should be used as the filter for the event messages
77
        var observableResult = _hassMessageSubject.Where(n => n.Type == "event" && n.Id == result?.Id)
13!
78
            .Select(n => n.Event!);
12✔
79

80
        return observableResult;
6✔
81
    }
6✔
82

83
    public async Task WaitForConnectionToCloseAsync(CancellationToken cancelToken)
84
    {
85
        var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
4✔
86
            cancelToken,
4✔
87
            _internalCancelSource.Token
4✔
88
        );
4✔
89

90
        // Just wait for token source (internal och provided one)
91
        await combinedTokenSource.Token.AsTask().ConfigureAwait(false);
4✔
92
    }
×
93

94
    public async Task SendCommandAsync<T>(T command, CancellationToken cancelToken) where T : CommandMessage
95
    {
96
        var returnMessageTask = await SendCommandAsyncInternal(command, cancelToken);
2✔
97
        _resultMessageHandler.HandleResult(returnMessageTask, command);
2✔
98
    }
2✔
99

100
    public async Task<TResult?> SendCommandAndReturnResponseAsync<T, TResult>(T command, CancellationToken cancelToken)
101
        where T : CommandMessage
102
    {
103
        var result = await SendCommandAndReturnResponseRawAsync(command, cancelToken).ConfigureAwait(false);
28✔
104

105
        return result is not null ? result.Value.Deserialize<TResult>() : default;
27✔
106
    }
27✔
107

108
    public async Task<JsonElement?> SendCommandAndReturnResponseRawAsync<T>(T command, CancellationToken cancelToken)
109
        where T : CommandMessage
110
    {
111
        var hassMessage =
29✔
112
            await SendCommandAndReturnHassMessageResponseAsync(command, cancelToken).ConfigureAwait(false);
29✔
113

114
        // The SendCommmandsAndReturnHAssMessageResponse will throw if not successful so just ignore errors here
115
        return hassMessage?.ResultElement;
28!
116
    }
28✔
117

118
    public async Task<HassMessage?> SendCommandAndReturnHassMessageResponseAsync<T>(T command,
119
        CancellationToken cancelToken)
120
        where T : CommandMessage
121
    {
122
        var resultMessageTask = await SendCommandAsyncInternal(command, cancelToken);
35✔
123

124
        var awaitedTask = await Task.WhenAny(resultMessageTask, Task.Delay(WaitForResultTimeout, cancelToken));
35✔
125

126
        if (awaitedTask != resultMessageTask)
35!
127
            // We have a timeout
128
            throw new InvalidOperationException(
×
129
                $"Send command ({command.Type}) did not get response in timely fashion. Sent command is {command.ToJsonElement()}");
×
130

131
        if (resultMessageTask.Result.Success ?? false)
35✔
132
            return resultMessageTask.Result;
34✔
133

134
        // Non successful command should throw exception
135
        throw new InvalidOperationException(
1✔
136
            $"Failed command ({command.Type}) error: {resultMessageTask.Result.Error}.  Sent command is {command.ToJsonElement()}");
1✔
137
    }
34✔
138

139
    public async ValueTask DisposeAsync()
140
    {
141
        try
142
        {
143
            await CloseAsync().ConfigureAwait(false);
19✔
144
        }
19✔
145
        catch (Exception e)
×
146
        {
147
            _logger.LogDebug(e, "Failed to close HomeAssistantConnection");
×
148
        }
×
149

150
        if (!_internalCancelSource.IsCancellationRequested)
19✔
151
            _internalCancelSource.Cancel();
2✔
152

153
        // Gracefully wait for task or timeout
154
        await Task.WhenAny(
19✔
155
            _handleNewMessagesTask,
19✔
156
            Task.Delay(5000)
19✔
157
        ).ConfigureAwait(false);
19✔
158

159
        await _transportPipeline.DisposeAsync().ConfigureAwait(false);
19✔
160
        _internalCancelSource.Dispose();
19✔
161
    }
19✔
162

163
    public Task<T?> GetApiCallAsync<T>(string apiPath, CancellationToken cancelToken)
164
    {
165
        return _apiManager.GetApiCallAsync<T>(apiPath, cancelToken);
1✔
166
    }
167

168
    public Task<T?> PostApiCallAsync<T>(string apiPath, CancellationToken cancelToken, object? data = null)
169
    {
170
        return _apiManager.PostApiCallAsync<T>(apiPath, cancelToken, data);
1✔
171
    }
172

173
    public IObservable<HassMessage> OnHassMessage => _hassMessageSubject;
1✔
174

175
    private async Task<Task<HassMessage>> SendCommandAsyncInternal<T>(T command, CancellationToken cancelToken) where T : CommandMessage
176
    {
177
        // The semaphore can fail to be taken in rare cases so we need
178
        // to keep this out of the try/finally block so it will not be released
179
        await _messageIdSemaphore.WaitAsync(cancelToken).ConfigureAwait(false);
37✔
180
        try
181
        {
182
            // We need to make sure messages to HA are send with increasing Ids therefore we need to synchronize
183
            // increasing the messageId and Sending the message
184
            command.Id = ++_messageId;
37✔
185

186
            // We make a task that subscribe for the return result message
187
            // this task will be returned and handled by caller
188
            var resultEvent = _hassMessageSubject
37✔
189
                .Where(n => n.Type == "result" && n.Id == command.Id)
38✔
190
                .FirstAsync().ToTask(CancellationToken.None);
37✔
191
            // We dont want to pass the incoming CancellationToken here because it will throw a TaskCanceledException
192
            // when calling services from an Apps Dispose(Async) and hide possible actual exceptions
193

194
            await _transportPipeline.SendMessageAsync(command, cancelToken);
37✔
195

196
            return resultEvent;
37✔
197
        }
198
        finally
199
        {
200
            _messageIdSemaphore.Release();
37✔
201
        }
202
    }
37✔
203

204
    private async Task HandleNewMessages()
205
    {
206
        try
207
        {
208
            while (!_internalCancelSource.IsCancellationRequested)
61!
209
            {
210
                var msg = await _transportPipeline.GetNextMessagesAsync<HassMessage>(_internalCancelSource.Token)
61✔
211
                    .ConfigureAwait(false);
61✔
212
                try
213
                {
214
                    foreach (var obj in msg)
168✔
215
                    {
216
                        _hassMessageSubject.OnNext(obj);
42✔
217
                    }
218
                }
42✔
219
                catch (Exception e)
×
220
                {
221
                    _logger.LogError(e, "Failed processing new message from Home Assistant");
×
222
                }
×
223
            }
224
        }
×
225
        catch (OperationCanceledException)
2✔
226
        {
227
            // Normal case just exit
228
        }
2✔
229
        finally
230
        {
231
            _logger.LogTrace("Stop processing new messages");
19✔
232
            // make sure we always cancel any blocking operations
233
            if (!_internalCancelSource.IsCancellationRequested)
19✔
234
                _internalCancelSource.Cancel();
17✔
235
        }
236
    }
2✔
237

238
    private Task CloseAsync()
239
    {
240
        return _transportPipeline.CloseAsync();
19✔
241
    }
242
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc