• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1731

04 Oct 2023 01:46PM UTC coverage: 91.599% (-0.02%) from 91.615%
1731

push

Evergreen

web-flow
Initial ObjectStore Class class (#6521)

94282 of 173446 branches covered (0.0%)

30 of 70 new or added lines in 10 files covered. (42.86%)

94 existing lines in 16 files now uncovered.

230401 of 251533 relevant lines covered (91.6%)

6665525.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.29
/src/realm/sync/network/default_socket.cpp
1
#include <realm/sync/network/default_socket.hpp>
2

3
#include <realm/sync/binding_callback_thread_observer.hpp>
4
#include <realm/sync/network/network.hpp>
5
#include <realm/sync/network/network_ssl.hpp>
6
#include <realm/sync/network/websocket.hpp>
7
#include <realm/util/random.hpp>
8
#include <realm/util/scope_exit.hpp>
9

10
namespace realm::sync::websocket {
11

12
namespace {
13

14
///
15
/// DefaultWebSocketImpl - websocket implementation for the default socket provider
16
///
17
class DefaultWebSocketImpl final : public DefaultWebSocket, public Config {
18
public:
19
    DefaultWebSocketImpl(const std::shared_ptr<util::Logger>& logger_ptr, network::Service& service,
20
                         std::mt19937_64& random, const std::string user_agent,
21
                         std::unique_ptr<WebSocketObserver> observer, WebSocketEndpoint&& endpoint)
22
        : m_logger_ptr{logger_ptr}
23
        , m_logger{*m_logger_ptr}
24
        , m_random{random}
25
        , m_service{service}
26
        , m_user_agent{user_agent}
27
        , m_observer{std::move(observer)}
28
        , m_endpoint{std::move(endpoint)}
29
        , m_websocket(*this)
30
    {
3,274✔
31
        initiate_resolve();
3,274✔
32
    }
3,274✔
33

34
    void async_write_binary(util::Span<const char> data, SyncSocketProvider::FunctionHandler&& handler) override
35
    {
99,582✔
36
        m_websocket.async_write_binary(data.data(), data.size(), [handler = std::move(handler)]() {
99,004✔
37
            handler(Status::OK());
98,086✔
38
        });
98,086✔
39
    }
99,582✔
40

41
    std::string_view get_appservices_request_id() const noexcept override
42
    {
3,156✔
43
        return m_app_services_coid;
3,156✔
44
    }
3,156✔
45

46
    void force_handshake_response_for_testing(int status_code, std::string body = "") override
47
    {
12✔
48
        m_websocket.force_handshake_response_for_testing(status_code, body);
12✔
49
    }
12✔
50

51
    // public for HTTPClient CRTP, but not on the EZSocket interface, so de-facto private
52
    void async_read(char*, std::size_t, ReadCompletionHandler) override;
53
    void async_read_until(char*, std::size_t, char, ReadCompletionHandler) override;
54
    void async_write(const char*, std::size_t, WriteCompletionHandler) override;
55

56
private:
57
    using milliseconds_type = std::int_fast64_t;
58

59
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept override
60
    {
3,272✔
61
        return m_logger_ptr;
3,272✔
62
    }
3,272✔
63
    std::mt19937_64& websocket_get_random() noexcept override
64
    {
102,838✔
65
        return m_random;
102,838✔
66
    }
102,838✔
67

68
    void websocket_handshake_completion_handler(const HTTPHeaders& headers) override
69
    {
3,160✔
70
        const std::string empty;
3,160✔
71
        if (auto it = headers.find("X-Appservices-Request-Id"); it != headers.end()) {
3,160✔
72
            m_app_services_coid = it->second;
1,274✔
73
        }
1,274✔
74
        auto it = headers.find("Sec-WebSocket-Protocol");
3,160✔
75
        m_observer->websocket_connected_handler(it == headers.end() ? empty : it->second);
3,160✔
76
    }
3,160✔
77
    void websocket_read_error_handler(std::error_code ec) override
78
    {
510✔
79
        m_logger.error("Reading failed: %1", ec.message()); // Throws
510✔
80
        constexpr bool was_clean = false;
510✔
81
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_read_error, ec.message());
510✔
82
    }
510✔
83
    void websocket_write_error_handler(std::error_code ec) override
84
    {
×
85
        m_logger.error("Writing failed: %1", ec.message()); // Throws
×
86
        constexpr bool was_clean = false;
×
87
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_write_error, ec.message());
×
88
    }
×
89
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*,
90
                                           const std::string_view* body) override
91
    {
48✔
92
        WebSocketError error = WebSocketError::websocket_ok;
48✔
93
        bool was_clean = true;
48✔
94

24✔
95
        if (ec == websocket::HttpError::bad_response_301_moved_permanently ||
48✔
96
            ec == websocket::HttpError::bad_response_308_permanent_redirect) {
44✔
97
            error = WebSocketError::websocket_moved_permanently;
12✔
98
        }
12✔
99
        else if (ec == websocket::HttpError::bad_response_3xx_redirection) {
36✔
100
            error = WebSocketError::websocket_retry_error;
×
101
            was_clean = false;
×
102
        }
×
103
        else if (ec == websocket::HttpError::bad_response_401_unauthorized) {
36✔
104
            error = WebSocketError::websocket_unauthorized;
36✔
105
        }
36✔
106
        else if (ec == websocket::HttpError::bad_response_403_forbidden) {
×
107
            error = WebSocketError::websocket_forbidden;
×
108
        }
×
109
        else if (ec == websocket::HttpError::bad_response_5xx_server_error ||
×
110
                 ec == websocket::HttpError::bad_response_500_internal_server_error ||
×
111
                 ec == websocket::HttpError::bad_response_502_bad_gateway ||
×
112
                 ec == websocket::HttpError::bad_response_503_service_unavailable ||
×
113
                 ec == websocket::HttpError::bad_response_504_gateway_timeout) {
×
114
            error = WebSocketError::websocket_internal_server_error;
×
115
            was_clean = false;
×
116
        }
×
117
        else {
×
118
            error = WebSocketError::websocket_fatal_error;
×
119
            was_clean = false;
×
120
            if (body) {
×
121
                std::string_view identifier = "REALM_SYNC_PROTOCOL_MISMATCH";
×
122
                auto i = body->find(identifier);
×
123
                if (i != std::string_view::npos) {
×
124
                    std::string_view rest = body->substr(i + identifier.size());
×
125
                    // FIXME: Use std::string_view::begins_with() in C++20.
126
                    auto begins_with = [](std::string_view string, std::string_view prefix) {
×
127
                        return (string.size() >= prefix.size() &&
×
128
                                std::equal(string.data(), string.data() + prefix.size(), prefix.data()));
×
129
                    };
×
130
                    if (begins_with(rest, ":CLIENT_TOO_OLD")) {
×
131
                        error = WebSocketError::websocket_client_too_old;
×
132
                    }
×
133
                    else if (begins_with(rest, ":CLIENT_TOO_NEW")) {
×
134
                        error = WebSocketError::websocket_client_too_new;
×
135
                    }
×
136
                    else {
×
137
                        // Other more complicated forms of mismatch
138
                        error = WebSocketError::websocket_protocol_mismatch;
×
139
                    }
×
140
                    was_clean = true;
×
141
                }
×
142
            }
×
143
        }
×
144

24✔
145
        websocket_error_and_close_handler(was_clean, error, ec.message());
48✔
146
    }
48✔
147
    void websocket_protocol_error_handler(std::error_code ec) override
148
    {
×
149
        constexpr bool was_clean = false;
×
150
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_protocol_error, ec.message());
×
151
    }
×
152
    bool websocket_close_message_received(WebSocketError code, std::string_view message) override
153
    {
78✔
154
        constexpr bool was_clean = true;
78✔
155

40✔
156
        return websocket_error_and_close_handler(was_clean, code, message);
78✔
157
    }
78✔
158
    bool websocket_error_and_close_handler(bool was_clean, WebSocketError code, std::string_view reason)
159
    {
650✔
160
        if (!was_clean) {
650✔
161
            m_observer->websocket_error_handler();
524✔
162
        }
524✔
163
        return m_observer->websocket_closed_handler(was_clean, code, reason);
650✔
164
    }
650✔
165
    bool websocket_binary_message_received(const char* ptr, std::size_t size) override
166
    {
79,666✔
167
        return m_observer->websocket_binary_message_received(util::Span<const char>(ptr, size));
79,666✔
168
    }
79,666✔
169

170
    void initiate_resolve();
171
    void handle_resolve(std::error_code, network::Endpoint::List);
172
    void initiate_tcp_connect(network::Endpoint::List, std::size_t);
173
    void handle_tcp_connect(std::error_code, network::Endpoint::List, std::size_t);
174
    void initiate_http_tunnel();
175
    void initiate_websocket_or_ssl_handshake();
176
    void initiate_ssl_handshake();
177
    void handle_ssl_handshake(std::error_code);
178
    void initiate_websocket_handshake();
179
    void handle_connection_established();
180

181
    void schedule_urgent_ping();
182
    void initiate_ping_delay(milliseconds_type now);
183
    void handle_ping_delay();
184
    void initiate_pong_timeout();
185
    void handle_pong_timeout();
186

187
    const std::shared_ptr<util::Logger> m_logger_ptr;
188
    util::Logger& m_logger;
189
    std::mt19937_64& m_random;
190
    network::Service& m_service;
191
    const std::string m_user_agent;
192
    std::string m_app_services_coid;
193

194
    std::unique_ptr<WebSocketObserver> m_observer;
195

196
    const WebSocketEndpoint m_endpoint;
197
    util::Optional<network::Resolver> m_resolver;
198
    util::Optional<network::Socket> m_socket;
199
    util::Optional<network::ssl::Context> m_ssl_context;
200
    util::Optional<network::ssl::Stream> m_ssl_stream;
201
    network::ReadAheadBuffer m_read_ahead_buffer;
202
    websocket::Socket m_websocket;
203
    util::Optional<HTTPClient<DefaultWebSocketImpl>> m_proxy_client;
204
};
205

206

207
void DefaultWebSocketImpl::async_read(char* buffer, std::size_t size, ReadCompletionHandler handler)
208
{
178,032✔
209
    REALM_ASSERT(m_socket);
178,032✔
210
    if (m_ssl_stream) {
178,032✔
211
        m_ssl_stream->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
154✔
212
    }
154✔
213
    else {
177,878✔
214
        m_socket->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
177,878✔
215
    }
177,878✔
216
}
178,032✔
217

218

219
void DefaultWebSocketImpl::async_read_until(char* buffer, std::size_t size, char delim, ReadCompletionHandler handler)
220
{
27,798✔
221
    REALM_ASSERT(m_socket);
27,798✔
222
    if (m_ssl_stream) {
27,798✔
223
        m_ssl_stream->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
98✔
224
    }
98✔
225
    else {
27,700✔
226
        m_socket->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
27,700✔
227
    }
27,700✔
228
}
27,798✔
229

230

231
void DefaultWebSocketImpl::async_write(const char* data, std::size_t size, WriteCompletionHandler handler)
232
{
102,838✔
233
    REALM_ASSERT(m_socket);
102,838✔
234
    if (m_ssl_stream) {
102,838✔
235
        m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
84✔
236
    }
84✔
237
    else {
102,754✔
238
        m_socket->async_write(data, size, std::move(handler)); // Throws
102,754✔
239
    }
102,754✔
240
}
102,838✔
241

242

243
void DefaultWebSocketImpl::initiate_resolve()
244
{
3,272✔
245
    const std::string& address = m_endpoint.proxy ? m_endpoint.proxy->address : m_endpoint.address;
3,272✔
246
    const port_type& port = m_endpoint.proxy ? m_endpoint.proxy->port : m_endpoint.port;
3,272✔
247

1,616✔
248
    if (m_endpoint.proxy) {
3,272✔
249
        // logger.detail("Using %1 proxy", proxy->type); // Throws
250
    }
×
251

1,616✔
252
    m_logger.detail("Resolving '%1:%2'", address, port); // Throws
3,272✔
253

1,616✔
254
    network::Resolver::Query query(address, util::to_string(port)); // Throws
3,272✔
255
    auto handler = [this](std::error_code ec, network::Endpoint::List endpoints) {
3,274✔
256
        // If the operation is aborted, the connection object may have been
1,614✔
257
        // destroyed.
1,614✔
258
        if (ec != util::error::operation_aborted)
3,272✔
259
            handle_resolve(ec, std::move(endpoints)); // Throws
3,270✔
260
    };
3,272✔
261
    m_resolver.emplace(m_service);                                   // Throws
3,272✔
262
    m_resolver->async_resolve(std::move(query), std::move(handler)); // Throws
3,272✔
263
}
3,272✔
264

265

266
void DefaultWebSocketImpl::handle_resolve(std::error_code ec, network::Endpoint::List endpoints)
267
{
3,270✔
268
    if (ec) {
3,270✔
269
        m_logger.error("Failed to resolve '%1:%2': %3", m_endpoint.address, m_endpoint.port, ec.message()); // Throws
4✔
270
        constexpr bool was_clean = false;
4✔
271
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_resolve_failed,
4✔
272
                                          ec.message()); // Throws
4✔
273
        return;
4✔
274
    }
4✔
275

1,612✔
276
    initiate_tcp_connect(std::move(endpoints), 0); // Throws
3,266✔
277
}
3,266✔
278

279

280
/// Creates a fresh socket and starts connecting it to endpoint number `i` of
/// `endpoints`. The list is carried along in the completion handler so that
/// handle_tcp_connect() can fall back to the next entry.
void DefaultWebSocketImpl::initiate_tcp_connect(network::Endpoint::List endpoints, std::size_t i)
{
    REALM_ASSERT(i < endpoints.size());

    // Take what is needed from the list now; it is moved into the handler below.
    const std::size_t total = endpoints.size();
    const std::size_t attempt = i + 1;
    network::Endpoint target = *(endpoints.begin() + i);

    auto on_connected = [this, endpoints = std::move(endpoints), i](std::error_code ec) mutable {
        // An aborted operation may mean this object is already gone, so it
        // must not be touched in that case.
        if (ec == util::error::operation_aborted)
            return;
        handle_tcp_connect(ec, std::move(endpoints), i); // Throws
    };

    m_socket.emplace(m_service); // Throws
    m_socket->async_connect(target, std::move(on_connected));
    m_logger.detail("Connecting to endpoint '%1:%2' (%3/%4)", target.address(), target.port(), attempt,
                    total); // Throws
}
3,266✔
295

296

297
void DefaultWebSocketImpl::handle_tcp_connect(std::error_code ec, network::Endpoint::List endpoints, std::size_t i)
298
{
3,262✔
299
    REALM_ASSERT(i < endpoints.size());
3,262✔
300
    const network::Endpoint& ep = *(endpoints.begin() + i);
3,262✔
301
    if (ec) {
3,262✔
UNCOV
302
        m_logger.error("Failed to connect to endpoint '%1:%2': %3", ep.address(), ep.port(),
×
UNCOV
303
                       ec.message()); // Throws
×
UNCOV
304
        std::size_t i_2 = i + 1;
×
UNCOV
305
        if (i_2 < endpoints.size()) {
×
UNCOV
306
            initiate_tcp_connect(std::move(endpoints), i_2); // Throws
×
UNCOV
307
            return;
×
UNCOV
308
        }
×
309
        // All endpoints failed
UNCOV
310
        m_logger.error("Failed to connect to '%1:%2': All endpoints failed", m_endpoint.address, m_endpoint.port);
×
UNCOV
311
        constexpr bool was_clean = false;
×
UNCOV
312
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
UNCOV
313
                                          ec.message()); // Throws
×
UNCOV
314
        return;
×
UNCOV
315
    }
×
316

1,608✔
317
    REALM_ASSERT(m_socket);
3,262✔
318
    network::Endpoint ep_2 = m_socket->local_endpoint();
3,262✔
319
    m_logger.info("Connected to endpoint '%1:%2' (from '%3:%4')", ep.address(), ep.port(), ep_2.address(),
3,262✔
320
                  ep_2.port()); // Throws
3,262✔
321

1,608✔
322
    // TODO: Handle HTTPS proxies
1,608✔
323
    if (m_endpoint.proxy) {
3,262✔
324
        initiate_http_tunnel(); // Throws
×
325
        return;
×
326
    }
×
327

1,608✔
328
    initiate_websocket_or_ssl_handshake(); // Throws
3,262✔
329
}
3,262✔
330

331
void DefaultWebSocketImpl::initiate_websocket_or_ssl_handshake()
332
{
3,262✔
333
    if (m_endpoint.is_ssl) {
3,262✔
334
        initiate_ssl_handshake(); // Throws
24✔
335
    }
24✔
336
    else {
3,238✔
337
        initiate_websocket_handshake(); // Throws
3,238✔
338
    }
3,238✔
339
}
3,262✔
340

341
void DefaultWebSocketImpl::initiate_http_tunnel()
342
{
×
343
    HTTPRequest req;
×
344
    req.method = HTTPMethod::Connect;
×
345
    req.headers.emplace("Host", util::format("%1:%2", m_endpoint.address, m_endpoint.port));
×
346
    // TODO handle proxy authorization
347

348
    m_proxy_client.emplace(*this, m_logger_ptr);
×
349
    auto handler = [this](HTTPResponse response, std::error_code ec) {
×
350
        if (ec && ec != util::error::operation_aborted) {
×
351
            m_logger.error("Failed to establish HTTP tunnel: %1", ec.message());
×
352
            constexpr bool was_clean = false;
×
353
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
354
                                              ec.message()); // Throws
×
355
            return;
×
356
        }
×
357

358
        if (response.status != HTTPStatus::Ok) {
×
359
            m_logger.error("Proxy server returned response '%1 %2'", response.status, response.reason); // Throws
×
360
            constexpr bool was_clean = false;
×
361
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
362
                                              response.reason); // Throws
×
363
            return;
×
364
        }
×
365

366
        initiate_websocket_or_ssl_handshake(); // Throws
×
367
    };
×
368

369
    m_proxy_client->async_request(req, std::move(handler)); // Throws
×
370
}
×
371

372
void DefaultWebSocketImpl::initiate_ssl_handshake()
373
{
24✔
374
    using namespace network::ssl;
24✔
375

14✔
376
    if (!m_ssl_context) {
24✔
377
        m_ssl_context.emplace(); // Throws
24✔
378
        if (m_endpoint.verify_servers_ssl_certificate) {
24✔
379
            if (m_endpoint.ssl_trust_certificate_path) {
20✔
380
                m_ssl_context->use_verify_file(*m_endpoint.ssl_trust_certificate_path); // Throws
10✔
381
            }
10✔
382
            else if (!m_endpoint.ssl_verify_callback) {
10✔
383
                m_ssl_context->use_default_verify(); // Throws
4✔
384
            }
4✔
385
        }
20✔
386
    }
24✔
387

14✔
388
    m_ssl_stream.emplace(*m_socket, *m_ssl_context, Stream::client); // Throws
24✔
389
    m_ssl_stream->set_logger(m_logger_ptr.get());
24✔
390
    m_ssl_stream->set_host_name(m_endpoint.address); // Throws
24✔
391
    if (m_endpoint.verify_servers_ssl_certificate) {
24✔
392
        m_ssl_stream->set_verify_mode(VerifyMode::peer); // Throws
20✔
393
        m_ssl_stream->set_server_port(m_endpoint.port);
20✔
394
        if (!m_endpoint.ssl_trust_certificate_path) {
20✔
395
            if (m_endpoint.ssl_verify_callback) {
10✔
396
                m_ssl_stream->use_verify_callback(m_endpoint.ssl_verify_callback);
6✔
397
            }
6✔
398
            else {
4✔
399
                // The included certificates are used if neither the trust
2✔
400
                // certificate nor the callback function is set.
2✔
401
#if REALM_INCLUDE_CERTS
2✔
402
                m_ssl_stream->use_included_certificates(); // Throws
2✔
403
#endif
2✔
404
            }
4✔
405
        }
10✔
406
    }
20✔
407

14✔
408
    auto handler = [this](std::error_code ec) {
24✔
409
        // If the operation is aborted, the connection object may have been
14✔
410
        // destroyed.
14✔
411
        if (ec != util::error::operation_aborted)
24✔
412
            handle_ssl_handshake(ec); // Throws
24✔
413
    };
24✔
414
    m_ssl_stream->async_handshake(std::move(handler)); // Throws
24✔
415

14✔
416
    // FIXME: We also need to perform the SSL shutdown operation somewhere
14✔
417
}
24✔
418

419

420
void DefaultWebSocketImpl::handle_ssl_handshake(std::error_code ec)
421
{
24✔
422
    if (ec) {
24✔
423
        REALM_ASSERT(ec != util::error::operation_aborted);
10✔
424
        constexpr bool was_clean = false;
10✔
425
        WebSocketError parsed_error_code;
10✔
426
        if (ec == network::ssl::Errors::tls_handshake_failed) {
10✔
427
            parsed_error_code = WebSocketError::websocket_tls_handshake_failed;
10✔
428
        }
10✔
429
        else {
×
430
            parsed_error_code = WebSocketError::websocket_connection_failed;
×
431
        }
×
432

6✔
433
        websocket_error_and_close_handler(was_clean, parsed_error_code, ec.message()); // Throws
10✔
434
        return;
10✔
435
    }
10✔
436

8✔
437
    initiate_websocket_handshake(); // Throws
14✔
438
}
14✔
439

440

441
void DefaultWebSocketImpl::initiate_websocket_handshake()
442
{
3,252✔
443
    auto headers = HTTPHeaders(m_endpoint.headers.begin(), m_endpoint.headers.end());
3,252✔
444
    headers["User-Agent"] = m_user_agent;
3,252✔
445

1,602✔
446
    // Compute the value of the "Host" header.
1,602✔
447
    const std::uint_fast16_t default_port = (m_endpoint.is_ssl ? 443 : 80);
3,246✔
448
    auto host = m_endpoint.port == default_port ? m_endpoint.address
1,602✔
449
                                                : util::format("%1:%2", m_endpoint.address, m_endpoint.port);
3,252✔
450

1,602✔
451
    // Convert the list of protocols to a string
1,602✔
452
    std::ostringstream protocol_list;
3,252✔
453
    protocol_list.exceptions(std::ios_base::failbit | std::ios_base::badbit);
3,252✔
454
    protocol_list.imbue(std::locale::classic());
3,252✔
455
    if (m_endpoint.protocols.size() > 1)
3,252✔
456
        std::copy(m_endpoint.protocols.begin(), m_endpoint.protocols.end() - 1,
3,252✔
457
                  std::ostream_iterator<std::string>(protocol_list, ", "));
3,252✔
458
    protocol_list << m_endpoint.protocols.back();
3,252✔
459

1,602✔
460
    m_websocket.initiate_client_handshake(m_endpoint.path, std::move(host), protocol_list.str(),
3,252✔
461
                                          std::move(headers)); // Throws
3,252✔
462
}
3,252✔
463
} // namespace
464

465
///
466
/// DefaultSocketProvider - default socket provider implementation
467
///
468

469
DefaultSocketProvider::DefaultSocketProvider(const std::shared_ptr<util::Logger>& logger,
470
                                             const std::string user_agent,
471
                                             const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr,
472
                                             AutoStart auto_start)
473
    : m_logger_ptr{logger}
474
    , m_observer_ptr{observer_ptr}
475
    , m_service{}
476
    , m_random{}
477
    , m_user_agent{user_agent}
478
    , m_mutex{}
479
    , m_state{State::Stopped}
480
    , m_state_cv{}
481
    , m_thread{}
482
{
8,954✔
483
    REALM_ASSERT(m_logger_ptr);                     // Make sure the logger is valid
8,954✔
484
    util::seed_prng_nondeterministically(m_random); // Throws
8,954✔
485
    if (auto_start) {
8,954✔
486
        start();
8,206✔
487
    }
8,206✔
488
}
8,954✔
489

490
/// Stops the event loop and blocks until its thread has finished.
DefaultSocketProvider::~DefaultSocketProvider()
{
    m_logger_ptr->trace("Default event loop teardown");

    // Block until the event loop thread has exited
    constexpr bool wait_for_stop = true;
    stop(wait_for_stop);

    // Shutting down - the state is read without taking the mutex
    REALM_ASSERT(m_state == State::Stopped);
}
8,954✔
498

499
void DefaultSocketProvider::start()
500
{
8,954✔
501
    std::unique_lock<std::mutex> lock(m_mutex);
8,954✔
502
    // Has the thread already been started or is running
4,408✔
503
    if (m_state == State::Starting || m_state == State::Running)
8,954✔
504
        return; // early return
×
505

4,408✔
506
    // If the thread has been previously run, make sure it has been joined first
4,408✔
507
    if (m_state == State::Stopping) {
8,954✔
508
        state_wait_for(lock, State::Stopped);
×
509
    }
×
510

4,408✔
511
    m_logger_ptr->trace("Default event loop: start()");
8,954✔
512
    REALM_ASSERT(m_state == State::Stopped);
8,954✔
513

4,408✔
514
    do_state_update(lock, State::Starting);
8,954✔
515
    m_thread = std::thread{&DefaultSocketProvider::event_loop, this};
8,954✔
516
    // Wait for the thread to start before continuing
4,408✔
517
    state_wait_for(lock, State::Running);
8,954✔
518
}
8,954✔
519

520
/// Test helper: marks a stopped provider as Starting and then runs its event
/// loop on the calling thread instead of on a dedicated thread.
void DefaultSocketProvider::OnlyForTesting::run_event_loop_on_current_thread(DefaultSocketProvider* provider)
{
    std::unique_lock<std::mutex> lk(provider->m_mutex);
    REALM_ASSERT(provider->m_state == State::Stopped);
    provider->do_state_update(lk, State::Starting);
    // The event loop takes the mutex itself, so release it before entering
    lk.unlock();

    provider->event_loop();
}
4✔
530

531
/// Test helper: resets the service of a stopped provider so that its event
/// loop can be run again.
void DefaultSocketProvider::OnlyForTesting::prep_event_loop_for_restart(DefaultSocketProvider* provider)
{
    std::lock_guard<std::mutex> guard(provider->m_mutex);
    REALM_ASSERT(provider->m_state == State::Stopped);
    provider->m_service.reset();
}
4✔
537

538
void DefaultSocketProvider::event_loop()
539
{
8,958✔
540
    m_logger_ptr->trace("Default event loop: thread running");
8,958✔
541
    // Calls will_destroy_thread() when destroyed
4,410✔
542
    auto will_destroy_thread = util::make_scope_exit([&]() noexcept {
8,958✔
543
        m_logger_ptr->trace("Default event loop: thread exiting");
8,958✔
544
        if (m_observer_ptr)
8,958✔
545
            m_observer_ptr->will_destroy_thread();
×
546

4,410✔
547
        std::unique_lock<std::mutex> lock(m_mutex);
8,958✔
548
        // Did we get here due to an unhandled exception?
4,410✔
549
        if (m_state != State::Stopping) {
8,958✔
550
            m_logger_ptr->error("Default event loop: thread exited unexpectedly");
4✔
551
        }
4✔
552
        m_state = State::Stopped;
8,958✔
553
        lock.unlock();
8,958✔
554
        m_state_cv.notify_all();
8,958✔
555
    });
8,958✔
556

4,410✔
557
    if (m_observer_ptr)
8,958✔
558
        m_observer_ptr->did_create_thread();
×
559

4,410✔
560
    {
8,958✔
561
        std::lock_guard<std::mutex> lock(m_mutex);
8,958✔
562
        REALM_ASSERT(m_state == State::Starting);
8,958✔
563
    }
8,958✔
564

4,410✔
565
    // We update the state to Running from inside the event loop so that start() is blocked until
4,410✔
566
    // the event loop is actually ready to receive work.
4,410✔
567
    m_service.post([this, my_generation = ++m_event_loop_generation](Status status) {
8,958✔
568
        if (status == ErrorCodes::OperationAborted) {
8,958✔
569
            return;
×
570
        }
×
571

4,410✔
572
        REALM_ASSERT(status.is_ok());
8,958✔
573

4,410✔
574
        std::unique_lock<std::mutex> lock(m_mutex);
8,958✔
575
        // This is a callback from a previous generation
4,410✔
576
        if (m_event_loop_generation != my_generation) {
8,958✔
577
            return;
4✔
578
        }
4✔
579
        if (m_state == State::Stopping) {
8,954✔
580
            return;
×
581
        }
×
582
        m_logger_ptr->trace("Default event loop: service run");
8,954✔
583
        REALM_ASSERT(m_state == State::Starting);
8,954✔
584
        do_state_update(lock, State::Running);
8,954✔
585
    });
8,954✔
586

4,410✔
587
    // If there is no event loop observer or handle_error function registered, then just
4,410✔
588
    // allow the exception to bubble to the top so we can get a true stack trace
4,410✔
589
    if (!m_observer_ptr || !m_observer_ptr->has_handle_error()) {
8,958!
590
        m_service.run_until_stopped(); // Throws
8,958✔
591
    }
8,958✔
592
    else {
×
593
        try {
×
594
            m_service.run_until_stopped(); // Throws
×
595
        }
×
596
        catch (const std::exception& e) {
×
597
            REALM_ASSERT(m_observer_ptr); // should not change while event loop is running
×
598
            std::unique_lock<std::mutex> lock(m_mutex);
×
599
            // Service is no longer running, event loop thread is stopping
600
            do_state_update(lock, State::Stopping);
×
601
            lock.unlock();
×
602
            m_logger_ptr->error("Default event loop exception: ", e.what());
×
603
            // If the error was not handled by the thread loop observer, then rethrow
604
            if (!m_observer_ptr->handle_error(e))
×
605
                throw;
×
606
        }
×
607
    }
×
608
}
8,958✔
609

610
void DefaultSocketProvider::stop(bool wait_for_stop)
611
{
8,958✔
612
    std::unique_lock<std::mutex> lock(m_mutex);
8,958✔
613

4,410✔
614
    // Do nothing if the thread is not started or running or stop has already been called
4,410✔
615
    if (m_state == State::Starting || m_state == State::Running) {
8,958✔
616
        m_logger_ptr->trace("Default event loop: stop()");
8,954✔
617
        do_state_update(lock, State::Stopping);
8,954✔
618
        // Updating state to Stopping will free a start() if it is waiting for the thread to
4,408✔
619
        // start and may cause the thread to exit early before calling service.run()
4,408✔
620
        m_service.stop(); // Unblocks m_service.run()
8,954✔
621
    }
8,954✔
622

4,410✔
623
    // Wait until the thread is stopped (exited) if requested
4,410✔
624
    if (wait_for_stop) {
8,958✔
625
        m_logger_ptr->trace("Default event loop: wait for stop");
8,958✔
626
        state_wait_for(lock, State::Stopped);
8,958✔
627
        if (m_thread.joinable()) {
8,958✔
628
            m_thread.join();
8,954✔
629
        }
8,954✔
630
    }
8,958✔
631
}
8,958✔
632

633
//                    +---------------------------------------+
634
//                   \/                                       |
635
// State Machine: Stopped -> Starting -> Running -> Stopping -+
636
//                              |           |          ^
637
//                              +----------------------+
638

639
void DefaultSocketProvider::do_state_update(std::unique_lock<std::mutex>&, State new_state)
640
{
26,866✔
641
    // m_state_mutex should already be locked...
13,226✔
642
    m_state = new_state;
26,866✔
643
    m_state_cv.notify_all(); // Let any waiters check the state
26,866✔
644
}
26,866✔
645

646
void DefaultSocketProvider::state_wait_for(std::unique_lock<std::mutex>& lock, State expected_state)
647
{
17,912✔
648
    // Check for condition already met or superseded
8,818✔
649
    if (m_state >= expected_state)
17,912✔
650
        return;
4✔
651

8,816✔
652
    m_state_cv.wait(lock, [this, expected_state]() {
35,816✔
653
        // are we there yet?
17,632✔
654
        if (m_state < expected_state)
35,816✔
655
            return false;
17,908✔
656
        return true;
17,908✔
657
    });
17,908✔
658
}
17,908✔
659

660
std::unique_ptr<WebSocketInterface> DefaultSocketProvider::connect(std::unique_ptr<WebSocketObserver> observer,
661
                                                                   WebSocketEndpoint&& endpoint)
662
{
3,274✔
663
    return std::make_unique<DefaultWebSocketImpl>(m_logger_ptr, m_service, m_random, m_user_agent,
3,274✔
664
                                                  std::move(observer), std::move(endpoint));
3,274✔
665
}
3,274✔
666

667
} // namespace realm::sync::websocket
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc