• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / jonathan.reams_3235

21 May 2024 01:18PM UTC coverage: 90.812% (-0.01%) from 90.823%
jonathan.reams_3235

push

Evergreen

web-flow
Upload also the realm file when the fuzzer fails (#7700)

* upload realm file if fuzzer reports a crash

* better comment and delete realm file once fuzzer has finished

* fix upload file name

101820 of 180192 branches covered (56.51%)

214893 of 236634 relevant lines covered (90.81%)

5479548.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.1
/src/realm/sync/network/default_socket.cpp
1
#include <realm/sync/network/default_socket.hpp>
2

3
#include <realm/sync/binding_callback_thread_observer.hpp>
4
#include <realm/sync/network/network.hpp>
5
#include <realm/sync/network/network_ssl.hpp>
6
#include <realm/sync/network/websocket.hpp>
7
#include <realm/util/basic_system_errors.hpp>
8
#include <realm/util/random.hpp>
9
#include <realm/util/scope_exit.hpp>
10

11
namespace realm::sync::websocket {
12

13
namespace {
14

15
///
16
/// DefaultWebSocketImpl - websocket implementation for the default socket provider
17
///
18
class DefaultWebSocketImpl final : public DefaultWebSocket, public Config {
19
public:
20
    DefaultWebSocketImpl(const std::shared_ptr<util::Logger>& logger_ptr, network::Service& service,
21
                         std::mt19937_64& random, const std::string user_agent,
1,750✔
22
                         std::unique_ptr<WebSocketObserver> observer, WebSocketEndpoint&& endpoint)
1,750✔
23
        : m_logger_ptr{logger_ptr}
1,750✔
24
        , m_network_logger{*m_logger_ptr}
1,750✔
25
        , m_random{random}
1,750✔
26
        , m_service{service}
1,750✔
27
        , m_user_agent{user_agent}
1,750✔
28
        , m_observer{std::move(observer)}
1,750✔
29
        , m_endpoint{std::move(endpoint)}
3,638✔
30
        , m_websocket(*this)
3,638✔
31
    {
3,638✔
32
        initiate_resolve();
3,638✔
33
    }
34

35
    virtual ~DefaultWebSocketImpl() = default;
3,636✔
36

3,636✔
37
    void async_write_binary(util::Span<const char> data, SyncSocketProvider::FunctionHandler&& handler) override
3,636✔
38
    {
39
        m_websocket.async_write_binary(data.data(), data.size(),
40
                                       [write_handler = std::move(handler)](std::error_code ec, size_t) {
100,698✔
41
                                           write_handler(DefaultWebSocketImpl::get_status_from_util_error(ec));
100,698✔
42
                                       });
100,698✔
43
    }
✔
44

×
45
    std::string_view get_appservices_request_id() const noexcept override
×
46
    {
×
47
        return m_app_services_coid;
✔
48
    }
×
49

✔
50
    void force_handshake_response_for_testing(int status_code, std::string body = "") override
×
51
    {
✔
52
        m_websocket.force_handshake_response_for_testing(status_code, body);
×
53
    }
×
54

100,694✔
55
    // public for HTTPClient CRTP, but not on the EZSocket interface, so de-facto private
100,694✔
56
    void async_read(char*, std::size_t, ReadCompletionHandler) override;
100,698✔
57
    void async_read_until(char*, std::size_t, char, ReadCompletionHandler) override;
100,692✔
58
    void async_write(const char*, std::size_t, WriteCompletionHandler) override;
100,692✔
59

100,692✔
60
private:
100,692✔
61
    using milliseconds_type = std::int_fast64_t;
100,692✔
62

100,522✔
63
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept override
1,234✔
64
    {
1,234✔
65
        return m_logger_ptr;
1,234✔
66
    }
1,234✔
67
    std::mt19937_64& websocket_get_random() noexcept override
1,234✔
68
    {
99,288✔
69
        return m_random;
99,288✔
70
    }
94✔
71

94✔
72
    void websocket_handshake_completion_handler(const HTTPHeaders& headers) override
99,194✔
73
    {
99,194✔
74
        const std::string empty;
100,692✔
75
        if (auto it = headers.find("X-Appservices-Request-Id"); it != headers.end()) {
76
            m_app_services_coid = it->second;
77
        }
3,558✔
78
        auto it = headers.find("Sec-WebSocket-Protocol");
3,558✔
79
        m_observer->websocket_connected_handler(it == headers.end() ? empty : it->second);
3,558✔
80
    }
81
    void websocket_read_error_handler(std::error_code ec) override
82
    {
12✔
83
        m_network_logger.error("Reading failed: %1", ec.message()); // Throws
12✔
84
        constexpr bool was_clean = false;
12✔
85
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_read_error, ec.message());
86
    }
87
    void websocket_write_error_handler(std::error_code ec) override
88
    {
89
        m_network_logger.error("Writing failed: %1", ec.message()); // Throws
90
        constexpr bool was_clean = false;
91
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_write_error, ec.message());
92
    }
93
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*, std::string_view body) override
94
    {
95
        WebSocketError error = WebSocketError::websocket_ok;
96
        bool was_clean = true;
97

98
        if (ec == websocket::HttpError::bad_response_301_moved_permanently ||
3,640✔
99
            ec == websocket::HttpError::bad_response_308_permanent_redirect) {
3,640✔
100
            error = WebSocketError::websocket_moved_permanently;
3,640✔
101
        }
102
        else if (ec == websocket::HttpError::bad_response_3xx_redirection) {
107,258✔
103
            error = WebSocketError::websocket_retry_error;
107,258✔
104
            was_clean = false;
107,258✔
105
        }
106
        else if (ec == websocket::HttpError::bad_response_401_unauthorized) {
107
            error = WebSocketError::websocket_unauthorized;
3,562✔
108
        }
3,562✔
109
        else if (ec == websocket::HttpError::bad_response_403_forbidden) {
×
110
            error = WebSocketError::websocket_forbidden;
×
111
        }
3,562✔
112
        else if (ec == websocket::HttpError::bad_response_5xx_server_error ||
3,562✔
113
                 ec == websocket::HttpError::bad_response_500_internal_server_error ||
1,522✔
114
                 ec == websocket::HttpError::bad_response_502_bad_gateway ||
1,522✔
115
                 ec == websocket::HttpError::bad_response_503_service_unavailable ||
3,562✔
116
                 ec == websocket::HttpError::bad_response_504_gateway_timeout) {
3,562✔
117
            error = WebSocketError::websocket_internal_server_error;
3,562✔
118
            was_clean = false;
3,562✔
119
        }
120
        else {
3,450✔
121
            error = WebSocketError::websocket_fatal_error;
3,450✔
122
            was_clean = false;
524✔
123
            if (!body.empty()) {
524✔
124
                std::string_view identifier = "REALM_SYNC_PROTOCOL_MISMATCH";
3,450✔
125
                auto i = body.find(identifier);
3,450✔
126
                if (i != std::string_view::npos) {
3,450✔
127
                    std::string_view rest = body.substr(i + identifier.size());
128
                    // FIXME: Use std::string_view::begins_with() in C++20.
46✔
129
                    auto begins_with = [](std::string_view string, std::string_view prefix) {
46✔
130
                        return (string.size() >= prefix.size() &&
×
131
                                std::equal(string.data(), string.data() + prefix.size(), prefix.data()));
×
132
                    };
46✔
133
                    if (begins_with(rest, ":CLIENT_TOO_OLD")) {
46✔
134
                        error = WebSocketError::websocket_client_too_old;
46✔
135
                    }
136
                    else if (begins_with(rest, ":CLIENT_TOO_NEW")) {
12✔
137
                        error = WebSocketError::websocket_client_too_new;
12✔
138
                    }
12✔
139
                    else {
140
                        // Other more complicated forms of mismatch
12✔
141
                        error = WebSocketError::websocket_protocol_mismatch;
12✔
142
                    }
12✔
143
                    was_clean = true;
12✔
144
                }
×
145
            }
×
146
        }
×
147

×
148
        websocket_error_and_close_handler(was_clean, error, ec.message());
×
149
    }
×
150
    void websocket_protocol_error_handler(std::error_code ec) override
×
151
    {
×
152
        constexpr bool was_clean = false;
×
153
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_protocol_error, ec.message());
×
154
    }
×
155
    bool websocket_close_message_received(WebSocketError code, std::string_view message) override
×
156
    {
×
157
        constexpr bool was_clean = true;
×
158

×
159
        return websocket_error_and_close_handler(was_clean, code, message);
×
160
    }
×
161
    bool websocket_error_and_close_handler(bool was_clean, WebSocketError code, std::string_view reason)
×
162
    {
×
163
        if (!was_clean) {
×
164
            m_observer->websocket_error_handler();
×
165
        }
×
166
        return m_observer->websocket_closed_handler(was_clean, code, reason);
×
167
    }
×
168
    bool websocket_binary_message_received(const char* ptr, std::size_t size) override
×
169
    {
×
170
        return m_observer->websocket_binary_message_received(util::Span<const char>(ptr, size));
171
    }
×
172

×
173
    static Status get_status_from_util_error(std::error_code ec)
×
174
    {
×
175
        if (!ec) {
×
176
            return Status::OK();
×
177
        }
×
178
        switch (ec.value()) {
×
179
            case util::error::operation_aborted:
×
180
                return {ErrorCodes::Error::OperationAborted, "Write operation cancelled"};
×
181
            case util::error::address_family_not_supported:
×
182
                [[fallthrough]];
183
            case util::error::invalid_argument:
×
184
                return {ErrorCodes::Error::InvalidArgument, ec.message()};
×
185
            case util::error::no_memory:
×
186
                return {ErrorCodes::Error::OutOfMemory, ec.message()};
×
187
            case util::error::connection_aborted:
×
188
                [[fallthrough]];
×
189
            case util::error::connection_reset:
190
                [[fallthrough]];
12✔
191
            case util::error::broken_pipe:
12✔
192
                [[fallthrough]];
193
            case util::error::resource_unavailable_try_again:
×
194
                return {ErrorCodes::Error::ConnectionClosed, ec.message()};
×
195
            default:
×
196
                return {ErrorCodes::Error::UnknownError, ec.message()};
×
197
        }
198
    }
108✔
199

108✔
200
    void initiate_resolve();
201
    void handle_resolve(std::error_code, network::Endpoint::List);
108✔
202
    void initiate_tcp_connect(network::Endpoint::List, std::size_t);
108✔
203
    void handle_tcp_connect(std::error_code, network::Endpoint::List, std::size_t);
204
    void initiate_http_tunnel();
3,632✔
205
    void initiate_websocket_or_ssl_handshake();
3,632✔
206
    void initiate_ssl_handshake();
2,988✔
207
    void handle_ssl_handshake(std::error_code);
2,988✔
208
    void initiate_websocket_handshake();
10✔
209
    void handle_connection_established();
10✔
210

2,988✔
211
    void schedule_urgent_ping();
2,988✔
212
    void initiate_ping_delay(milliseconds_type now);
2,988✔
213
    void handle_ping_delay();
2,988✔
214
    void initiate_pong_timeout();
215
    void handle_pong_timeout();
644✔
216

644✔
217
    const std::shared_ptr<util::Logger> m_logger_ptr;
106✔
218
    util::Logger& m_network_logger;
106✔
219
    std::mt19937_64& m_random;
106✔
220
    network::Service& m_service;
221
    const std::string m_user_agent;
644✔
222
    std::string m_app_services_coid;
223

644✔
224
    std::unique_ptr<WebSocketObserver> m_observer;
538✔
225

538✔
226
    const WebSocketEndpoint m_endpoint;
644✔
227
    util::Optional<network::Resolver> m_resolver;
644✔
228
    util::Optional<network::Socket> m_socket;
3,632✔
229
    util::Optional<network::ssl::Context> m_ssl_context;
230
    util::Optional<network::ssl::Stream> m_ssl_stream;
79,912✔
231
    network::ReadAheadBuffer m_read_ahead_buffer;
79,912✔
232
    websocket::Socket m_websocket;
594✔
233
    util::Optional<HTTPClient<DefaultWebSocketImpl>> m_proxy_client;
594✔
234
};
79,318✔
235

79,318✔
236
void DefaultWebSocketImpl::async_read(char* buffer, std::size_t size, ReadCompletionHandler handler)
79,912✔
237
{
238
    REALM_ASSERT(m_socket);
239
    if (m_ssl_stream) {
99,196✔
240
        m_ssl_stream->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
99,196✔
241
    }
99,196✔
242
    else {
99,196✔
243
        m_socket->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
×
244
    }
×
245
}
×
246

×
247

×
248
void DefaultWebSocketImpl::async_read_until(char* buffer, std::size_t size, char delim, ReadCompletionHandler handler)
×
249
{
×
250
    REALM_ASSERT(m_socket);
×
251
    if (m_ssl_stream) {
×
252
        m_ssl_stream->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
×
253
    }
×
254
    else {
×
255
        m_socket->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
×
256
    }
×
257
}
×
258

×
259

×
260
void DefaultWebSocketImpl::async_write(const char* data, std::size_t size, WriteCompletionHandler handler)
×
261
{
×
262
    REALM_ASSERT(m_socket);
×
263
    if (m_ssl_stream) {
×
264
        m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
265
    }
266
    else {
267
        m_socket->async_write(data, size, std::move(handler)); // Throws
3,086✔
268
    }
3,086✔
269
}
3,086✔
270

3,086✔
271

3,086✔
272
void DefaultWebSocketImpl::initiate_resolve()
3,086✔
273
{
3,086✔
274
    const std::string& address = m_endpoint.proxy ? m_endpoint.proxy->address : m_endpoint.address;
2,932✔
275
    const port_type& port = m_endpoint.proxy ? m_endpoint.proxy->port : m_endpoint.port;
2,932✔
276

3,086✔
277
    if (m_endpoint.proxy) {
278
        // logger.detail("Using %1 proxy", proxy->type); // Throws
279
    }
280

281
    m_network_logger.detail("Resolving '%1:%2'", address, port); // Throws
282

283
    network::Resolver::Query query(address, util::to_string(port)); // Throws
284
    auto handler = [this](std::error_code ec, network::Endpoint::List endpoints) {
285
        // If the operation is aborted, the connection object may have been
286
        // destroyed.
287
        if (ec != util::error::operation_aborted)
288
            handle_resolve(ec, std::move(endpoints)); // Throws
289
    };
290
    m_resolver.emplace(m_service);                                   // Throws
291
    m_resolver->async_resolve(std::move(query), std::move(handler)); // Throws
292
}
293

294

295
void DefaultWebSocketImpl::handle_resolve(std::error_code ec, network::Endpoint::List endpoints)
296
{
297
    if (ec) {
298
        m_network_logger.error("Failed to resolve '%1:%2': %3", m_endpoint.address, m_endpoint.port,
299
                               ec.message()); // Throws
300
        constexpr bool was_clean = false;
301
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_resolve_failed,
302
                                          ec.message()); // Throws
303
        return;
304
    }
305

306
    initiate_tcp_connect(std::move(endpoints), 0); // Throws
307
}
308

309

310
/// Starts an asynchronous TCP connect to entry `i` of `endpoints` on a freshly
/// created socket. The whole list travels with the completion handler so that
/// handle_tcp_connect() can move on to the next entry.
void DefaultWebSocketImpl::initiate_tcp_connect(network::Endpoint::List endpoints, std::size_t i)
{
    REALM_ASSERT(i < endpoints.size());

    // Both values are read up front because the list is moved into the handler below.
    std::size_t endpoint_count = endpoints.size();
    network::Endpoint target = *(endpoints.begin() + i);

    m_socket.emplace(m_service); // Throws
    auto on_connect = [this, endpoints = std::move(endpoints), i](std::error_code ec) mutable {
        // An aborted operation may mean that this connection object is
        // already gone, so `this` must not be touched in that case.
        if (ec == util::error::operation_aborted)
            return;
        handle_tcp_connect(ec, std::move(endpoints), i); // Throws
    };
    m_socket->async_connect(target, std::move(on_connect));
    m_network_logger.detail("Connecting to endpoint '%1:%2' (%3/%4)", target.address(), target.port(), (i + 1),
                            endpoint_count); // Throws
}
180,644✔
325

154✔
326
void DefaultWebSocketImpl::handle_tcp_connect(std::error_code ec, network::Endpoint::List endpoints, std::size_t i)
154✔
327
{
180,490✔
328
    REALM_ASSERT(i < endpoints.size());
180,490✔
329
    const network::Endpoint& ep = *(endpoints.begin() + i);
180,490✔
330
    if (ec) {
180,644✔
331
        m_network_logger.error("Failed to connect to endpoint '%1:%2': %3", ep.address(), ep.port(),
332
                               ec.message()); // Throws
333
        std::size_t i_2 = i + 1;
334
        if (i_2 < endpoints.size()) {
32,588✔
335
            initiate_tcp_connect(std::move(endpoints), i_2); // Throws
32,588✔
336
            return;
32,588✔
337
        }
98✔
338
        // All endpoints failed
98✔
339
        m_network_logger.error("Failed to connect to '%1:%2': All endpoints failed", m_endpoint.address,
32,490✔
340
                               m_endpoint.port);
32,490✔
341
        constexpr bool was_clean = false;
32,490✔
342
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
32,588✔
343
                                          ec.message()); // Throws
344
        return;
345
    }
346

107,244✔
347
    REALM_ASSERT(m_socket);
107,244✔
348
    network::Endpoint ep_2 = m_socket->local_endpoint();
107,244✔
349
    m_network_logger.info("Connected to endpoint '%1:%2' (from '%3:%4')", ep.address(), ep.port(), ep_2.address(),
98✔
350
                          ep_2.port()); // Throws
98✔
351

107,146✔
352
    // TODO: Handle HTTPS proxies
107,146✔
353
    if (m_endpoint.proxy) {
107,146✔
354
        initiate_http_tunnel(); // Throws
107,244✔
355
        return;
356
    }
357

358
    initiate_websocket_or_ssl_handshake(); // Throws
3,638✔
359
}
3,638✔
360

3,638✔
361
void DefaultWebSocketImpl::initiate_websocket_or_ssl_handshake()
362
{
3,638✔
363
    if (m_endpoint.is_ssl) {
364
        initiate_ssl_handshake(); // Throws
×
365
    }
366
    else {
3,638✔
367
        initiate_websocket_handshake(); // Throws
368
    }
3,638✔
369
}
3,640✔
370

371
void DefaultWebSocketImpl::initiate_http_tunnel()
372
{
3,640✔
373
    HTTPRequest req;
3,634✔
374
    req.method = HTTPMethod::Connect;
3,640✔
375
    req.headers.emplace("Host", util::format("%1:%2", m_endpoint.address, m_endpoint.port));
3,638✔
376
    // TODO handle proxy authorization
3,638✔
377

3,638✔
378
    m_proxy_client.emplace(*this, m_logger_ptr);
379
    auto handler = [this](HTTPResponse response, std::error_code ec) {
380
        if (ec && ec != util::error::operation_aborted) {
381
            m_network_logger.error("Failed to establish HTTP tunnel: %1", ec.message());
3,634✔
382
            constexpr bool was_clean = false;
3,634✔
383
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
4✔
384
                                              ec.message()); // Throws
4✔
385
            return;
4✔
386
        }
4✔
387

4✔
388
        if (response.status != HTTPStatus::Ok) {
4✔
389
            m_network_logger.error("Proxy server returned response '%1 %2'", response.status,
4✔
390
                                   response.reason); // Throws
391
            constexpr bool was_clean = false;
3,630✔
392
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
3,630✔
393
                                              response.reason); // Throws
3,630✔
394
            return;
395
        }
396

397
        initiate_websocket_or_ssl_handshake(); // Throws
3,630✔
398
    };
3,630✔
399

400
    m_proxy_client->async_request(req, std::move(handler)); // Throws
3,630✔
401
}
3,630✔
402

3,630✔
403
void DefaultWebSocketImpl::initiate_ssl_handshake()
3,630✔
404
{
405
    using namespace network::ssl;
406

3,630✔
407
    if (!m_ssl_context) {
3,630✔
408
        m_ssl_context.emplace(); // Throws
3,630✔
409
        if (m_endpoint.verify_servers_ssl_certificate) {
3,630✔
410
            if (m_endpoint.ssl_trust_certificate_path) {
3,630✔
411
                m_ssl_context->use_verify_file(*m_endpoint.ssl_trust_certificate_path); // Throws
412
            }
413
            else if (!m_endpoint.ssl_verify_callback) {
3,630✔
414
                m_ssl_context->use_default_verify(); // Throws
3,630✔
415
#if REALM_INCLUDE_CERTS
3,630✔
416
                // On platforms like Windows or Android where OpenSSL is not normally found
3,630✔
417
                // `use_default_verify()` won't actually be able to load any default certificates.
×
418
                // That's why we bundle a set of trusted certificates ourselves.
×
419
                m_ssl_context->use_included_certificate_roots(); // Throws
×
420
#endif
×
421
            }
×
422
        }
×
423
    }
×
424

425
    m_ssl_stream.emplace(*m_socket, *m_ssl_context, Stream::client); // Throws
×
426
    m_ssl_stream->set_logger(m_logger_ptr.get());
×
427
    m_ssl_stream->set_host_name(m_endpoint.address); // Throws
×
428
    if (m_endpoint.verify_servers_ssl_certificate) {
×
429
        m_ssl_stream->set_verify_mode(VerifyMode::peer); // Throws
×
430
        m_ssl_stream->set_server_port(m_endpoint.port);
×
431
        if (!m_endpoint.ssl_trust_certificate_path) {
×
432
            if (m_endpoint.ssl_verify_callback) {
433
                m_ssl_stream->use_verify_callback(m_endpoint.ssl_verify_callback);
3,630✔
434
            }
3,630✔
435
        }
3,630✔
436
    }
3,630✔
437

438
    auto handler = [this](std::error_code ec) {
439
        // If the operation is aborted, the connection object may have been
3,630✔
440
        // destroyed.
×
441
        if (ec != util::error::operation_aborted)
×
442
            handle_ssl_handshake(ec); // Throws
×
443
    };
444
    m_ssl_stream->async_handshake(std::move(handler)); // Throws
3,630✔
445

3,630✔
446
    // FIXME: We also need to perform the SSL shutdown operation somewhere
447
}
448

3,630✔
449

3,630✔
450
void DefaultWebSocketImpl::handle_ssl_handshake(std::error_code ec)
24✔
451
{
24✔
452
    if (ec) {
3,606✔
453
        REALM_ASSERT(ec != util::error::operation_aborted);
3,606✔
454
        constexpr bool was_clean = false;
3,606✔
455
        WebSocketError parsed_error_code;
3,630✔
456
        if (ec == network::ssl::Errors::tls_handshake_failed) {
457
            parsed_error_code = WebSocketError::websocket_tls_handshake_failed;
458
        }
×
459
        else {
×
460
            parsed_error_code = WebSocketError::websocket_connection_failed;
×
461
        }
×
462

463
        websocket_error_and_close_handler(was_clean, parsed_error_code, ec.message()); // Throws
464
        return;
×
465
    }
×
466

×
467
    initiate_websocket_handshake(); // Throws
×
468
}
×
469

×
470

×
471
void DefaultWebSocketImpl::initiate_websocket_handshake()
×
472
{
×
473
    auto headers = HTTPHeaders(m_endpoint.headers.begin(), m_endpoint.headers.end());
474
    headers["User-Agent"] = m_user_agent;
×
475

×
476
    // Compute the value of the "Host" header.
×
477
    const std::uint_fast16_t default_port = (m_endpoint.is_ssl ? 443 : 80);
×
478
    auto host = m_endpoint.port == default_port ? m_endpoint.address
×
479
                                                : util::format("%1:%2", m_endpoint.address, m_endpoint.port);
×
480

×
481
    // Convert the list of protocols to a string
×
482
    std::ostringstream protocol_list;
483
    protocol_list.exceptions(std::ios_base::failbit | std::ios_base::badbit);
×
484
    protocol_list.imbue(std::locale::classic());
×
485
    if (m_endpoint.protocols.size() > 1)
486
        std::copy(m_endpoint.protocols.begin(), m_endpoint.protocols.end() - 1,
×
487
                  std::ostream_iterator<std::string>(protocol_list, ", "));
×
488
    protocol_list << m_endpoint.protocols.back();
489

490
    m_websocket.initiate_client_handshake(m_endpoint.path, std::move(host), protocol_list.str(),
24✔
491
                                          std::move(headers)); // Throws
24✔
492
}
493
} // namespace
24✔
494

24✔
495
///
24✔
496
/// DefaultSocketProvider - default socket provider implementation
20✔
497
///
10✔
498

10✔
499
DefaultSocketProvider::DefaultSocketProvider(const std::shared_ptr<util::Logger>& logger,
10✔
500
                                             const std::string& user_agent,
4✔
501
                                             const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr,
2✔
502
                                             AutoStart auto_start)
503
    : m_logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::network, logger)}
504
    , m_observer_ptr{observer_ptr}
505
    , m_user_agent{user_agent}
2✔
506
    , m_state{State::Stopped}
2✔
507
{
4✔
508
    REALM_ASSERT(m_logger_ptr);                     // Make sure the logger is valid
20✔
509
    util::seed_prng_nondeterministically(m_random); // Throws
24✔
510
    if (auto_start) {
511
        start();
24✔
512
    }
24✔
513
}
24✔
514

24✔
515
/// Stops the event loop and blocks until its thread has finished.
DefaultSocketProvider::~DefaultSocketProvider()
{
    m_logger_ptr->trace("Default event loop teardown");

    // Block until the event loop thread has exited
    constexpr bool wait_for_stop = true;
    stop(wait_for_stop);

    // The object is going away, so the state is read without taking the mutex
    REALM_ASSERT(m_state == State::Stopped);
}
20✔
523

524
void DefaultSocketProvider::start()
24✔
525
{
526
    std::unique_lock<std::mutex> lock(m_mutex);
527
    // Has the thread already been started or is running
24✔
528
    if (m_state == State::Starting || m_state == State::Running)
24✔
529
        return; // early return
24✔
530

24✔
531
    // If the thread has been previously run, make sure it has been joined first
532
    if (m_state == State::Stopping) {
533
        state_wait_for(lock, State::Stopped);
24✔
534
    }
535

536
    m_logger_ptr->trace("Default event loop: start()");
537
    REALM_ASSERT(m_state == State::Stopped);
24✔
538

24✔
539
    do_state_update(lock, State::Starting);
10✔
540
    m_thread = std::thread{&DefaultSocketProvider::event_loop, this};
10✔
541
    // Wait for the thread to start before continuing
10✔
542
    state_wait_for(lock, State::Running);
10✔
543
}
10✔
544

10✔
545
/// Test helper: moves a stopped provider to the Starting state and then runs
/// its event loop on the calling thread.
void DefaultSocketProvider::OnlyForTesting::run_event_loop_on_current_thread(DefaultSocketProvider* provider)
{
    std::unique_lock<std::mutex> lk(provider->m_mutex);
    REALM_ASSERT(provider->m_state == State::Stopped);
    provider->do_state_update(lk, State::Starting);
    // The event loop takes the mutex itself, so release it before entering.
    lk.unlock();

    provider->event_loop();
}
14✔
555

556
/// Test helper: resets the service of a stopped provider.
void DefaultSocketProvider::OnlyForTesting::prep_event_loop_for_restart(DefaultSocketProvider* provider)
{
    std::lock_guard<std::mutex> guard(provider->m_mutex);
    REALM_ASSERT(provider->m_state == State::Stopped);
    provider->m_service.reset();
}
562

563
void DefaultSocketProvider::event_loop()
3,620✔
564
{
3,620✔
565
    m_logger_ptr->trace("Default event loop: thread running");
3,620✔
566
    // Calls will_destroy_thread() when destroyed
567
    auto will_destroy_thread = util::make_scope_exit([&]() noexcept {
568
        m_logger_ptr->trace("Default event loop: thread exiting");
3,620✔
569
        if (m_observer_ptr)
3,620✔
570
            m_observer_ptr->will_destroy_thread();
3,620✔
571

3,620✔
572
        std::unique_lock<std::mutex> lock(m_mutex);
3,620✔
573
        // Did we get here due to an unhandled exception?
3,620✔
574
        if (m_state != State::Stopping) {
3,620✔
575
            m_logger_ptr->error("Default event loop: thread exited unexpectedly");
576
        }
3,620✔
577
        m_state = State::Stopped;
3,620✔
578
        lock.unlock();
3,620✔
579
        m_state_cv.notify_all();
580
    });
581

4,178✔
582
    if (m_observer_ptr)
4,178✔
583
        m_observer_ptr->did_create_thread();
4,178✔
584

4,178✔
585
    {
2,942✔
586
        std::lock_guard<std::mutex> lock(m_mutex);
2,942✔
587
        REALM_ASSERT(m_state == State::Starting);
2,944✔
588
    }
2,944✔
589

2,934✔
590
    // We update the state to Running from inside the event loop so that start() is blocked until
2,934✔
591
    // the event loop is actually ready to receive work.
2,934✔
592
    m_service.post([this, my_generation = ++m_event_loop_generation](Status status) {
10✔
593
        if (status == ErrorCodes::OperationAborted) {
10!
594
            return;
10✔
595
        }
10✔
596

10✔
597
        REALM_ASSERT(status.is_ok());
×
598

×
599
        std::unique_lock<std::mutex> lock(m_mutex);
×
600
        // This is a callback from a previous generation
×
601
        if (m_event_loop_generation != my_generation) {
×
602
            return;
2,942✔
603
        }
604
        if (m_state == State::Stopping) {
4,178✔
605
            return;
2,936✔
606
        }
2,938✔
607
        m_logger_ptr->trace("Default event loop: service run");
2,932✔
608
        REALM_ASSERT(m_state == State::Starting);
×
609
        do_state_update(lock, State::Running);
×
610
    });
2,932✔
611

2,932✔
612
    // If there is no event loop observer or handle_error function registered, then just
2,934✔
613
    // allow the exception to bubble to the top so we can get a true stack trace
2,924✔
614
    if (!m_observer_ptr || !m_observer_ptr->has_handle_error()) {
2,924✔
615
        m_service.run_until_stopped(); // Throws
2,932✔
616
    }
2,936✔
617
    else {
4,178✔
618
        try {
619
            m_service.run_until_stopped(); // Throws
620
        }
3,638✔
621
        catch (const std::exception& e) {
3,638✔
622
            REALM_ASSERT(m_observer_ptr); // should not change while event loop is running
3,638✔
623
            std::unique_lock<std::mutex> lock(m_mutex);
3,638✔
624
            // Service is no longer running, event loop thread is stopping
625
            do_state_update(lock, State::Stopping);
626
            lock.unlock();
6,628✔
627
            m_logger_ptr->error("Default event loop exception: ", e.what());
6,628✔
628
            // If the error was not handled by the thread loop observer, then rethrow
52✔
629
            if (!m_observer_ptr->handle_error(e))
52✔
630
                throw;
6✔
631
        }
6✔
632
    }
6✔
633
}
6✔
634

6✔
635
void DefaultSocketProvider::stop(bool wait_for_stop)
6✔
636
{
46✔
637
    std::unique_lock<std::mutex> lock(m_mutex);
46✔
638

46✔
639
    // Do nothing if the thread is not started or running or stop has already been called
2,942✔
640
    if (m_state == State::Starting || m_state == State::Running) {
2,942✔
641
        m_logger_ptr->trace("Default event loop: stop()");
2,942✔
642
        do_state_update(lock, State::Stopping);
2,942✔
643
        // Updating state to Stopping will free a start() if it is waiting for the thread to
2,942✔
644
        // start and may cause the thread to exit early before calling service.run()
✔
645
        m_service.stop(); // Unblocks m_service.run()
×
646
    }
×
647

✔
648
    // Wait until the thread is stopped (exited) if requested
×
649
    if (wait_for_stop) {
×
650
        m_logger_ptr->trace("Default event loop: wait for stop");
×
651
        state_wait_for(lock, State::Stopped);
3,632✔
652
        if (m_thread.joinable()) {
3,632✔
653
            m_thread.join();
3,632✔
654
        }
3,632✔
655
    }
6,628✔
656
}
6,628✔
657

658
//                    +---------------------------------------+
659
//                   \/                                       |
660
// State Machine: Stopped -> Starting -> Running -> Stopping -+
661
//                              |           |          ^
662
//                              +----------------------+
4,892✔
663

4,892✔
664
void DefaultSocketProvider::do_state_update(std::unique_lock<std::mutex>&, State new_state)
4,892✔
665
{
4,892✔
666
    // m_state_mutex should already be locked...
9,922✔
667
    m_state = new_state;
9,922✔
668
    m_state_cv.notify_all(); // Let any waiters check the state
9,922✔
669
}
9,922✔
670

9,158✔
671
void DefaultSocketProvider::state_wait_for(std::unique_lock<std::mutex>& lock, State expected_state)
9,158✔
672
{
9,922✔
673
    // Check for condition already met or superseded
674
    if (m_state >= expected_state)
675
        return;
9,910✔
676

9,910✔
677
    m_state_cv.wait(lock, [this, expected_state]() {
678
        // are we there yet?
9,910✔
679
        if (m_state < expected_state)
680
            return false;
9,910✔
681
        return true;
9,910✔
682
    });
683
}
684

9,922✔
685
std::unique_ptr<WebSocketInterface> DefaultSocketProvider::connect(std::unique_ptr<WebSocketObserver> observer,
9,922✔
686
                                                                   WebSocketEndpoint&& endpoint)
687
{
9,922✔
688
    return std::make_unique<DefaultWebSocketImpl>(m_logger_ptr, m_service, m_random, m_user_agent,
×
689
                                                  std::move(observer), std::move(endpoint));
690
}
691

9,922✔
692
} // namespace realm::sync::websocket
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc