• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

net-daemon / netdaemon / 5914873856

09 Aug 2023 06:45PM UTC coverage: 81.685% (-0.5%) from 82.144%
5914873856

push

github

web-flow
Merge pull request #908

* Generate docs for all packagable projects and resolved some issues

* Update build to do a Release build

* github ....

* Removed warnaserror for now

* Merge all code coverage by outputting 2 files

* chore: Code coverage in build summary

* chore: Wrong file, please stash this pr

792 of 1114 branches covered (71.1%)

Branch coverage included in aggregate %.

2901 of 3407 relevant lines covered (85.15%)

46.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.36
/src/Client/NetDaemon.HassClient/Internal/Net/WebSocketTransportPipeline.cs
1
namespace NetDaemon.Client.Internal.Net;
2

3
internal class WebSocketClientTransportPipeline : IWebSocketClientTransportPipeline
4
{
5
    /// <summary>
6
    ///     Default Json serialization options, Hass expects intended
7
    /// </summary>
8
    private readonly JsonSerializerOptions _defaultSerializerOptions = new()
29✔
9
    {
29✔
10
        WriteIndented = false,
29✔
11
        DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull
29✔
12
    };
29✔
13

14
    private readonly CancellationTokenSource _internalCancelSource = new();
29✔
15
    private readonly Pipe _pipe = new();
29✔
16
    private readonly IWebSocketClient _ws;
17

18
    public WebSocketClientTransportPipeline(IWebSocketClient clientWebSocket)
29✔
19
    {
20
        _ws = clientWebSocket ?? throw new ArgumentNullException(nameof(clientWebSocket));
29!
21
    }
29✔
22

23
    private static int DefaultTimeOut => 5000;
43✔
24

25
    public WebSocketState WebSocketState => _ws.State;
19✔
26

27
    public async Task CloseAsync()
28
    {
29
        await SendCorrectCloseFrameToRemoteWebSocket().ConfigureAwait(false);
21✔
30
    }
21✔
31

32
    public async ValueTask DisposeAsync()
33
    {
34
        try
35
        {
36
            // In case we are just "disposing" without disconnect first
37
            // we call the close and fail silently if so
38
            await SendCorrectCloseFrameToRemoteWebSocket().ConfigureAwait(false);
21✔
39
        }
20✔
40
        catch
1✔
41
        {
42
            // Ignore all error in dispose
43
        }
1✔
44

45
        await _ws.DisposeAsync().ConfigureAwait(false);
21✔
46
    }
21✔
47

48
    public async ValueTask<T[]> GetNextMessagesAsync<T>(CancellationToken cancelToken) where T : class
49
    {
50
        if (_ws.State != WebSocketState.Open)
105✔
51
            throw new ApplicationException("Cannot send data on a closed socket!");
18✔
52

53
        using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
87✔
54
            _internalCancelSource.Token,
87✔
55
            cancelToken
87✔
56
        );
87✔
57
        try
58
        {
59
            // First we start the serialization task that will process
60
            // the pipeline for new data written from websocket input 
61
            // We want the processing to start before we read data
62
            // from the websocket so the pipeline is not getting full
63
            var serializeTask = ReadMessagesFromPipelineAndSerializeAsync<T>(combinedTokenSource.Token);
87✔
64
            await ReadMessageFromWebSocketAndWriteToPipelineAsync(combinedTokenSource.Token).ConfigureAwait(false);
87✔
65
            var result = await serializeTask.ConfigureAwait(false);
87✔
66
            // File.WriteAllText("./json_result.json", JsonSerializer.Serialize<T>(result, _defaultSerializerOptions));
67
            // We need to make sure the serialize task is finished before we throw the exception
68
            combinedTokenSource.Token.ThrowIfCancellationRequested();
84✔
69
            return result;
84✔
70
        }
71
        finally
72
        {
73
            _pipe.Reset();
87✔
74
        }
75
    }
84✔
76

77
    public Task SendMessageAsync<T>(T message, CancellationToken cancelToken) where T : class
78
    {
79
        if (cancelToken.IsCancellationRequested || _ws.State != WebSocketState.Open || _ws.CloseStatus.HasValue)
59✔
80
            throw new ApplicationException("Sending message on closed socket!");
1✔
81

82
        using var combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(
58✔
83
            _internalCancelSource.Token,
58✔
84
            cancelToken
58✔
85
        );
58✔
86

87
        var result = JsonSerializer.SerializeToUtf8Bytes(message, message.GetType(),
58✔
88
            _defaultSerializerOptions);
58✔
89

90
        return _ws.SendAsync(result, WebSocketMessageType.Text, true, combinedTokenSource.Token);
58✔
91
    }
58✔
92

93
    /// <summary>
94
    ///     Continuously reads the data from the pipe and serialize to object
95
    ///     from the json that are read
96
    /// </summary>
97
    /// <param name="cancelToken">Cancellation token</param>
98
    /// <typeparam name="T">The type to serialize to</typeparam>
99
    private async Task<T[]> ReadMessagesFromPipelineAndSerializeAsync<T>(CancellationToken cancelToken)
100
    {
101
        try
102
        {
103
            var message = await JsonSerializer.DeserializeAsync<JsonElement?>(_pipe.Reader.AsStream(),
87!
104
                              cancellationToken: cancelToken).ConfigureAwait(false)
87✔
105
                          ?? throw new ApplicationException(
87✔
106
                              "Deserialization of websocket returned empty result (null)");
87✔
107
            if (message.ValueKind == JsonValueKind.Array)
84!
108
            {
109
                // This is a coalesced message containing multiple messages so we need to
110
                // deserialize it as an array
111
                var obj = message.Deserialize<T[]>() ?? throw new ApplicationException(
×
112
                    "Deserialization of websocket returned empty result (null)");
×
113
                return obj;
×
114
            }
115
            else
116
            {
117
                // This is normal message and we deserialize it as object
118
                var obj = message.Deserialize<T>() ?? throw new ApplicationException(
84!
119
                    "Deserialization of websocket returned empty result (null)");
84✔
120
                return new T[] { obj };
84✔
121
            }
122
        }
123
        finally
124
        {
125
            // Always complete the reader
126
            await _pipe.Reader.CompleteAsync().ConfigureAwait(false);
87!
127
        }
128
    }
84✔
129

130
    /// <summary>
131
    ///     Read one or more chunks of a message and writes the result
132
    ///     to the pipeline
133
    /// </summary>
134
    /// <remarks>
135
    ///     A websocket message can be 1 to several chunks of data.
136
    ///     As data are read it is written on the pipeline for
137
    ///     the json serializer in function ReadMessageFromPipelineAndSerializeAsync
138
    ///     to continuously serialize. Using pipes is very efficient
139
    ///     way to reuse memory and get speedy results
140
    /// </remarks>
141
    private async Task ReadMessageFromWebSocketAndWriteToPipelineAsync(CancellationToken cancelToken)
142
    {
143
        try
144
        {
145
            while (!cancelToken.IsCancellationRequested && !_ws.CloseStatus.HasValue)
104✔
146
            {
147
                var memory = _pipe.Writer.GetMemory();
101✔
148
                var result = await _ws.ReceiveAsync(memory, cancelToken).ConfigureAwait(false);
101✔
149
                if (
101✔
150
                    _ws.State == WebSocketState.Open &&
101✔
151
                    result.MessageType != WebSocketMessageType.Close)
101✔
152
                {
153
                    _pipe.Writer.Advance(result.Count);
98✔
154

155
                    await _pipe.Writer.FlushAsync(cancelToken).ConfigureAwait(false);
98✔
156

157
                    if (result.EndOfMessage) break;
182✔
158
                }
159
                else if (_ws.State == WebSocketState.CloseReceived)
3✔
160
                {
161
                    // We got a close message from server or if it still open we got canceled
162
                    // in both cases it is important to send back the close message
163
                    await SendCorrectCloseFrameToRemoteWebSocket().ConfigureAwait(false);
1✔
164

165
                    // Cancel so the write thread is canceled before pipe is complete
166
                    _internalCancelSource.Cancel();
1✔
167
                }
168
            }
17✔
169
        }
170
        finally
171
        {
172
            // We have successfully read the whole message, 
173
            // make available to reader 
174
            // even if failure or we cannot reset the pipe
175
            await _pipe.Writer.CompleteAsync().ConfigureAwait(false);
87✔
176
        }
177
    }
87✔
178

179
    /// <summary>
180
    ///     Closes correctly the websocket depending on websocket state
181
    /// </summary>
182
    /// <remarks>
183
    ///     <para>
184
    ///         Closing a websocket has special handling. When the client
185
    ///         wants to close it calls CloseAsync and the websocket takes
186
    ///         care of the proper close handling.
187
    ///     </para>
188
    ///     <para>
189
    ///         If the remote websocket wants to close the connection dotnet
190
    ///         implementation requires you to use CloseOutputAsync instead.
191
    ///     </para>
192
    ///     <para>
193
    ///         We do not want to cancel operations until we get closed state
194
    ///         this is why own timer cancellation token is used and we wait
195
    ///         for correct state before returning and disposing any connections
196
    ///     </para>
197
    /// </remarks>
198
    private async Task SendCorrectCloseFrameToRemoteWebSocket()
199
    {
200
        using var timeout = new CancellationTokenSource(DefaultTimeOut);
43✔
201

202
        try
203
        {
204
            switch (_ws.State)
43✔
205
            {
206
                case WebSocketState.CloseReceived:
207
                {
208
                    // after this, the socket state which change to CloseSent
209
                    await _ws.CloseOutputAsync(WebSocketCloseStatus.NormalClosure, "Closing", timeout.Token)
1✔
210
                        .ConfigureAwait(false);
1✔
211
                    // now we wait for the server response, which will close the socket
212
                    while (_ws.State != WebSocketState.Closed && !timeout.Token.IsCancellationRequested)
1!
213
                        await Task.Delay(100).ConfigureAwait(false);
×
214
                    break;
×
215
                }
216
                case WebSocketState.Open:
217
                {
218
                    // Do full close 
219
                    await _ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "Closing", timeout.Token)
23✔
220
                        .ConfigureAwait(false);
23✔
221
                    if (_ws.State != WebSocketState.Closed)
22!
222
                        throw new ApplicationException("Expected the websocket to be closed!");
×
223
                    break;
224
                }
225
            }
226
        }
42✔
227
        catch (OperationCanceledException)
×
228
        {
229
            // normal upon task/token cancellation, disregard
230
        }
×
231
        finally
232
        {
233
            // After the websocket is properly closed
234
            // we can safely cancel all actions
235
            if (!_internalCancelSource.IsCancellationRequested)
43✔
236
                _internalCancelSource.Cancel();
24✔
237
        }
238
    }
42✔
239
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc