• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / github_pull_request_312964

19 Feb 2025 07:31PM UTC coverage: 90.814% (-0.3%) from 91.119%
github_pull_request_312964

Pull #8071

Evergreen

web-flow
Bump serialize-javascript and mocha

Bumps [serialize-javascript](https://github.com/yahoo/serialize-javascript) to 6.0.2 and updates ancestor dependency [mocha](https://github.com/mochajs/mocha). These dependencies need to be updated together.


Updates `serialize-javascript` from 6.0.0 to 6.0.2
- [Release notes](https://github.com/yahoo/serialize-javascript/releases)
- [Commits](https://github.com/yahoo/serialize-javascript/compare/v6.0.0...v6.0.2)

Updates `mocha` from 10.2.0 to 10.8.2
- [Release notes](https://github.com/mochajs/mocha/releases)
- [Changelog](https://github.com/mochajs/mocha/blob/main/CHANGELOG.md)
- [Commits](https://github.com/mochajs/mocha/compare/v10.2.0...v10.8.2)

---
updated-dependencies:
- dependency-name: serialize-javascript
  dependency-type: indirect
- dependency-name: mocha
  dependency-type: direct:development
...

Signed-off-by: dependabot[bot] <support@github.com>
Pull Request #8071: Bump serialize-javascript and mocha

96552 of 179126 branches covered (53.9%)

212672 of 234185 relevant lines covered (90.81%)

3115802.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.76
/src/realm/sync/network/default_socket.cpp
1
#include <realm/sync/network/default_socket.hpp>
2

3
#include <realm/sync/binding_callback_thread_observer.hpp>
4
#include <realm/sync/network/network.hpp>
5
#include <realm/sync/network/network_ssl.hpp>
6
#include <realm/sync/network/websocket.hpp>
7
#include <realm/util/basic_system_errors.hpp>
8
#include <realm/util/random.hpp>
9
#include <realm/util/scope_exit.hpp>
10

11
namespace realm::sync::websocket {
12

13
namespace {
14

15
///
16
/// DefaultWebSocketImpl - websocket implementation for the default socket provider
17
///
18
class DefaultWebSocketImpl final : public DefaultWebSocket, public Config {
19
public:
20
    DefaultWebSocketImpl(const std::shared_ptr<util::Logger>& logger_ptr, network::Service& service,
21
                         std::mt19937_64& random, const std::string user_agent,
22
                         std::unique_ptr<WebSocketObserver> observer, WebSocketEndpoint&& endpoint)
23
        : m_logger_ptr{logger_ptr}
1,722✔
24
        , m_network_logger{*m_logger_ptr}
1,722✔
25
        , m_random{random}
1,722✔
26
        , m_service{service}
1,722✔
27
        , m_user_agent{user_agent}
1,722✔
28
        , m_observer{std::move(observer)}
1,722✔
29
        , m_endpoint{std::move(endpoint)}
1,722✔
30
        , m_websocket(*this)
1,722✔
31
    {
1,722✔
32
        initiate_resolve();
1,722✔
33
    }
1,722✔
34

35
    virtual ~DefaultWebSocketImpl() = default;
1,722✔
36

37
    void async_write_binary(util::Span<const char> data, SyncSocketProvider::FunctionHandler&& handler) override
38
    {
48,708✔
39
        m_websocket.async_write_binary(data.data(), data.size(),
48,708✔
40
                                       [write_handler = std::move(handler)](std::error_code ec, size_t) {
48,710✔
41
                                           write_handler(DefaultWebSocketImpl::get_status_from_util_error(ec));
48,710✔
42
                                       });
48,710✔
43
    }
48,708✔
44

45
    std::string_view get_appservices_request_id() const noexcept override
46
    {
1,678✔
47
        return m_app_services_coid;
1,678✔
48
    }
1,678✔
49

50
    void force_handshake_response_for_testing(int status_code, std::string body = "") override
51
    {
4✔
52
        m_websocket.force_handshake_response_for_testing(status_code, body);
4✔
53
    }
4✔
54

55
    // public for HTTPClient CRTP, but not on the EZSocket interface, so de-facto private
56
    void async_read(char*, std::size_t, ReadCompletionHandler) override;
57
    void async_read_until(char*, std::size_t, char, ReadCompletionHandler) override;
58
    void async_write(const char*, std::size_t, WriteCompletionHandler) override;
59

60
private:
61
    using milliseconds_type = std::int_fast64_t;
62

63
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept override
64
    {
1,722✔
65
        return m_logger_ptr;
1,722✔
66
    }
1,722✔
67
    std::mt19937_64& websocket_get_random() noexcept override
68
    {
50,424✔
69
        return m_random;
50,424✔
70
    }
50,424✔
71

72
    void websocket_handshake_completion_handler(const HTTPHeaders& headers) override
73
    {
1,680✔
74
        const std::string empty;
1,680✔
75
        if (auto it = headers.find("X-Appservices-Request-Id"); it != headers.end()) {
1,680✔
76
            m_app_services_coid = it->second;
780✔
77
        }
780✔
78
        auto it = headers.find("Sec-WebSocket-Protocol");
1,680✔
79
        m_observer->websocket_connected_handler(it == headers.end() ? empty : it->second);
1,680✔
80
    }
1,680✔
81
    void websocket_read_error_handler(std::error_code ec) override
82
    {
276✔
83
        m_network_logger.error("Reading failed: %1", ec.message()); // Throws
276✔
84
        constexpr bool was_clean = false;
276✔
85
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_read_error, ec.message());
276✔
86
    }
276✔
87
    void websocket_write_error_handler(std::error_code ec) override
88
    {
×
89
        m_network_logger.error("Writing failed: %1", ec.message()); // Throws
×
90
        constexpr bool was_clean = false;
×
91
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_write_error, ec.message());
×
92
    }
×
93
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*, std::string_view body) override
94
    {
4✔
95
        WebSocketError error = WebSocketError::websocket_ok;
4✔
96
        bool was_clean = true;
4✔
97

98
        if (ec == websocket::HttpError::bad_response_301_moved_permanently ||
4✔
99
            ec == websocket::HttpError::bad_response_308_permanent_redirect) {
4✔
100
            error = WebSocketError::websocket_moved_permanently;
4✔
101
        }
4✔
102
        else if (ec == websocket::HttpError::bad_response_3xx_redirection) {
×
103
            error = WebSocketError::websocket_retry_error;
×
104
            was_clean = false;
×
105
        }
×
106
        else if (ec == websocket::HttpError::bad_response_401_unauthorized) {
×
107
            error = WebSocketError::websocket_unauthorized;
×
108
        }
×
109
        else if (ec == websocket::HttpError::bad_response_403_forbidden) {
×
110
            error = WebSocketError::websocket_forbidden;
×
111
        }
×
112
        else if (ec == websocket::HttpError::bad_response_5xx_server_error ||
×
113
                 ec == websocket::HttpError::bad_response_500_internal_server_error ||
×
114
                 ec == websocket::HttpError::bad_response_502_bad_gateway ||
×
115
                 ec == websocket::HttpError::bad_response_503_service_unavailable ||
×
116
                 ec == websocket::HttpError::bad_response_504_gateway_timeout) {
×
117
            error = WebSocketError::websocket_internal_server_error;
×
118
            was_clean = false;
×
119
        }
×
120
        else {
×
121
            error = WebSocketError::websocket_fatal_error;
×
122
            was_clean = false;
×
123
        }
×
124

125
        websocket_error_and_close_handler(was_clean, error, body.empty() ? ec.message() : body);
4✔
126
    }
4✔
127
    void websocket_protocol_error_handler(std::error_code ec) override
128
    {
×
129
        constexpr bool was_clean = false;
×
130
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_protocol_error, ec.message());
×
131
    }
×
132
    bool websocket_close_message_received(WebSocketError code, std::string_view message) override
133
    {
52✔
134
        constexpr bool was_clean = true;
52✔
135

136
        return websocket_error_and_close_handler(was_clean, code, message);
52✔
137
    }
52✔
138
    bool websocket_error_and_close_handler(bool was_clean, WebSocketError code, std::string_view reason)
139
    {
340✔
140
        if (!was_clean) {
340✔
141
            m_observer->websocket_error_handler();
284✔
142
        }
284✔
143
        return m_observer->websocket_closed_handler(was_clean, code, reason);
340✔
144
    }
340✔
145
    bool websocket_binary_message_received(const char* ptr, std::size_t size) override
146
    {
38,864✔
147
        return m_observer->websocket_binary_message_received(util::Span<const char>(ptr, size));
38,864✔
148
    }
38,864✔
149

150
    static Status get_status_from_util_error(std::error_code ec)
151
    {
48,710✔
152
        if (!ec) {
48,710✔
153
            return Status::OK();
48,032✔
154
        }
48,032✔
155
        switch (ec.value()) {
678✔
156
            case util::error::operation_aborted:
678✔
157
                return {ErrorCodes::Error::OperationAborted, "Write operation cancelled"};
678✔
158
            case util::error::address_family_not_supported:
✔
159
                [[fallthrough]];
×
160
            case util::error::invalid_argument:
✔
161
                return {ErrorCodes::Error::InvalidArgument, ec.message()};
×
162
            case util::error::no_memory:
✔
163
                return {ErrorCodes::Error::OutOfMemory, ec.message()};
×
164
            case util::error::connection_aborted:
✔
165
                [[fallthrough]];
×
166
            case util::error::connection_reset:
✔
167
                [[fallthrough]];
×
168
            case util::error::broken_pipe:
✔
169
                [[fallthrough]];
×
170
            case util::error::resource_unavailable_try_again:
✔
171
                return {ErrorCodes::Error::ConnectionClosed, ec.message()};
×
172
            default:
✔
173
                return {ErrorCodes::Error::UnknownError, ec.message()};
×
174
        }
678✔
175
    }
678✔
176

177
    void initiate_resolve();
178
    void handle_resolve(std::error_code, network::Endpoint::List);
179
    void initiate_tcp_connect(network::Endpoint::List, std::size_t);
180
    void handle_tcp_connect(std::error_code, network::Endpoint::List, std::size_t);
181
    void initiate_http_tunnel();
182
    void initiate_websocket_or_ssl_handshake();
183
    void initiate_ssl_handshake();
184
    void handle_ssl_handshake(std::error_code);
185
    void initiate_websocket_handshake();
186
    void handle_connection_established();
187

188
    void schedule_urgent_ping();
189
    void initiate_ping_delay(milliseconds_type now);
190
    void handle_ping_delay();
191
    void initiate_pong_timeout();
192
    void handle_pong_timeout();
193

194
    const std::shared_ptr<util::Logger> m_logger_ptr;
195
    util::Logger& m_network_logger;
196
    std::mt19937_64& m_random;
197
    network::Service& m_service;
198
    const std::string m_user_agent;
199
    std::string m_app_services_coid;
200

201
    std::unique_ptr<WebSocketObserver> m_observer;
202

203
    const WebSocketEndpoint m_endpoint;
204
    util::Optional<network::Resolver> m_resolver;
205
    util::Optional<network::Socket> m_socket;
206
    util::Optional<network::ssl::Context> m_ssl_context;
207
    util::Optional<network::ssl::Stream> m_ssl_stream;
208
    network::ReadAheadBuffer m_read_ahead_buffer;
209
    websocket::Socket m_websocket;
210
    util::Optional<HTTPClient<DefaultWebSocketImpl>> m_proxy_client;
211
};
212

213
void DefaultWebSocketImpl::async_read(char* buffer, std::size_t size, ReadCompletionHandler handler)
214
{
89,148✔
215
    REALM_ASSERT(m_socket);
89,148✔
216
    if (m_ssl_stream) {
89,148✔
217
        m_ssl_stream->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
88✔
218
    }
88✔
219
    else {
89,060✔
220
        m_socket->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
89,060✔
221
    }
89,060✔
222
}
89,148✔
223

224

225
void DefaultWebSocketImpl::async_read_until(char* buffer, std::size_t size, char delim, ReadCompletionHandler handler)
226
{
15,688✔
227
    REALM_ASSERT(m_socket);
15,688✔
228
    if (m_ssl_stream) {
15,688✔
229
        m_ssl_stream->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
56✔
230
    }
56✔
231
    else {
15,632✔
232
        m_socket->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
15,632✔
233
    }
15,632✔
234
}
15,688✔
235

236

237
void DefaultWebSocketImpl::async_write(const char* data, std::size_t size, WriteCompletionHandler handler)
238
{
50,420✔
239
    REALM_ASSERT(m_socket);
50,420✔
240
    if (m_ssl_stream) {
50,420✔
241
        m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
40✔
242
    }
40✔
243
    else {
50,380✔
244
        m_socket->async_write(data, size, std::move(handler)); // Throws
50,380✔
245
    }
50,380✔
246
}
50,420✔
247

248

249
void DefaultWebSocketImpl::initiate_resolve()
250
{
1,722✔
251
    const std::string& address = m_endpoint.proxy ? m_endpoint.proxy->address : m_endpoint.address;
1,722✔
252
    const port_type& port = m_endpoint.proxy ? m_endpoint.proxy->port : m_endpoint.port;
1,722✔
253

254
    if (m_endpoint.proxy) {
1,722✔
255
        // logger.detail("Using %1 proxy", proxy->type); // Throws
256
    }
×
257

258
    m_network_logger.detail("Resolving '%1:%2'", address, port); // Throws
1,722✔
259

260
    network::Resolver::Query query(address, util::to_string(port)); // Throws
1,722✔
261
    auto handler = [this](std::error_code ec, network::Endpoint::List endpoints) {
1,722✔
262
        // If the operation is aborted, the connection object may have been
263
        // destroyed.
264
        if (ec != util::error::operation_aborted)
1,722✔
265
            handle_resolve(ec, std::move(endpoints)); // Throws
1,722✔
266
    };
1,722✔
267
    m_resolver.emplace(m_service);                                   // Throws
1,722✔
268
    m_resolver->async_resolve(std::move(query), std::move(handler)); // Throws
1,722✔
269
}
1,722✔
270

271

272
void DefaultWebSocketImpl::handle_resolve(std::error_code ec, network::Endpoint::List endpoints)
273
{
1,722✔
274
    if (ec) {
1,722✔
275
        m_network_logger.error("Failed to resolve '%1:%2': %3", m_endpoint.address, m_endpoint.port,
2✔
276
                               ec.message()); // Throws
2✔
277
        constexpr bool was_clean = false;
2✔
278
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_resolve_failed,
2✔
279
                                          ec.message()); // Throws
2✔
280
        return;
2✔
281
    }
2✔
282

283
    initiate_tcp_connect(std::move(endpoints), 0); // Throws
1,720✔
284
}
1,720✔
285

286

287
void DefaultWebSocketImpl::initiate_tcp_connect(network::Endpoint::List endpoints, std::size_t i)
288
{
1,720✔
289
    REALM_ASSERT(i < endpoints.size());
1,720✔
290

291
    network::Endpoint ep = *(endpoints.begin() + i);
1,720✔
292
    std::size_t n = endpoints.size();
1,720✔
293
    m_socket.emplace(m_service); // Throws
1,720✔
294
    m_socket->async_connect(ep, [this, endpoints = std::move(endpoints), i](std::error_code ec) mutable {
1,720✔
295
        // If the operation is aborted, the connection object may have been
296
        // destroyed.
297
        if (ec != util::error::operation_aborted)
1,720✔
298
            handle_tcp_connect(ec, std::move(endpoints), i); // Throws
1,720✔
299
    });
1,720✔
300
    m_network_logger.detail("Connecting to endpoint '%1:%2' (%3/%4)", ep.address(), ep.port(), (i + 1), n); // Throws
1,720✔
301
}
1,720✔
302

303
void DefaultWebSocketImpl::handle_tcp_connect(std::error_code ec, network::Endpoint::List endpoints, std::size_t i)
304
{
1,720✔
305
    REALM_ASSERT(i < endpoints.size());
1,720✔
306
    const network::Endpoint& ep = *(endpoints.begin() + i);
1,720✔
307
    if (ec) {
1,720✔
308
        m_network_logger.error("Failed to connect to endpoint '%1:%2': %3", ep.address(), ep.port(),
×
309
                               ec.message()); // Throws
×
310
        std::size_t i_2 = i + 1;
×
311
        if (i_2 < endpoints.size()) {
×
312
            initiate_tcp_connect(std::move(endpoints), i_2); // Throws
×
313
            return;
×
314
        }
×
315
        // All endpoints failed
316
        m_network_logger.error("Failed to connect to '%1:%2': All endpoints failed", m_endpoint.address,
×
317
                               m_endpoint.port);
×
318
        constexpr bool was_clean = false;
×
319
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
320
                                          ec.message()); // Throws
×
321
        return;
×
322
    }
×
323

324
    REALM_ASSERT(m_socket);
1,720✔
325
    network::Endpoint ep_2 = m_socket->local_endpoint();
1,720✔
326
    m_network_logger.info("Connected to endpoint '%1:%2' (from '%3:%4')", ep.address(), ep.port(), ep_2.address(),
1,720✔
327
                          ep_2.port()); // Throws
1,720✔
328

329
    // TODO: Handle HTTPS proxies
330
    if (m_endpoint.proxy) {
1,720✔
331
        initiate_http_tunnel(); // Throws
×
332
        return;
×
333
    }
×
334

335
    initiate_websocket_or_ssl_handshake(); // Throws
1,720✔
336
}
1,720✔
337

338
void DefaultWebSocketImpl::initiate_websocket_or_ssl_handshake()
339
{
1,720✔
340
    if (m_endpoint.is_ssl) {
1,720✔
341
        initiate_ssl_handshake(); // Throws
14✔
342
    }
14✔
343
    else {
1,706✔
344
        initiate_websocket_handshake(); // Throws
1,706✔
345
    }
1,706✔
346
}
1,720✔
347

348
void DefaultWebSocketImpl::initiate_http_tunnel()
349
{
×
350
    HTTPRequest req;
×
351
    req.method = HTTPMethod::Connect;
×
352
    req.headers.emplace("Host", util::format("%1:%2", m_endpoint.address, m_endpoint.port));
×
353
    // TODO handle proxy authorization
354

355
    m_proxy_client.emplace(*this, m_logger_ptr);
×
356
    auto handler = [this](HTTPResponse response, std::error_code ec) {
×
357
        if (ec && ec != util::error::operation_aborted) {
×
358
            m_network_logger.error("Failed to establish HTTP tunnel: %1", ec.message());
×
359
            constexpr bool was_clean = false;
×
360
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
361
                                              ec.message()); // Throws
×
362
            return;
×
363
        }
×
364

365
        if (response.status != HTTPStatus::Ok) {
×
366
            m_network_logger.error("Proxy server returned response '%1 %2'", response.status,
×
367
                                   response.reason); // Throws
×
368
            constexpr bool was_clean = false;
×
369
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
370
                                              response.reason); // Throws
×
371
            return;
×
372
        }
×
373

374
        initiate_websocket_or_ssl_handshake(); // Throws
×
375
    };
×
376

377
    m_proxy_client->async_request(req, std::move(handler)); // Throws
×
378
}
×
379

380
void DefaultWebSocketImpl::initiate_ssl_handshake()
381
{
14✔
382
    using namespace network::ssl;
14✔
383

384
    if (!m_ssl_context) {
14✔
385
        m_ssl_context.emplace(); // Throws
14✔
386
        if (m_endpoint.verify_servers_ssl_certificate) {
14✔
387
            if (m_endpoint.ssl_trust_certificate_path) {
12✔
388
                m_ssl_context->use_verify_file(*m_endpoint.ssl_trust_certificate_path); // Throws
4✔
389
            }
4✔
390
            else if (!m_endpoint.ssl_verify_callback) {
8✔
391
                m_ssl_context->use_default_verify(); // Throws
2✔
392
#if REALM_INCLUDE_CERTS
2✔
393
                // On platforms like Windows or Android where OpenSSL is not normally found
394
                // `use_default_verify()` won't actually be able to load any default certificates.
395
                // That's why we bundle a set of trusted certificates ourselves.
396
                m_ssl_context->use_included_certificate_roots(); // Throws
2✔
397
#endif
2✔
398
            }
2✔
399
        }
12✔
400
    }
14✔
401

402
    m_ssl_stream.emplace(*m_socket, *m_ssl_context, Stream::client); // Throws
14✔
403
    m_ssl_stream->set_logger(m_logger_ptr.get());
14✔
404
    m_ssl_stream->set_host_name(m_endpoint.address); // Throws
14✔
405
    if (m_endpoint.verify_servers_ssl_certificate) {
14✔
406
        m_ssl_stream->set_verify_mode(VerifyMode::peer); // Throws
12✔
407
        m_ssl_stream->set_server_port(m_endpoint.port);
12✔
408
        if (!m_endpoint.ssl_trust_certificate_path) {
12✔
409
            if (m_endpoint.ssl_verify_callback) {
8✔
410
                m_ssl_stream->use_verify_callback(m_endpoint.ssl_verify_callback);
6✔
411
            }
6✔
412
        }
8✔
413
    }
12✔
414

415
    auto handler = [this](std::error_code ec) {
14✔
416
        // If the operation is aborted, the connection object may have been
417
        // destroyed.
418
        if (ec != util::error::operation_aborted)
14✔
419
            handle_ssl_handshake(ec); // Throws
14✔
420
    };
14✔
421
    m_ssl_stream->async_handshake(std::move(handler)); // Throws
14✔
422

423
    // FIXME: We also need to perform the SSL shutdown operation somewhere
424
}
14✔
425

426

427
void DefaultWebSocketImpl::handle_ssl_handshake(std::error_code ec)
428
{
14✔
429
    if (ec) {
14✔
430
        REALM_ASSERT(ec != util::error::operation_aborted);
6✔
431
        constexpr bool was_clean = false;
6✔
432
        WebSocketError parsed_error_code;
6✔
433
        if (ec == network::ssl::Errors::tls_handshake_failed) {
6✔
434
            parsed_error_code = WebSocketError::websocket_tls_handshake_failed;
6✔
435
        }
6✔
436
        else {
×
437
            parsed_error_code = WebSocketError::websocket_connection_failed;
×
438
        }
×
439

440
        websocket_error_and_close_handler(was_clean, parsed_error_code, ec.message()); // Throws
6✔
441
        return;
6✔
442
    }
6✔
443

444
    initiate_websocket_handshake(); // Throws
8✔
445
}
8✔
446

447

448
void DefaultWebSocketImpl::initiate_websocket_handshake()
449
{
1,714✔
450
    auto headers = HTTPHeaders(m_endpoint.headers.begin(), m_endpoint.headers.end());
1,714✔
451
    headers["User-Agent"] = m_user_agent;
1,714✔
452

453
    // Compute the value of the "Host" header.
454
    const std::uint_fast16_t default_port = (m_endpoint.is_ssl ? 443 : 80);
1,714✔
455
    auto host = m_endpoint.port == default_port ? m_endpoint.address
1,714✔
456
                                                : util::format("%1:%2", m_endpoint.address, m_endpoint.port);
1,714✔
457

458
    // Convert the list of protocols to a string
459
    std::ostringstream protocol_list;
1,714✔
460
    protocol_list.exceptions(std::ios_base::failbit | std::ios_base::badbit);
1,714✔
461
    protocol_list.imbue(std::locale::classic());
1,714✔
462
    if (m_endpoint.protocols.size() > 1)
1,714✔
463
        std::copy(m_endpoint.protocols.begin(), m_endpoint.protocols.end() - 1,
1,714✔
464
                  std::ostream_iterator<std::string>(protocol_list, ", "));
1,714✔
465
    protocol_list << m_endpoint.protocols.back();
1,714✔
466

467
    m_websocket.initiate_client_handshake(m_endpoint.path, std::move(host), protocol_list.str(),
1,714✔
468
                                          std::move(headers)); // Throws
1,714✔
469
}
1,714✔
470
} // namespace
471

472
///
473
/// DefaultSocketProvider - default socket provider implementation
474
///
475

476
DefaultSocketProvider::DefaultSocketProvider(const std::shared_ptr<util::Logger>& logger,
477
                                             const std::string& user_agent,
478
                                             const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr,
479
                                             AutoStart auto_start)
480
    : m_logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::network, logger)}
4,910✔
481
    , m_observer_ptr{observer_ptr}
4,910✔
482
    , m_user_agent{user_agent}
4,910✔
483
    , m_state{State::Stopped}
4,910✔
484
{
4,910✔
485
    REALM_ASSERT(m_logger_ptr);                     // Make sure the logger is valid
4,910✔
486
    util::seed_prng_nondeterministically(m_random); // Throws
4,910✔
487
    if (auto_start) {
4,910✔
488
        start();
4,528✔
489
    }
4,528✔
490
}
4,910✔
491

492
DefaultSocketProvider::~DefaultSocketProvider()
493
{
4,910✔
494
    m_logger_ptr->trace("Default event loop teardown");
4,910✔
495
    // Wait for the thread to stop
496
    stop(true);
4,910✔
497
    // Shutting down - no need to lock mutex before check
498
    REALM_ASSERT(m_state == State::Stopped);
4,910✔
499
}
4,910✔
500

501
void DefaultSocketProvider::start()
502
{
4,912✔
503
    util::CheckedUniqueLock lock(m_mutex);
4,912✔
504
    // Has the thread already been started or is running
505
    if (m_state == State::Starting || m_state == State::Running)
4,912✔
506
        return; // early return
×
507

508
    // If the thread has been previously run, make sure it has been joined first
509
    if (m_thread.joinable()) {
4,912✔
510
        state_wait_for(lock, State::Stopped);
2✔
511
        m_thread.join();
2✔
512
    }
2✔
513

514
    m_logger_ptr->trace("Default event loop: start()");
4,912✔
515
    REALM_ASSERT(m_state == State::Stopped);
4,912✔
516

517
    do_state_update(State::Starting);
4,912✔
518
    m_thread = std::thread{&DefaultSocketProvider::event_loop, this};
4,912✔
519
    // Wait for the thread to start before continuing
520
    state_wait_for(lock, State::Running);
4,912✔
521
}
4,912✔
522

523
void DefaultSocketProvider::event_loop()
524
{
4,912✔
525
    m_logger_ptr->trace("Default event loop: thread running");
4,912✔
526
    // Calls will_destroy_thread() when destroyed
527
    auto will_destroy_thread = util::make_scope_exit([&]() noexcept {
4,912✔
528
        m_logger_ptr->trace("Default event loop: thread exiting");
4,912✔
529
        if (m_observer_ptr)
4,912✔
530
            m_observer_ptr->will_destroy_thread();
4✔
531

532
        {
4,912✔
533
            util::CheckedLockGuard lock(m_mutex);
4,912✔
534
            // Did we get here due to an unhandled exception?
535
            if (m_state != State::Stopping) {
4,912✔
536
                m_logger_ptr->error("Default event loop: thread exited unexpectedly");
×
537
            }
×
538
            m_state = State::Stopped;
4,912✔
539
        }
4,912✔
540
        m_state_cv.notify_all();
4,912✔
541
    });
4,912✔
542

543
    if (m_observer_ptr)
4,912✔
544
        m_observer_ptr->did_create_thread();
4✔
545

546
    {
4,912✔
547
        util::CheckedLockGuard lock(m_mutex);
4,912✔
548
        REALM_ASSERT(m_state == State::Starting);
4,912✔
549
    }
4,912✔
550

551
    // We update the state to Running from inside the event loop so that start() is blocked until
552
    // the event loop is actually ready to receive work.
553
    m_service.post([this, my_generation = ++m_event_loop_generation](Status status) {
4,912✔
554
        if (status == ErrorCodes::OperationAborted) {
4,912✔
555
            return;
×
556
        }
×
557

558
        REALM_ASSERT(status.is_ok());
4,912✔
559

560
        util::CheckedLockGuard lock(m_mutex);
4,912✔
561
        // This is a callback from a previous generation
562
        if (m_event_loop_generation != my_generation) {
4,912✔
563
            return;
×
564
        }
×
565
        if (m_state == State::Stopping) {
4,912✔
566
            return;
×
567
        }
×
568
        m_logger_ptr->trace("Default event loop: service run");
4,912✔
569
        REALM_ASSERT(m_state == State::Starting);
4,912✔
570
        do_state_update(State::Running);
4,912✔
571
    });
4,912✔
572

573
    // If there is no event loop observer or handle_error function registered, then just
574
    // allow the exception to bubble to the top so we can get a true stack trace
575
    if (!m_observer_ptr || !m_observer_ptr->has_handle_error()) {
4,912✔
576
        m_service.run_until_stopped(); // Throws
4,908✔
577
    }
4,908✔
578
    else {
4✔
579
        try {
4✔
580
            m_service.run_until_stopped(); // Throws
4✔
581
        }
4✔
582
        catch (const std::exception& e) {
4✔
583
            {
2✔
584
                util::CheckedLockGuard lock(m_mutex);
2✔
585
                // Service is no longer running, event loop thread is stopping
586
                m_state = State::Stopping;
2✔
587
            }
2✔
588
            m_state_cv.notify_all();
2✔
589
            m_logger_ptr->error("Default event loop exception: ", e.what());
2✔
590
            // If the error was not handled by the thread loop observer, then rethrow
591
            if (!m_observer_ptr->handle_error(e))
2✔
592
                throw;
×
593
        }
2✔
594
    }
4✔
595
}
4,912✔
596

597
void DefaultSocketProvider::stop(bool wait_for_stop)
598
{
4,912✔
599
    util::CheckedUniqueLock lock(m_mutex);
4,912✔
600

601
    // Do nothing if the thread is not started or running or stop has already been called
602
    if (m_state == State::Starting || m_state == State::Running) {
4,912✔
603
        m_logger_ptr->trace("Default event loop: stop()");
4,910✔
604
        do_state_update(State::Stopping);
4,910✔
605
        // Updating state to Stopping will free a start() if it is waiting for the thread to
606
        // start and may cause the thread to exit early before calling service.run()
607
        m_service.stop(); // Unblocks m_service.run()
4,910✔
608
    }
4,910✔
609

610
    // Wait until the thread is stopped (exited) if requested
611
    if (wait_for_stop) {
4,912✔
612
        m_logger_ptr->trace("Default event loop: wait for stop");
4,912✔
613
        state_wait_for(lock, State::Stopped);
4,912✔
614
        if (m_thread.joinable()) {
4,912✔
615
            m_thread.join();
4,910✔
616
        }
4,910✔
617
    }
4,912✔
618
}
4,912✔
619

620
//                    +---------------------------------------+
621
//                   \/                                       |
622
// State Machine: Stopped -> Starting -> Running -> Stopping -+
623
//                              |           |          ^
624
//                              +----------------------+
625

626
void DefaultSocketProvider::do_state_update(State new_state)
627
{
14,734✔
628
    m_state = new_state;
14,734✔
629
    m_state_cv.notify_all(); // Let any waiters check the state
14,734✔
630
}
14,734✔
631

632
void DefaultSocketProvider::state_wait_for(util::CheckedUniqueLock& lock, State expected_state)
633
{
9,826✔
634
    m_state_cv.wait(lock.native_handle(), [this, expected_state]() REQUIRES(m_mutex) {
19,648✔
635
        return m_state >= expected_state;
19,648✔
636
    });
19,648✔
637
}
9,826✔
638

639
std::unique_ptr<WebSocketInterface> DefaultSocketProvider::connect(std::unique_ptr<WebSocketObserver> observer,
640
                                                                   WebSocketEndpoint&& endpoint)
641
{
1,722✔
642
    return std::make_unique<DefaultWebSocketImpl>(m_logger_ptr, m_service, m_random, m_user_agent,
1,722✔
643
                                                  std::move(observer), std::move(endpoint));
1,722✔
644
}
1,722✔
645

646
} // namespace realm::sync::websocket
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc