• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_275

09 Apr 2024 03:33AM UTC coverage: 92.608% (+0.5%) from 92.088%
thomas.goyne_275

Pull #7300

Evergreen

tgoyne
Extract some duplicated code in PushClient
Pull Request #7300: Rework sync user handling and metadata storage

102672 of 194970 branches covered (52.66%)

3165 of 3247 new or added lines in 46 files covered. (97.47%)

34 existing lines in 9 files now uncovered.

249420 of 269329 relevant lines covered (92.61%)

45087511.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.44
/src/realm/sync/network/default_socket.cpp
1
#include <realm/sync/network/default_socket.hpp>
2

3
#include <realm/sync/binding_callback_thread_observer.hpp>
4
#include <realm/sync/network/network.hpp>
5
#include <realm/sync/network/network_ssl.hpp>
6
#include <realm/sync/network/websocket.hpp>
7
#include <realm/util/basic_system_errors.hpp>
8
#include <realm/util/random.hpp>
9
#include <realm/util/scope_exit.hpp>
10

11
namespace realm::sync::websocket {
12

13
namespace {
14

15
///
16
/// DefaultWebSocketImpl - websocket implementation for the default socket provider
17
///
18
class DefaultWebSocketImpl final : public DefaultWebSocket, public Config {
19
public:
20
    DefaultWebSocketImpl(const std::shared_ptr<util::Logger>& logger_ptr, network::Service& service,
21
                         std::mt19937_64& random, const std::string user_agent,
22
                         std::unique_ptr<WebSocketObserver> observer, WebSocketEndpoint&& endpoint)
23
        : m_logger_ptr{logger_ptr}
24
        , m_network_logger{*m_logger_ptr}
25
        , m_random{random}
26
        , m_service{service}
27
        , m_user_agent{user_agent}
28
        , m_observer{std::move(observer)}
29
        , m_endpoint{std::move(endpoint)}
30
        , m_websocket(*this)
31
    {
28,356✔
32
        initiate_resolve();
28,356✔
33
    }
28,356✔
34

35
    /// Defaulted: no explicit cleanup is written here; the members release
    /// whatever they hold when they are destroyed.
    ~DefaultWebSocketImpl() override = default;
28,392✔
36

37
    /// Queue a binary websocket message for sending.
    ///
    /// @param data     Bytes to send.
    /// @param handler  Invoked once with the write's error code translated to
    ///                 a Status by get_status_from_util_error().
    void async_write_binary(util::Span<const char> data, SyncSocketProvider::FunctionHandler&& handler) override
    {
        // The byte count reported by the underlying write is not forwarded.
        auto on_written = [callback = std::move(handler)](std::error_code error, size_t) {
            callback(DefaultWebSocketImpl::get_status_from_util_error(error));
        };
        m_websocket.async_write_binary(data.data(), data.size(), std::move(on_written));
    }
822,230✔
44

45
    std::string_view get_appservices_request_id() const noexcept override
46
    {
27,568✔
47
        return m_app_services_coid;
27,568✔
48
    }
27,568✔
49

50
    void force_handshake_response_for_testing(int status_code, std::string body = "") override
51
    {
96✔
52
        m_websocket.force_handshake_response_for_testing(status_code, body);
96✔
53
    }
96✔
54

55
    // public for HTTPClient CRTP, but not on the EZSocket interface, so de-facto private
56
    void async_read(char*, std::size_t, ReadCompletionHandler) override;
57
    void async_read_until(char*, std::size_t, char, ReadCompletionHandler) override;
58
    void async_write(const char*, std::size_t, WriteCompletionHandler) override;
59

60
private:
61
    using milliseconds_type = std::int_fast64_t;
62

63
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept override
64
    {
28,372✔
65
        return m_logger_ptr;
28,372✔
66
    }
28,372✔
67
    std::mt19937_64& websocket_get_random() noexcept override
68
    {
850,324✔
69
        return m_random;
850,324✔
70
    }
850,324✔
71

72
    void websocket_handshake_completion_handler(const HTTPHeaders& headers) override
73
    {
27,600✔
74
        const std::string empty;
27,600✔
75
        if (auto it = headers.find("X-Appservices-Request-Id"); it != headers.end()) {
27,600✔
76
            m_app_services_coid = it->second;
12,374✔
77
        }
12,374✔
78
        auto it = headers.find("Sec-WebSocket-Protocol");
27,600✔
79
        m_observer->websocket_connected_handler(it == headers.end() ? empty : it->second);
27,600✔
80
    }
27,600✔
81
    void websocket_read_error_handler(std::error_code ec) override
82
    {
4,094✔
83
        m_network_logger.error("Reading failed: %1", ec.message()); // Throws
4,094✔
84
        constexpr bool was_clean = false;
4,094✔
85
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_read_error, ec.message());
4,094✔
86
    }
4,094✔
87
    void websocket_write_error_handler(std::error_code ec) override
88
    {
×
89
        m_network_logger.error("Writing failed: %1", ec.message()); // Throws
×
90
        constexpr bool was_clean = false;
×
91
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_write_error, ec.message());
×
92
    }
×
93
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*,
94
                                           const std::string_view* body) override
12✔
95
    {
96✔
96
        WebSocketError error = WebSocketError::websocket_ok;
96✔
97
        bool was_clean = true;
90✔
98

54✔
99
        if (ec == websocket::HttpError::bad_response_301_moved_permanently ||
96✔
100
            ec == websocket::HttpError::bad_response_308_permanent_redirect) {
96✔
101
            error = WebSocketError::websocket_moved_permanently;
96✔
102
        }
84!
103
        else if (ec == websocket::HttpError::bad_response_3xx_redirection) {
×
104
            error = WebSocketError::websocket_retry_error;
×
105
            was_clean = false;
×
106
        }
×
107
        else if (ec == websocket::HttpError::bad_response_401_unauthorized) {
×
108
            error = WebSocketError::websocket_unauthorized;
×
109
        }
×
110
        else if (ec == websocket::HttpError::bad_response_403_forbidden) {
×
111
            error = WebSocketError::websocket_forbidden;
×
112
        }
×
113
        else if (ec == websocket::HttpError::bad_response_5xx_server_error ||
×
114
                 ec == websocket::HttpError::bad_response_500_internal_server_error ||
×
115
                 ec == websocket::HttpError::bad_response_502_bad_gateway ||
×
116
                 ec == websocket::HttpError::bad_response_503_service_unavailable ||
×
117
                 ec == websocket::HttpError::bad_response_504_gateway_timeout) {
×
118
            error = WebSocketError::websocket_internal_server_error;
×
119
            was_clean = false;
×
120
        }
×
121
        else {
×
122
            error = WebSocketError::websocket_fatal_error;
×
NEW
123
            was_clean = false;
×
124
            if (body) {
×
NEW
125
                std::string_view identifier = "REALM_SYNC_PROTOCOL_MISMATCH";
×
126
                auto i = body->find(identifier);
×
NEW
127
                if (i != std::string_view::npos) {
×
128
                    std::string_view rest = body->substr(i + identifier.size());
129
                    // FIXME: Use std::string_view::begins_with() in C++20.
130
                    auto begins_with = [](std::string_view string, std::string_view prefix) {
×
131
                        return (string.size() >= prefix.size() &&
×
132
                                std::equal(string.data(), string.data() + prefix.size(), prefix.data()));
×
133
                    };
×
134
                    if (begins_with(rest, ":CLIENT_TOO_OLD")) {
×
135
                        error = WebSocketError::websocket_client_too_old;
×
136
                    }
×
137
                    else if (begins_with(rest, ":CLIENT_TOO_NEW")) {
×
138
                        error = WebSocketError::websocket_client_too_new;
×
139
                    }
×
140
                    else {
141
                        // Other more complicated forms of mismatch
142
                        error = WebSocketError::websocket_protocol_mismatch;
×
143
                    }
×
144
                    was_clean = true;
×
145
                }
×
146
            }
×
147
        }
6✔
148

54✔
149
        websocket_error_and_close_handler(was_clean, error, ec.message());
96✔
150
    }
84✔
151
    void websocket_protocol_error_handler(std::error_code ec) override
152
    {
×
153
        constexpr bool was_clean = false;
×
154
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_protocol_error, ec.message());
×
155
    }
156
    /// Handles a close message from the peer by reporting a clean close with
    /// the received code and message.
    ///
    /// @return Whatever websocket_error_and_close_handler() returns.
    bool websocket_close_message_received(WebSocketError code, std::string_view message) override
    {
        return websocket_error_and_close_handler(/* was_clean = */ true, code, message);
    }
682✔
162
    bool websocket_error_and_close_handler(bool was_clean, WebSocketError code, std::string_view reason)
630✔
163
    {
5,118✔
164
        if (!was_clean) {
5,014✔
165
            m_observer->websocket_error_handler();
4,248✔
166
        }
4,352✔
167
        return m_observer->websocket_closed_handler(was_clean, code, reason);
5,118✔
168
    }
4,488✔
169
    bool websocket_binary_message_received(const char* ptr, std::size_t size) override
82,548✔
170
    {
650,214✔
171
        return m_observer->websocket_binary_message_received(util::Span<const char>(ptr, size));
650,214✔
172
    }
567,666✔
173

174
    /// Converts the error code of a completed write into a Status.
    ///
    /// A default (no error) code maps to Status::OK(). Otherwise the numeric
    /// value is matched against the util::error constants below; anything not
    /// listed becomes UnknownError carrying the code's message.
    ///
    /// NOTE(review): only ec.value() is inspected, not ec.category() - confirm
    /// that codes from other categories cannot reach this function.
    static Status get_status_from_util_error(std::error_code ec)
    {
        if (!ec) {
            return Status::OK();
        }

        const int code = ec.value();
        if (code == util::error::operation_aborted) {
            return {ErrorCodes::Error::OperationAborted, "Write operation cancelled"};
        }
        if (code == util::error::address_family_not_supported || code == util::error::invalid_argument) {
            return {ErrorCodes::Error::InvalidArgument, ec.message()};
        }
        if (code == util::error::no_memory) {
            return {ErrorCodes::Error::OutOfMemory, ec.message()};
        }
        if (code == util::error::connection_aborted || code == util::error::connection_reset ||
            code == util::error::broken_pipe || code == util::error::resource_unavailable_try_again) {
            return {ErrorCodes::Error::ConnectionClosed, ec.message()};
        }
        return {ErrorCodes::Error::UnknownError, ec.message()};
    }
9,498✔
200

201
    void initiate_resolve();
202
    void handle_resolve(std::error_code, network::Endpoint::List);
203
    void initiate_tcp_connect(network::Endpoint::List, std::size_t);
204
    void handle_tcp_connect(std::error_code, network::Endpoint::List, std::size_t);
205
    void initiate_http_tunnel();
206
    void initiate_websocket_or_ssl_handshake();
207
    void initiate_ssl_handshake();
208
    void handle_ssl_handshake(std::error_code);
209
    void initiate_websocket_handshake();
210
    void handle_connection_established();
211

212
    void schedule_urgent_ping();
213
    void initiate_ping_delay(milliseconds_type now);
214
    void handle_ping_delay();
215
    void initiate_pong_timeout();
216
    void handle_pong_timeout();
217

218
    const std::shared_ptr<util::Logger> m_logger_ptr;
219
    util::Logger& m_network_logger;
220
    std::mt19937_64& m_random;
221
    network::Service& m_service;
222
    const std::string m_user_agent;
223
    std::string m_app_services_coid;
224

225
    std::unique_ptr<WebSocketObserver> m_observer;
226

227
    const WebSocketEndpoint m_endpoint;
228
    util::Optional<network::Resolver> m_resolver;
229
    util::Optional<network::Socket> m_socket;
230
    util::Optional<network::ssl::Context> m_ssl_context;
231
    util::Optional<network::ssl::Stream> m_ssl_stream;
232
    network::ReadAheadBuffer m_read_ahead_buffer;
233
    websocket::Socket m_websocket;
234
    util::Optional<HTTPClient<DefaultWebSocketImpl>> m_proxy_client;
235
};
236

237
void DefaultWebSocketImpl::async_read(char* buffer, std::size_t size, ReadCompletionHandler handler)
184,660✔
238
{
1,456,926✔
239
    REALM_ASSERT(m_socket);
1,456,926✔
240
    if (m_ssl_stream) {
1,272,428✔
241
        m_ssl_stream->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
1,244✔
242
    }
185,580✔
243
    else {
1,455,682✔
244
        m_socket->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
1,455,682✔
245
    }
1,455,844✔
246
}
1,272,266✔
247

248

249
void DefaultWebSocketImpl::async_read_until(char* buffer, std::size_t size, char delim, ReadCompletionHandler handler)
31,136✔
250
{
256,464✔
251
    REALM_ASSERT(m_socket);
256,464✔
252
    if (m_ssl_stream) {
225,426✔
253
        m_ssl_stream->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
784✔
254
    }
31,724✔
255
    else {
255,680✔
256
        m_socket->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
255,680✔
257
    }
255,778✔
258
}
225,328✔
259

260

261
void DefaultWebSocketImpl::async_write(const char* data, std::size_t size, WriteCompletionHandler handler)
107,414✔
262
{
850,316✔
263
    REALM_ASSERT(m_socket);
850,316✔
264
    if (m_ssl_stream) {
742,986✔
265
        m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
672✔
266
    }
107,918✔
267
    else {
849,644✔
268
        m_socket->async_write(data, size, std::move(handler)); // Throws
849,644✔
269
    }
849,728✔
270
}
742,902✔
271

272

273
void DefaultWebSocketImpl::initiate_resolve()
3,476✔
274
{
28,376✔
275
    const std::string& address = m_endpoint.proxy ? m_endpoint.proxy->address : m_endpoint.address;
28,376✔
276
    const port_type& port = m_endpoint.proxy ? m_endpoint.proxy->port : m_endpoint.port;
26,616✔
277

15,752✔
278
    if (m_endpoint.proxy) {
24,900✔
279
        // logger.detail("Using %1 proxy", proxy->type); // Throws
280
    }
1,716✔
281

15,752✔
282
    m_network_logger.detail("Resolving '%1:%2'", address, port); // Throws
26,616✔
283

15,752✔
284
    network::Resolver::Query query(address, util::to_string(port)); // Throws
28,370✔
285
    auto handler = [this](std::error_code ec, network::Endpoint::List endpoints) {
26,628✔
286
        // If the operation is aborted, the connection object may have been
14,000✔
287
        // destroyed.
15,754✔
288
        if (ec != util::error::operation_aborted)
28,382✔
289
            handle_resolve(ec, std::move(endpoints)); // Throws
28,374✔
290
    };
28,390✔
291
    m_resolver.emplace(m_service);                                   // Throws
28,376✔
292
    m_resolver->async_resolve(std::move(query), std::move(handler)); // Throws
28,376✔
293
}
24,900✔
294

295

296
void DefaultWebSocketImpl::handle_resolve(std::error_code ec, network::Endpoint::List endpoints)
3,468✔
297
{
28,376✔
298
    if (ec) {
24,912✔
299
        m_network_logger.error("Failed to resolve '%1:%2': %3", m_endpoint.address, m_endpoint.port,
74✔
300
                               ec.message()); // Throws
74✔
301
        constexpr bool was_clean = false;
74✔
302
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_resolve_failed,
74✔
303
                                          ec.message()); // Throws
74✔
304
        return;
74✔
305
    }
1,782✔
306

15,730✔
307
    initiate_tcp_connect(std::move(endpoints), 0); // Throws
28,302✔
308
}
24,838✔
309

310

311
// Starts an asynchronous TCP connect to endpoint number `i` of `endpoints`,
// using a freshly created socket. The list is moved into the completion
// handler so that handle_tcp_connect() can move on to the next entry.
void DefaultWebSocketImpl::initiate_tcp_connect(network::Endpoint::List endpoints, std::size_t i)
{
    REALM_ASSERT(i < endpoints.size());

    // Copy what the log line below needs before the list is moved away.
    const network::Endpoint target = *(endpoints.begin() + i);
    const std::size_t total = endpoints.size();

    m_socket.emplace(m_service); // Throws
    auto on_connect = [this, endpoints = std::move(endpoints), i](std::error_code ec) mutable {
        // If the operation is aborted, the connection object may have been
        // destroyed, so `this` must not be touched.
        if (ec == util::error::operation_aborted)
            return;
        handle_tcp_connect(ec, std::move(endpoints), i); // Throws
    };
    m_socket->async_connect(target, std::move(on_connect));
    m_network_logger.detail("Connecting to endpoint '%1:%2' (%3/%4)", target.address(), target.port(), (i + 1),
                            total); // Throws
}
24,838✔
326

327
void DefaultWebSocketImpl::handle_tcp_connect(std::error_code ec, network::Endpoint::List endpoints, std::size_t i)
3,454✔
328
{
28,196✔
329
    REALM_ASSERT(i < endpoints.size());
28,196✔
330
    const network::Endpoint& ep = *(endpoints.begin() + i);
28,196✔
331
    if (ec) {
24,742✔
332
        m_network_logger.error("Failed to connect to endpoint '%1:%2': %3", ep.address(), ep.port(),
×
333
                               ec.message()); // Throws
×
334
        std::size_t i_2 = i + 1;
×
335
        if (i_2 < endpoints.size()) {
×
336
            initiate_tcp_connect(std::move(endpoints), i_2); // Throws
×
337
            return;
×
338
        }
339
        // All endpoints failed
340
        m_network_logger.error("Failed to connect to '%1:%2': All endpoints failed", m_endpoint.address,
×
341
                               m_endpoint.port);
×
342
        constexpr bool was_clean = false;
×
343
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
344
                                          ec.message()); // Throws
×
345
        return;
×
346
    }
1,708✔
347

15,694✔
348
    REALM_ASSERT(m_socket);
28,196✔
349
    network::Endpoint ep_2 = m_socket->local_endpoint();
28,196✔
350
    m_network_logger.info("Connected to endpoint '%1:%2' (from '%3:%4')", ep.address(), ep.port(), ep_2.address(),
28,196✔
351
                          ep_2.port()); // Throws
26,450✔
352

13,948✔
353
    // TODO: Handle HTTPS proxies
15,694✔
354
    if (m_endpoint.proxy) {
24,742✔
355
        initiate_http_tunnel(); // Throws
×
356
        return;
×
357
    }
1,708✔
358

15,694✔
359
    initiate_websocket_or_ssl_handshake(); // Throws
28,196✔
360
}
24,742✔
361

362
void DefaultWebSocketImpl::initiate_websocket_or_ssl_handshake()
3,454✔
363
{
28,194✔
364
    if (m_endpoint.is_ssl) {
24,764✔
365
        initiate_ssl_handshake(); // Throws
192✔
366
    }
3,598✔
367
    else {
28,002✔
368
        initiate_websocket_handshake(); // Throws
28,002✔
369
    }
28,026✔
370
}
24,740✔
371

372
void DefaultWebSocketImpl::initiate_http_tunnel()
373
{
×
374
    HTTPRequest req;
×
375
    req.method = HTTPMethod::Connect;
×
376
    req.headers.emplace("Host", util::format("%1:%2", m_endpoint.address, m_endpoint.port));
377
    // TODO handle proxy authorization
378

379
    m_proxy_client.emplace(*this, m_logger_ptr);
×
380
    auto handler = [this](HTTPResponse response, std::error_code ec) {
×
381
        if (ec && ec != util::error::operation_aborted) {
×
382
            m_network_logger.error("Failed to establish HTTP tunnel: %1", ec.message());
×
383
            constexpr bool was_clean = false;
×
384
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
385
                                              ec.message()); // Throws
×
386
            return;
×
387
        }
388

×
389
        if (response.status != HTTPStatus::Ok) {
×
390
            m_network_logger.error("Proxy server returned response '%1 %2'", response.status,
×
391
                                   response.reason); // Throws
×
392
            constexpr bool was_clean = false;
×
393
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
394
                                              response.reason); // Throws
×
395
            return;
×
396
        }
397

398
        initiate_websocket_or_ssl_handshake(); // Throws
×
399
    };
400

401
    m_proxy_client->async_request(req, std::move(handler)); // Throws
×
402
}
403

404
void DefaultWebSocketImpl::initiate_ssl_handshake()
24✔
405
{
192✔
406
    using namespace network::ssl;
182✔
407

122✔
408
    if (!m_ssl_context) {
192✔
409
        m_ssl_context.emplace(); // Throws
192✔
410
        if (m_endpoint.verify_servers_ssl_certificate) {
188✔
411
            if (m_endpoint.ssl_trust_certificate_path) {
150✔
412
                m_ssl_context->use_verify_file(*m_endpoint.ssl_trust_certificate_path); // Throws
80✔
413
            }
80✔
414
            else if (!m_endpoint.ssl_verify_callback) {
74✔
415
                m_ssl_context->use_default_verify(); // Throws
30✔
416
#if REALM_INCLUDE_CERTS
16✔
417
                // On platforms like Windows or Android where OpenSSL is not normally found
16✔
418
                // `use_default_verify()` won't actually be able to load any default certificates.
16✔
419
                // That's why we bundle a set of trusted certificates ourselves.
16✔
420
                m_ssl_context->use_included_certificate_roots(); // Throws
16✔
421
#endif
18✔
422
            }
48✔
423
        }
164✔
424
    }
182✔
425

122✔
426
    m_ssl_stream.emplace(*m_socket, *m_ssl_context, Stream::client); // Throws
192✔
427
    m_ssl_stream->set_logger(m_logger_ptr.get());
192✔
428
    m_ssl_stream->set_host_name(m_endpoint.address); // Throws
192✔
429
    if (m_endpoint.verify_servers_ssl_certificate) {
188✔
430
        m_ssl_stream->set_verify_mode(VerifyMode::peer); // Throws
160✔
431
        m_ssl_stream->set_server_port(m_endpoint.port);
160✔
432
        if (!m_endpoint.ssl_trust_certificate_path) {
150✔
433
            if (m_endpoint.ssl_verify_callback) {
76✔
434
                m_ssl_stream->use_verify_callback(m_endpoint.ssl_verify_callback);
48✔
435
            }
52✔
436
        }
90✔
437
    }
154✔
438

122✔
439
    auto handler = [this](std::error_code ec) {
182✔
440
        // If the operation is aborted, the connection object may have been
112✔
441
        // destroyed.
122✔
442
        if (ec != util::error::operation_aborted)
192✔
443
            handle_ssl_handshake(ec); // Throws
192✔
444
    };
192✔
445
    m_ssl_stream->async_handshake(std::move(handler)); // Throws
182✔
446

112✔
447
    // FIXME: We also need to perform the SSL shutdown operation somewhere
122✔
448
}
168✔
449

450

451
void DefaultWebSocketImpl::handle_ssl_handshake(std::error_code ec)
24✔
452
{
192✔
453
    if (ec) {
178✔
454
        REALM_ASSERT(ec != util::error::operation_aborted);
80✔
455
        constexpr bool was_clean = false;
80✔
456
        WebSocketError parsed_error_code;
80✔
457
        if (ec == network::ssl::Errors::tls_handshake_failed) {
80✔
458
            parsed_error_code = WebSocketError::websocket_tls_handshake_failed;
80✔
459
        }
70✔
460
        else {
×
461
            parsed_error_code = WebSocketError::websocket_connection_failed;
×
462
        }
6✔
463

52✔
464
        websocket_error_and_close_handler(was_clean, parsed_error_code, ec.message()); // Throws
80✔
465
        return;
80✔
466
    }
78✔
467

70✔
468
    initiate_websocket_handshake(); // Throws
112✔
469
}
98✔
470

471

472
void DefaultWebSocketImpl::initiate_websocket_handshake()
3,444✔
473
{
28,114✔
474
    auto headers = HTTPHeaders(m_endpoint.headers.begin(), m_endpoint.headers.end());
28,114✔
475
    headers["User-Agent"] = m_user_agent;
26,372✔
476

13,898✔
477
    // Compute the value of the "Host" header.
15,634✔
478
    const std::uint_fast16_t default_port = (m_endpoint.is_ssl ? 443 : 80);
26,330✔
479
    auto host = m_endpoint.port == default_port ? m_endpoint.address
15,640✔
480
                                                : util::format("%1:%2", m_endpoint.address, m_endpoint.port);
26,372✔
481

13,898✔
482
    // Convert the list of protocols to a string
15,640✔
483
    std::ostringstream protocol_list;
28,114✔
484
    protocol_list.exceptions(std::ios_base::failbit | std::ios_base::badbit);
28,114✔
485
    protocol_list.imbue(std::locale::classic());
28,114✔
486
    if (m_endpoint.protocols.size() > 1)
28,114✔
487
        std::copy(m_endpoint.protocols.begin(), m_endpoint.protocols.end() - 1,
28,114✔
488
                  std::ostream_iterator<std::string>(protocol_list, ", "));
28,114✔
489
    protocol_list << m_endpoint.protocols.back();
26,372✔
490

15,640✔
491
    m_websocket.initiate_client_handshake(m_endpoint.path, std::move(host), protocol_list.str(),
28,114✔
492
                                          std::move(headers)); // Throws
28,114✔
493
}
24,670✔
494
} // namespace
495

496
///
497
/// DefaultSocketProvider - default socket provider implementation
498
///
499

500
DefaultSocketProvider::DefaultSocketProvider(const std::shared_ptr<util::Logger>& logger,
501
                                             const std::string user_agent,
502
                                             const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr,
503
                                             AutoStart auto_start)
504
    : m_logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::network, logger)}
505
    , m_observer_ptr{observer_ptr}
506
    , m_service{}
507
    , m_random{}
9,626✔
508
    , m_user_agent{user_agent}
9,626✔
509
    , m_mutex{}
9,626✔
510
    , m_state{State::Stopped}
9,626✔
511
    , m_state_cv{}
8,866✔
512
    , m_thread{}
8,866✔
513
{
78,518✔
514
    REALM_ASSERT(m_logger_ptr);                     // Make sure the logger is valid
68,892✔
515
    util::seed_prng_nondeterministically(m_random); // Throws
68,892✔
516
    if (auto_start) {
78,518✔
517
        start();
73,200✔
518
    }
68,318✔
519
}
78,518✔
520

4,744✔
521
// Stops the event loop and blocks until its thread has finished.
DefaultSocketProvider::~DefaultSocketProvider()
{
    m_logger_ptr->trace("Default event loop teardown");

    constexpr bool wait_for_stop = true;
    stop(wait_for_stop); // Wait for the thread to stop

    // Shutting down - no need to lock mutex before check
    REALM_ASSERT(m_state == State::Stopped);
}
78,520✔
529

530
void DefaultSocketProvider::start()
4,744✔
531
{
73,638✔
532
    std::unique_lock<std::mutex> lock(m_mutex);
78,520✔
533
    // Has the thread already been started or is running
33,964✔
534
    if (m_state == State::Starting || m_state == State::Running)
68,894✔
535
        return; // early return
4,744✔
536

43,590✔
537
    // If the thread has been previously run, make sure it has been joined first
43,590✔
538
    if (m_state == State::Stopping) {
73,638✔
539
        state_wait_for(lock, State::Stopped);
9,626✔
540
    }
9,626✔
541

38,708✔
542
    m_logger_ptr->trace("Default event loop: start()");
78,520✔
543
    REALM_ASSERT(m_state == State::Stopped);
78,520✔
544

33,964✔
545
    do_state_update(lock, State::Starting);
68,894✔
546
    m_thread = std::thread{&DefaultSocketProvider::event_loop, this};
68,898✔
547
    // Wait for the thread to start before continuing
33,968✔
548
    state_wait_for(lock, State::Running);
68,898✔
549
}
68,898✔
550

4✔
551
// Test helper: runs the provider's event loop on the calling thread instead of
// a dedicated one. The provider must be Stopped; it is moved to Starting first
// because event_loop() asserts that state on entry.
void DefaultSocketProvider::OnlyForTesting::run_event_loop_on_current_thread(DefaultSocketProvider* provider)
{
    {
        // Scope the lock so that it is released before the loop runs.
        std::unique_lock<std::mutex> state_lock(provider->m_mutex);
        REALM_ASSERT(provider->m_state == State::Stopped);
        provider->do_state_update(state_lock, State::Starting);
    }

    provider->event_loop();
}
32✔
561

4✔
562
// Test helper: resets the provider's service while the provider is Stopped.
void DefaultSocketProvider::OnlyForTesting::prep_event_loop_for_restart(DefaultSocketProvider* provider)
{
    const std::lock_guard<std::mutex> state_lock(provider->m_mutex);
    REALM_ASSERT(provider->m_state == State::Stopped);
    provider->m_service.reset();
}
9,658✔
568

9,630✔
569
void DefaultSocketProvider::event_loop()
9,630✔
570
{
68,922✔
571
    m_logger_ptr->trace("Default event loop: thread running");
73,668✔
572
    // Calls will_destroy_thread() when destroyed
43,608✔
573
    auto will_destroy_thread = util::make_scope_exit([&]() noexcept {
73,668✔
574
        m_logger_ptr->trace("Default event loop: thread exiting");
78,552✔
575
        if (m_observer_ptr)
68,926✔
576
            m_observer_ptr->will_destroy_thread();
4✔
577

43,608✔
578
        std::unique_lock<std::mutex> lock(m_mutex);
78,552✔
579
        // Did we get here due to an unhandled exception?
43,608✔
580
        if (m_state != State::Stopping) {
78,552✔
581
            m_logger_ptr->error("Default event loop: thread exited unexpectedly");
4,774✔
582
        }
9,658✔
583
        m_state = State::Stopped;
68,922✔
584
        lock.unlock();
73,668✔
585
        m_state_cv.notify_all();
78,552✔
586
    });
78,552✔
587

43,608✔
588
    if (m_observer_ptr)
78,552✔
589
        m_observer_ptr->did_create_thread();
4,746✔
590

38,724✔
591
    {
73,668✔
592
        std::lock_guard<std::mutex> lock(m_mutex);
78,552✔
593
        REALM_ASSERT(m_state == State::Starting);
78,552✔
594
    }
68,922✔
595

33,978✔
596
    // We update the state to Running from inside the event loop so that start() is blocked until
38,724✔
597
    // the event loop is actually ready to receive work.
43,608✔
598
    m_service.post([this, my_generation = ++m_event_loop_generation](Status status) {
73,668✔
599
        if (status == ErrorCodes::OperationAborted) {
78,552✔
600
            return;
4,746✔
601
        }
9,630✔
602

33,982✔
603
        REALM_ASSERT(status.is_ok());
68,926✔
604

43,604✔
605
        std::unique_lock<std::mutex> lock(m_mutex);
68,922✔
606
        // This is a callback from a previous generation
33,978✔
607
        if (m_event_loop_generation != my_generation) {
78,548✔
608
            return;
9,654✔
609
        }
9,654✔
610
        if (m_state == State::Stopping) {
78,520✔
611
            return;
4,746✔
612
        }
4,746✔
613
        m_logger_ptr->trace("Default event loop: service run");
73,640✔
614
        REALM_ASSERT(m_state == State::Starting);
78,524!
615
        do_state_update(lock, State::Running);
78,524✔
616
    });
78,524✔
617

33,978✔
618
    // If there is no event loop observer or handle_error function registered, then just
33,978✔
619
    // allow the exception to bubble to the top so we can get a true stack trace
33,978✔
620
    if (!m_observer_ptr || !m_observer_ptr->has_handle_error()) {
68,922!
621
        m_service.run_until_stopped(); // Throws
68,922✔
622
    }
68,922!
623
    else {
×
624
        try {
625
            m_service.run_until_stopped(); // Throws
×
626
        }
×
627
        catch (const std::exception& e) {
×
628
            REALM_ASSERT(m_observer_ptr); // should not change while event loop is running
×
629
            std::unique_lock<std::mutex> lock(m_mutex);
×
630
            // Service is no longer running, event loop thread is stopping
631
            do_state_update(lock, State::Stopping);
×
632
            lock.unlock();
×
633
            m_logger_ptr->error("Default event loop exception: ", e.what());
9,630✔
634
            // If the error was not handled by the thread loop observer, then rethrow
635
            if (!m_observer_ptr->handle_error(e))
×
636
                throw;
9,630✔
637
        }
9,630✔
638
    }
4,746✔
639
}
73,668✔
640

9,630✔
641
void DefaultSocketProvider::stop(bool wait_for_stop)
9,626✔
642
{
78,548✔
643
    std::unique_lock<std::mutex> lock(m_mutex);
73,666✔
644

38,722✔
645
    // Do nothing if the thread is not started or running or stop has already been called
43,604✔
646
    if (m_state == State::Starting || m_state == State::Running) {
78,548✔
647
        m_logger_ptr->trace("Default event loop: stop()");
73,640✔
648
        do_state_update(lock, State::Stopping);
73,640✔
649
        // Updating state to Stopping will free a start() if it is waiting for the thread to
43,594✔
650
        // start and may cause the thread to exit early before calling service.run()
43,594✔
651
        m_service.stop(); // Unblocks m_service.run()
78,524✔
652
    }
78,524✔
653

43,604✔
654
    // Wait until the thread is stopped (exited) if requested
43,604✔
655
    if (wait_for_stop) {
78,552✔
656
        m_logger_ptr->trace("Default event loop: wait for stop");
78,552✔
657
        state_wait_for(lock, State::Stopped);
68,922✔
658
        if (m_thread.joinable()) {
68,922✔
659
            m_thread.join();
68,894✔
660
        }
68,894✔
661
    }
68,922✔
662
}
68,922✔
663

664
//                    +---------------------------------------+
665
//                   \/                                       |
28,882✔
666
// State Machine: Stopped -> Starting -> Running -> Stopping -+
14,234✔
667
//                              |           |          ^
28,882✔
668
//                              +----------------------+
28,882✔
669

28,882✔
670
void DefaultSocketProvider::do_state_update(std::unique_lock<std::mutex>&, State new_state)
671
{
206,710✔
672
    // m_state_mutex should already be locked...
121,162✔
673
    m_state = new_state;
216,200✔
674
    m_state_cv.notify_all(); // Let any waiters check the state
225,966✔
675
}
206,714✔
676

9,488✔
677
void DefaultSocketProvider::state_wait_for(std::unique_lock<std::mutex>& lock, State expected_state)
38,504✔
678
{
156,792✔
679
    // Check for condition already met or superseded
106,446✔
680
    if (m_state >= expected_state)
157,068✔
681
        return;
19,280✔
682

87,180✔
683
    m_state_cv.wait(lock, [this, expected_state]() {
294,828✔
684
        // are we there yet?
135,856✔
685
        if (m_state < expected_state)
275,576✔
686
            return false;
137,788✔
687
        return true;
141,262✔
688
    });
141,262✔
689
}
141,262✔
690

3,474✔
691
std::unique_ptr<WebSocketInterface> DefaultSocketProvider::connect(std::unique_ptr<WebSocketObserver> observer,
692
                                                                   WebSocketEndpoint&& endpoint)
693
{
24,900✔
694
    return std::make_unique<DefaultWebSocketImpl>(m_logger_ptr, m_service, m_random, m_user_agent,
24,900✔
695
                                                  std::move(observer), std::move(endpoint));
24,900✔
696
}
24,900✔
697

698
} // namespace realm::sync::websocket
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc