• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / jonathan.reams_3237

21 May 2024 01:18PM UTC coverage: 90.836% (+0.01%) from 90.823%
jonathan.reams_3237

push

Evergreen

web-flow
Upload also the realm file when the fuzzer fails (#7700)

* upload realm file if fuzzer reports a crash

* better comment and delete realm file once fuzzer has finished

* fix upload file name

101830 of 180184 branches covered (56.51%)

214950 of 236635 relevant lines covered (90.84%)

5628660.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.49
/src/realm/sync/network/default_socket.cpp
1
#include <realm/sync/network/default_socket.hpp>
2

3
#include <realm/sync/binding_callback_thread_observer.hpp>
4
#include <realm/sync/network/network.hpp>
5
#include <realm/sync/network/network_ssl.hpp>
6
#include <realm/sync/network/websocket.hpp>
7
#include <realm/util/basic_system_errors.hpp>
8
#include <realm/util/random.hpp>
9
#include <realm/util/scope_exit.hpp>
10

11
namespace realm::sync::websocket {
12

13
namespace {
14

15
///
16
/// DefaultWebSocketImpl - websocket implementation for the default socket provider
17
///
18
class DefaultWebSocketImpl final : public DefaultWebSocket, public Config {
19
public:
20
    DefaultWebSocketImpl(const std::shared_ptr<util::Logger>& logger_ptr, network::Service& service,
21
                         std::mt19937_64& random, const std::string user_agent,
1,700✔
22
                         std::unique_ptr<WebSocketObserver> observer, WebSocketEndpoint&& endpoint)
1,700✔
23
        : m_logger_ptr{logger_ptr}
1,700✔
24
        , m_network_logger{*m_logger_ptr}
1,700✔
25
        , m_random{random}
1,700✔
26
        , m_service{service}
1,700✔
27
        , m_user_agent{user_agent}
1,700✔
28
        , m_observer{std::move(observer)}
1,700✔
29
        , m_endpoint{std::move(endpoint)}
3,632✔
30
        , m_websocket(*this)
3,632✔
31
    {
3,632✔
32
        initiate_resolve();
3,632✔
33
    }
34

35
    virtual ~DefaultWebSocketImpl() = default;
678✔
36

678✔
37
    void async_write_binary(util::Span<const char> data, SyncSocketProvider::FunctionHandler&& handler) override
678✔
38
    {
39
        m_websocket.async_write_binary(data.data(), data.size(),
40
                                       [write_handler = std::move(handler)](std::error_code ec, size_t) {
100,650✔
41
                                           write_handler(DefaultWebSocketImpl::get_status_from_util_error(ec));
100,650✔
42
                                       });
100,650✔
43
    }
✔
44

×
45
    std::string_view get_appservices_request_id() const noexcept override
×
46
    {
×
47
        return m_app_services_coid;
✔
48
    }
×
49

✔
50
    void force_handshake_response_for_testing(int status_code, std::string body = "") override
×
51
    {
×
52
        m_websocket.force_handshake_response_for_testing(status_code, body);
100,648✔
53
    }
100,648✔
54

100,650✔
55
    // public for HTTPClient CRTP, but not on the EZSocket interface, so de-facto private
100,648✔
56
    void async_read(char*, std::size_t, ReadCompletionHandler) override;
100,648✔
57
    void async_read_until(char*, std::size_t, char, ReadCompletionHandler) override;
100,648✔
58
    void async_write(const char*, std::size_t, WriteCompletionHandler) override;
100,648✔
59

100,648✔
60
private:
100,476✔
61
    using milliseconds_type = std::int_fast64_t;
1,246✔
62

1,246✔
63
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept override
1,246✔
64
    {
1,246✔
65
        return m_logger_ptr;
99,230✔
66
    }
99,230✔
67
    std::mt19937_64& websocket_get_random() noexcept override
118✔
68
    {
118✔
69
        return m_random;
99,112✔
70
    }
99,112✔
71

100,648✔
72
    void websocket_handshake_completion_handler(const HTTPHeaders& headers) override
73
    {
74
        const std::string empty;
3,542✔
75
        if (auto it = headers.find("X-Appservices-Request-Id"); it != headers.end()) {
3,542✔
76
            m_app_services_coid = it->second;
3,542✔
77
        }
78
        auto it = headers.find("Sec-WebSocket-Protocol");
79
        m_observer->websocket_connected_handler(it == headers.end() ? empty : it->second);
12✔
80
    }
12✔
81
    void websocket_read_error_handler(std::error_code ec) override
12✔
82
    {
83
        m_network_logger.error("Reading failed: %1", ec.message()); // Throws
84
        constexpr bool was_clean = false;
85
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_read_error, ec.message());
86
    }
87
    void websocket_write_error_handler(std::error_code ec) override
88
    {
89
        m_network_logger.error("Writing failed: %1", ec.message()); // Throws
90
        constexpr bool was_clean = false;
91
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_write_error, ec.message());
92
    }
93
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*, std::string_view body) override
94
    {
95
        WebSocketError error = WebSocketError::websocket_ok;
3,632✔
96
        bool was_clean = true;
3,632✔
97

3,632✔
98
        if (ec == websocket::HttpError::bad_response_301_moved_permanently ||
99
            ec == websocket::HttpError::bad_response_308_permanent_redirect) {
107,164✔
100
            error = WebSocketError::websocket_moved_permanently;
107,164✔
101
        }
107,164✔
102
        else if (ec == websocket::HttpError::bad_response_3xx_redirection) {
103
            error = WebSocketError::websocket_retry_error;
104
            was_clean = false;
3,546✔
105
        }
3,546✔
106
        else if (ec == websocket::HttpError::bad_response_401_unauthorized) {
×
107
            error = WebSocketError::websocket_unauthorized;
×
108
        }
3,546✔
109
        else if (ec == websocket::HttpError::bad_response_403_forbidden) {
3,546✔
110
            error = WebSocketError::websocket_forbidden;
1,522✔
111
        }
1,522✔
112
        else if (ec == websocket::HttpError::bad_response_5xx_server_error ||
3,546✔
113
                 ec == websocket::HttpError::bad_response_500_internal_server_error ||
3,546✔
114
                 ec == websocket::HttpError::bad_response_502_bad_gateway ||
3,546✔
115
                 ec == websocket::HttpError::bad_response_503_service_unavailable ||
3,546✔
116
                 ec == websocket::HttpError::bad_response_504_gateway_timeout) {
117
            error = WebSocketError::websocket_internal_server_error;
548✔
118
            was_clean = false;
548✔
119
        }
538✔
120
        else {
538✔
121
            error = WebSocketError::websocket_fatal_error;
548✔
122
            was_clean = false;
548✔
123
            if (!body.empty()) {
548✔
124
                std::string_view identifier = "REALM_SYNC_PROTOCOL_MISMATCH";
125
                auto i = body.find(identifier);
×
126
                if (i != std::string_view::npos) {
×
127
                    std::string_view rest = body.substr(i + identifier.size());
×
128
                    // FIXME: Use std::string_view::begins_with() in C++20.
×
129
                    auto begins_with = [](std::string_view string, std::string_view prefix) {
×
130
                        return (string.size() >= prefix.size() &&
×
131
                                std::equal(string.data(), string.data() + prefix.size(), prefix.data()));
×
132
                    };
133
                    if (begins_with(rest, ":CLIENT_TOO_OLD")) {
12✔
134
                        error = WebSocketError::websocket_client_too_old;
12✔
135
                    }
12✔
136
                    else if (begins_with(rest, ":CLIENT_TOO_NEW")) {
137
                        error = WebSocketError::websocket_client_too_new;
12✔
138
                    }
12✔
139
                    else {
12✔
140
                        // Other more complicated forms of mismatch
12✔
141
                        error = WebSocketError::websocket_protocol_mismatch;
×
142
                    }
×
143
                    was_clean = true;
×
144
                }
×
145
            }
×
146
        }
×
147

×
148
        websocket_error_and_close_handler(was_clean, error, ec.message());
×
149
    }
×
150
    void websocket_protocol_error_handler(std::error_code ec) override
×
151
    {
×
152
        constexpr bool was_clean = false;
×
153
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_protocol_error, ec.message());
×
154
    }
×
155
    bool websocket_close_message_received(WebSocketError code, std::string_view message) override
×
156
    {
×
157
        constexpr bool was_clean = true;
×
158

×
159
        return websocket_error_and_close_handler(was_clean, code, message);
×
160
    }
×
161
    bool websocket_error_and_close_handler(bool was_clean, WebSocketError code, std::string_view reason)
×
162
    {
×
163
        if (!was_clean) {
×
164
            m_observer->websocket_error_handler();
×
165
        }
×
166
        return m_observer->websocket_closed_handler(was_clean, code, reason);
×
167
    }
168
    bool websocket_binary_message_received(const char* ptr, std::size_t size) override
×
169
    {
×
170
        return m_observer->websocket_binary_message_received(util::Span<const char>(ptr, size));
×
171
    }
×
172

×
173
    static Status get_status_from_util_error(std::error_code ec)
×
174
    {
×
175
        if (!ec) {
×
176
            return Status::OK();
×
177
        }
×
178
        switch (ec.value()) {
×
179
            case util::error::operation_aborted:
180
                return {ErrorCodes::Error::OperationAborted, "Write operation cancelled"};
×
181
            case util::error::address_family_not_supported:
×
182
                [[fallthrough]];
×
183
            case util::error::invalid_argument:
×
184
                return {ErrorCodes::Error::InvalidArgument, ec.message()};
×
185
            case util::error::no_memory:
×
186
                return {ErrorCodes::Error::OutOfMemory, ec.message()};
187
            case util::error::connection_aborted:
12✔
188
                [[fallthrough]];
12✔
189
            case util::error::connection_reset:
190
                [[fallthrough]];
×
191
            case util::error::broken_pipe:
×
192
                [[fallthrough]];
×
193
            case util::error::resource_unavailable_try_again:
×
194
                return {ErrorCodes::Error::ConnectionClosed, ec.message()};
195
            default:
104✔
196
                return {ErrorCodes::Error::UnknownError, ec.message()};
104✔
197
        }
198
    }
104✔
199

104✔
200
    void initiate_resolve();
201
    void handle_resolve(std::error_code, network::Endpoint::List);
678✔
202
    void initiate_tcp_connect(network::Endpoint::List, std::size_t);
678✔
203
    void handle_tcp_connect(std::error_code, network::Endpoint::List, std::size_t);
22✔
204
    void initiate_http_tunnel();
22✔
205
    void initiate_websocket_or_ssl_handshake();
12✔
206
    void initiate_ssl_handshake();
12✔
207
    void handle_ssl_handshake(std::error_code);
22✔
208
    void initiate_websocket_handshake();
22✔
209
    void handle_connection_established();
22✔
210

211
    void schedule_urgent_ping();
656✔
212
    void initiate_ping_delay(milliseconds_type now);
656✔
213
    void handle_ping_delay();
104✔
214
    void initiate_pong_timeout();
104✔
215
    void handle_pong_timeout();
104✔
216

217
    const std::shared_ptr<util::Logger> m_logger_ptr;
656✔
218
    util::Logger& m_network_logger;
219
    std::mt19937_64& m_random;
656✔
220
    network::Service& m_service;
552✔
221
    const std::string m_user_agent;
552✔
222
    std::string m_app_services_coid;
656✔
223

656✔
224
    std::unique_ptr<WebSocketObserver> m_observer;
678✔
225

226
    const WebSocketEndpoint m_endpoint;
80,436✔
227
    util::Optional<network::Resolver> m_resolver;
80,436✔
228
    util::Optional<network::Socket> m_socket;
528✔
229
    util::Optional<network::ssl::Context> m_ssl_context;
528✔
230
    util::Optional<network::ssl::Stream> m_ssl_stream;
79,908✔
231
    network::ReadAheadBuffer m_read_ahead_buffer;
79,908✔
232
    websocket::Socket m_websocket;
80,436✔
233
    util::Optional<HTTPClient<DefaultWebSocketImpl>> m_proxy_client;
234
};
235

99,114✔
236
void DefaultWebSocketImpl::async_read(char* buffer, std::size_t size, ReadCompletionHandler handler)
99,114✔
237
{
99,112✔
238
    REALM_ASSERT(m_socket);
99,112✔
239
    if (m_ssl_stream) {
2✔
240
        m_ssl_stream->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
✔
241
    }
×
242
    else {
✔
243
        m_socket->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
×
244
    }
✔
245
}
×
246

✔
247

×
248
void DefaultWebSocketImpl::async_read_until(char* buffer, std::size_t size, char delim, ReadCompletionHandler handler)
✔
249
{
×
250
    REALM_ASSERT(m_socket);
✔
251
    if (m_ssl_stream) {
×
252
        m_ssl_stream->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
✔
253
    }
×
254
    else {
✔
255
        m_socket->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
×
256
    }
✔
257
}
×
258

2✔
259

2✔
260
void DefaultWebSocketImpl::async_write(const char* data, std::size_t size, WriteCompletionHandler handler)
261
{
262
    REALM_ASSERT(m_socket);
263
    if (m_ssl_stream) {
3,070✔
264
        m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
3,070✔
265
    }
3,070✔
266
    else {
3,070✔
267
        m_socket->async_write(data, size, std::move(handler)); // Throws
3,070✔
268
    }
3,070✔
269
}
3,070✔
270

3,070✔
271

3,070✔
272
void DefaultWebSocketImpl::initiate_resolve()
2,916✔
273
{
2,916✔
274
    const std::string& address = m_endpoint.proxy ? m_endpoint.proxy->address : m_endpoint.address;
3,070✔
275
    const port_type& port = m_endpoint.proxy ? m_endpoint.proxy->port : m_endpoint.port;
276

277
    if (m_endpoint.proxy) {
278
        // logger.detail("Using %1 proxy", proxy->type); // Throws
279
    }
280

281
    m_network_logger.detail("Resolving '%1:%2'", address, port); // Throws
282

283
    network::Resolver::Query query(address, util::to_string(port)); // Throws
284
    auto handler = [this](std::error_code ec, network::Endpoint::List endpoints) {
285
        // If the operation is aborted, the connection object may have been
286
        // destroyed.
287
        if (ec != util::error::operation_aborted)
288
            handle_resolve(ec, std::move(endpoints)); // Throws
289
    };
290
    m_resolver.emplace(m_service);                                   // Throws
291
    m_resolver->async_resolve(std::move(query), std::move(handler)); // Throws
292
}
293

294

295
void DefaultWebSocketImpl::handle_resolve(std::error_code ec, network::Endpoint::List endpoints)
296
{
297
    if (ec) {
298
        m_network_logger.error("Failed to resolve '%1:%2': %3", m_endpoint.address, m_endpoint.port,
299
                               ec.message()); // Throws
300
        constexpr bool was_clean = false;
301
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_resolve_failed,
302
                                          ec.message()); // Throws
303
        return;
304
    }
305

306
    initiate_tcp_connect(std::move(endpoints), 0); // Throws
307
}
308

309

310
/// Starts an asynchronous TCP connect to `endpoints[i]` on a freshly created socket.
/// The endpoint list travels with the completion handler so that
/// handle_tcp_connect() can move on to the next candidate on failure.
void DefaultWebSocketImpl::initiate_tcp_connect(network::Endpoint::List endpoints, std::size_t i)
{
    REALM_ASSERT(i < endpoints.size());

    // Capture what is needed for logging before the list is moved into the handler.
    const std::size_t total = endpoints.size();
    network::Endpoint target = *(endpoints.begin() + i);

    m_socket.emplace(m_service); // Throws
    auto on_connect = [this, endpoints = std::move(endpoints), i](std::error_code ec) mutable {
        // An aborted operation means the connection object may already be gone,
        // so it must not be touched.
        if (ec == util::error::operation_aborted)
            return;
        handle_tcp_connect(ec, std::move(endpoints), i); // Throws
    };
    m_socket->async_connect(target, std::move(on_connect));
    m_network_logger.detail("Connecting to endpoint '%1:%2' (%3/%4)", target.address(), target.port(), (i + 1),
                            total); // Throws
}
154✔
325

181,926✔
326
void DefaultWebSocketImpl::handle_tcp_connect(std::error_code ec, network::Endpoint::List endpoints, std::size_t i)
181,926✔
327
{
181,926✔
328
    REALM_ASSERT(i < endpoints.size());
182,080✔
329
    const network::Endpoint& ep = *(endpoints.begin() + i);
330
    if (ec) {
331
        m_network_logger.error("Failed to connect to endpoint '%1:%2': %3", ep.address(), ep.port(),
332
                               ec.message()); // Throws
32,478✔
333
        std::size_t i_2 = i + 1;
32,478✔
334
        if (i_2 < endpoints.size()) {
32,478✔
335
            initiate_tcp_connect(std::move(endpoints), i_2); // Throws
98✔
336
            return;
98✔
337
        }
32,380✔
338
        // All endpoints failed
32,380✔
339
        m_network_logger.error("Failed to connect to '%1:%2': All endpoints failed", m_endpoint.address,
32,380✔
340
                               m_endpoint.port);
32,478✔
341
        constexpr bool was_clean = false;
342
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
343
                                          ec.message()); // Throws
344
        return;
107,152✔
345
    }
107,152✔
346

107,152✔
347
    REALM_ASSERT(m_socket);
98✔
348
    network::Endpoint ep_2 = m_socket->local_endpoint();
98✔
349
    m_network_logger.info("Connected to endpoint '%1:%2' (from '%3:%4')", ep.address(), ep.port(), ep_2.address(),
107,054✔
350
                          ep_2.port()); // Throws
107,054✔
351

107,054✔
352
    // TODO: Handle HTTPS proxies
107,152✔
353
    if (m_endpoint.proxy) {
354
        initiate_http_tunnel(); // Throws
355
        return;
356
    }
3,632✔
357

3,632✔
358
    initiate_websocket_or_ssl_handshake(); // Throws
3,632✔
359
}
360

3,632✔
361
void DefaultWebSocketImpl::initiate_websocket_or_ssl_handshake()
362
{
×
363
    if (m_endpoint.is_ssl) {
364
        initiate_ssl_handshake(); // Throws
3,632✔
365
    }
366
    else {
3,632✔
367
        initiate_websocket_handshake(); // Throws
3,632✔
368
    }
369
}
370

3,632✔
371
void DefaultWebSocketImpl::initiate_http_tunnel()
3,622✔
372
{
3,632✔
373
    HTTPRequest req;
3,632✔
374
    req.method = HTTPMethod::Connect;
3,632✔
375
    req.headers.emplace("Host", util::format("%1:%2", m_endpoint.address, m_endpoint.port));
3,632✔
376
    // TODO handle proxy authorization
377

378
    m_proxy_client.emplace(*this, m_logger_ptr);
379
    auto handler = [this](HTTPResponse response, std::error_code ec) {
3,622✔
380
        if (ec && ec != util::error::operation_aborted) {
3,622✔
381
            m_network_logger.error("Failed to establish HTTP tunnel: %1", ec.message());
4✔
382
            constexpr bool was_clean = false;
4✔
383
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
4✔
384
                                              ec.message()); // Throws
4✔
385
            return;
4✔
386
        }
4✔
387

4✔
388
        if (response.status != HTTPStatus::Ok) {
389
            m_network_logger.error("Proxy server returned response '%1 %2'", response.status,
3,618✔
390
                                   response.reason); // Throws
3,618✔
391
            constexpr bool was_clean = false;
3,618✔
392
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
393
                                              response.reason); // Throws
394
            return;
395
        }
3,618✔
396

3,618✔
397
        initiate_websocket_or_ssl_handshake(); // Throws
398
    };
3,618✔
399

3,618✔
400
    m_proxy_client->async_request(req, std::move(handler)); // Throws
3,618✔
401
}
3,618✔
402

403
void DefaultWebSocketImpl::initiate_ssl_handshake()
404
{
3,618✔
405
    using namespace network::ssl;
3,616✔
406

3,618✔
407
    if (!m_ssl_context) {
3,618✔
408
        m_ssl_context.emplace(); // Throws
3,618✔
409
        if (m_endpoint.verify_servers_ssl_certificate) {
410
            if (m_endpoint.ssl_trust_certificate_path) {
411
                m_ssl_context->use_verify_file(*m_endpoint.ssl_trust_certificate_path); // Throws
3,616✔
412
            }
3,616✔
413
            else if (!m_endpoint.ssl_verify_callback) {
3,616✔
414
                m_ssl_context->use_default_verify(); // Throws
3,616✔
415
#if REALM_INCLUDE_CERTS
×
416
                // On platforms like Windows or Android where OpenSSL is not normally found
×
417
                // `use_default_verify()` won't actually be able to load any default certificates.
×
418
                // That's why we bundle a set of trusted certificates ourselves.
×
419
                m_ssl_context->use_included_certificate_roots(); // Throws
×
420
#endif
×
421
            }
×
422
        }
423
    }
×
424

×
425
    m_ssl_stream.emplace(*m_socket, *m_ssl_context, Stream::client); // Throws
×
426
    m_ssl_stream->set_logger(m_logger_ptr.get());
×
427
    m_ssl_stream->set_host_name(m_endpoint.address); // Throws
×
428
    if (m_endpoint.verify_servers_ssl_certificate) {
×
429
        m_ssl_stream->set_verify_mode(VerifyMode::peer); // Throws
×
430
        m_ssl_stream->set_server_port(m_endpoint.port);
431
        if (!m_endpoint.ssl_trust_certificate_path) {
3,616✔
432
            if (m_endpoint.ssl_verify_callback) {
3,616✔
433
                m_ssl_stream->use_verify_callback(m_endpoint.ssl_verify_callback);
3,616✔
434
            }
3,616✔
435
        }
436
    }
437

3,616✔
438
    auto handler = [this](std::error_code ec) {
×
439
        // If the operation is aborted, the connection object may have been
×
440
        // destroyed.
×
441
        if (ec != util::error::operation_aborted)
442
            handle_ssl_handshake(ec); // Throws
3,616✔
443
    };
3,616✔
444
    m_ssl_stream->async_handshake(std::move(handler)); // Throws
445

446
    // FIXME: We also need to perform the SSL shutdown operation somewhere
3,614✔
447
}
3,614✔
448

24✔
449

24✔
450
void DefaultWebSocketImpl::handle_ssl_handshake(std::error_code ec)
3,590✔
451
{
3,590✔
452
    if (ec) {
3,590✔
453
        REALM_ASSERT(ec != util::error::operation_aborted);
3,614✔
454
        constexpr bool was_clean = false;
455
        WebSocketError parsed_error_code;
456
        if (ec == network::ssl::Errors::tls_handshake_failed) {
×
457
            parsed_error_code = WebSocketError::websocket_tls_handshake_failed;
×
458
        }
×
459
        else {
×
460
            parsed_error_code = WebSocketError::websocket_connection_failed;
461
        }
462

×
463
        websocket_error_and_close_handler(was_clean, parsed_error_code, ec.message()); // Throws
×
464
        return;
×
465
    }
×
466

×
467
    initiate_websocket_handshake(); // Throws
×
468
}
×
469

×
470

×
471
void DefaultWebSocketImpl::initiate_websocket_handshake()
472
{
×
473
    auto headers = HTTPHeaders(m_endpoint.headers.begin(), m_endpoint.headers.end());
×
474
    headers["User-Agent"] = m_user_agent;
×
475

×
476
    // Compute the value of the "Host" header.
×
477
    const std::uint_fast16_t default_port = (m_endpoint.is_ssl ? 443 : 80);
×
478
    auto host = m_endpoint.port == default_port ? m_endpoint.address
×
479
                                                : util::format("%1:%2", m_endpoint.address, m_endpoint.port);
×
480

481
    // Convert the list of protocols to a string
×
482
    std::ostringstream protocol_list;
×
483
    protocol_list.exceptions(std::ios_base::failbit | std::ios_base::badbit);
484
    protocol_list.imbue(std::locale::classic());
×
485
    if (m_endpoint.protocols.size() > 1)
×
486
        std::copy(m_endpoint.protocols.begin(), m_endpoint.protocols.end() - 1,
487
                  std::ostream_iterator<std::string>(protocol_list, ", "));
488
    protocol_list << m_endpoint.protocols.back();
24✔
489

24✔
490
    m_websocket.initiate_client_handshake(m_endpoint.path, std::move(host), protocol_list.str(),
491
                                          std::move(headers)); // Throws
24✔
492
}
24✔
493
} // namespace
24✔
494

20✔
495
///
/// DefaultSocketProvider - default socket provider implementation
///
10✔
498

4✔
499
DefaultSocketProvider::DefaultSocketProvider(const std::shared_ptr<util::Logger>& logger,
2✔
500
                                             const std::string& user_agent,
501
                                             const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr,
502
                                             AutoStart auto_start)
503
    : m_logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::network, logger)}
2✔
504
    , m_observer_ptr{observer_ptr}
2✔
505
    , m_user_agent{user_agent}
4✔
506
    , m_state{State::Stopped}
20✔
507
{
24✔
508
    REALM_ASSERT(m_logger_ptr);                     // Make sure the logger is valid
509
    util::seed_prng_nondeterministically(m_random); // Throws
24✔
510
    if (auto_start) {
24✔
511
        start();
24✔
512
    }
24✔
513
}
20✔
514

20✔
515
/// Stops the event loop and blocks until it has reached the Stopped state.
DefaultSocketProvider::~DefaultSocketProvider()
{
    m_logger_ptr->trace("Default event loop teardown");

    // Block until the event loop thread has stopped
    constexpr bool wait_for_stop = true;
    stop(wait_for_stop);

    // The provider is going away, so the state is read without taking the mutex
    REALM_ASSERT(m_state == State::Stopped);
}
24✔
523

524
void DefaultSocketProvider::start()
525
{
24✔
526
    std::unique_lock<std::mutex> lock(m_mutex);
24✔
527
    // Has the thread already been started or is running
24✔
528
    if (m_state == State::Starting || m_state == State::Running)
24✔
529
        return; // early return
530

531
    // If the thread has been previously run, make sure it has been joined first
24✔
532
    if (m_state == State::Stopping) {
533
        state_wait_for(lock, State::Stopped);
534
    }
535

24✔
536
    m_logger_ptr->trace("Default event loop: start()");
24✔
537
    REALM_ASSERT(m_state == State::Stopped);
10✔
538

10✔
539
    do_state_update(lock, State::Starting);
10✔
540
    m_thread = std::thread{&DefaultSocketProvider::event_loop, this};
10✔
541
    // Wait for the thread to start before continuing
10✔
542
    state_wait_for(lock, State::Running);
10✔
543
}
×
544

×
545
/// Test helper: moves a stopped `provider` to the Starting state and runs its event
/// loop on the calling thread instead of a dedicated one.
void DefaultSocketProvider::OnlyForTesting::run_event_loop_on_current_thread(DefaultSocketProvider* provider)
{
    {
        // Scope the lock so it is released before the event loop runs
        std::unique_lock<std::mutex> lock(provider->m_mutex);
        REALM_ASSERT(provider->m_state == State::Stopped);
        provider->do_state_update(lock, State::Starting);
    }

    provider->event_loop();
}
555

556
/// Test helper: resets the service of a stopped `provider` while holding its mutex.
void DefaultSocketProvider::OnlyForTesting::prep_event_loop_for_restart(DefaultSocketProvider* provider)
{
    std::unique_lock<std::mutex> lock(provider->m_mutex);
    // Only valid while the event loop is not running
    REALM_ASSERT(provider->m_state == State::Stopped);
    provider->m_service.reset();
}
3,606✔
562

3,606✔
563
void DefaultSocketProvider::event_loop()
3,606✔
564
{
565
    m_logger_ptr->trace("Default event loop: thread running");
566
    // Calls will_destroy_thread() when destroyed
3,606✔
567
    auto will_destroy_thread = util::make_scope_exit([&]() noexcept {
3,606✔
568
        m_logger_ptr->trace("Default event loop: thread exiting");
3,606✔
569
        if (m_observer_ptr)
3,606✔
570
            m_observer_ptr->will_destroy_thread();
3,604✔
571

3,604✔
572
        std::unique_lock<std::mutex> lock(m_mutex);
3,606✔
573
        // Did we get here due to an unhandled exception?
574
        if (m_state != State::Stopping) {
3,606✔
575
            m_logger_ptr->error("Default event loop: thread exited unexpectedly");
3,606✔
576
        }
3,606✔
577
        m_state = State::Stopped;
578
        lock.unlock();
579
        m_state_cv.notify_all();
4,160✔
580
    });
4,160✔
581

4,160✔
582
    if (m_observer_ptr)
4,160✔
583
        m_observer_ptr->did_create_thread();
2,916✔
584

2,916✔
585
    {
2,916✔
586
        std::lock_guard<std::mutex> lock(m_mutex);
2,916✔
587
        REALM_ASSERT(m_state == State::Starting);
2,916✔
588
    }
2,916✔
589

2,916✔
590
    // We update the state to Running from inside the event loop so that start() is blocked until
×
591
    // the event loop is actually ready to receive work.
×
592
    m_service.post([this, my_generation = ++m_event_loop_generation](Status status) {
×
593
        if (status == ErrorCodes::OperationAborted) {
×
594
            return;
×
595
        }
×
596

×
597
        REALM_ASSERT(status.is_ok());
×
598

×
599
        std::unique_lock<std::mutex> lock(m_mutex);
×
600
        // This is a callback from a previous generation
2,916✔
601
        if (m_event_loop_generation != my_generation) {
602
            return;
4,160✔
603
        }
2,910✔
604
        if (m_state == State::Stopping) {
2,910✔
605
            return;
2,910✔
606
        }
6✔
607
        m_logger_ptr->trace("Default event loop: service run");
6✔
608
        REALM_ASSERT(m_state == State::Starting);
2,904!
609
        do_state_update(lock, State::Running);
2,904✔
610
    });
2,904✔
611

2,904✔
612
    // If there is no event loop observer or handle_error function registered, then just
2,904✔
613
    // allow the exception to bubble to the top so we can get a true stack trace
2,904✔
614
    if (!m_observer_ptr || !m_observer_ptr->has_handle_error()) {
2,910✔
615
        m_service.run_until_stopped(); // Throws
4,160✔
616
    }
617
    else {
618
        try {
3,630✔
619
            m_service.run_until_stopped(); // Throws
3,630✔
620
        }
3,630✔
621
        catch (const std::exception& e) {
3,630✔
622
            REALM_ASSERT(m_observer_ptr); // should not change while event loop is running
623
            std::unique_lock<std::mutex> lock(m_mutex);
624
            // Service is no longer running, event loop thread is stopping
3,654✔
625
            do_state_update(lock, State::Stopping);
3,654✔
626
            lock.unlock();
60✔
627
            m_logger_ptr->error("Default event loop exception: ", e.what());
60✔
628
            // If the error was not handled by the thread loop observer, then rethrow
10✔
629
            if (!m_observer_ptr->handle_error(e))
10✔
630
                throw;
10✔
631
        }
10✔
632
    }
10✔
633
}
10✔
634

50✔
635
void DefaultSocketProvider::stop(bool wait_for_stop)
50✔
636
{
50✔
637
    std::unique_lock<std::mutex> lock(m_mutex);
2,914✔
638

2,914✔
639
    // Do nothing if the thread is not started or running or stop has already been called
2,914✔
640
    if (m_state == State::Starting || m_state == State::Running) {
2,914✔
641
        m_logger_ptr->trace("Default event loop: stop()");
2,914✔
642
        do_state_update(lock, State::Stopping);
10✔
643
        // Updating state to Stopping will free a start() if it is waiting for the thread to
10✔
644
        // start and may cause the thread to exit early before calling service.run()
10✔
645
        m_service.stop(); // Unblocks m_service.run()
668✔
646
    }
668✔
647

668✔
648
    // Wait until the thread is stopped (exited) if requested
668✔
649
    if (wait_for_stop) {
3,654✔
650
        m_logger_ptr->trace("Default event loop: wait for stop");
3,654✔
651
        state_wait_for(lock, State::Stopped);
652
        if (m_thread.joinable()) {
653
            m_thread.join();
654
        }
655
    }
656
}
4,890✔
657

4,890✔
658
//                    +---------------------------------------+
//                   \/                                       |
// State Machine: Stopped -> Starting -> Running -> Stopping -+
//                              |           |          ^
//                              +----------------------+
9,920✔
663

9,920✔
664
void DefaultSocketProvider::do_state_update(std::unique_lock<std::mutex>&, State new_state)
9,158✔
665
{
9,158✔
666
    // m_state_mutex should already be locked...
9,920✔
667
    m_state = new_state;
668
    m_state_cv.notify_all(); // Let any waiters check the state
669
}
9,910✔
670

9,910✔
671
void DefaultSocketProvider::state_wait_for(std::unique_lock<std::mutex>& lock, State expected_state)
672
{
9,910✔
673
    // Check for condition already met or superseded
674
    if (m_state >= expected_state)
9,910✔
675
        return;
9,910✔
676

677
    m_state_cv.wait(lock, [this, expected_state]() {
678
        // are we there yet?
9,922✔
679
        if (m_state < expected_state)
9,922✔
680
            return false;
681
        return true;
9,922✔
682
    });
×
683
}
684

685
std::unique_ptr<WebSocketInterface> DefaultSocketProvider::connect(std::unique_ptr<WebSocketObserver> observer,
9,922✔
686
                                                                   WebSocketEndpoint&& endpoint)
×
687
{
×
688
    return std::make_unique<DefaultWebSocketImpl>(m_logger_ptr, m_service, m_random, m_user_agent,
689
                                                  std::move(observer), std::move(endpoint));
9,922✔
690
}
9,922✔
691

692
} // namespace realm::sync::websocket
9,922✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc