• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1856

22 Nov 2023 04:32PM UTC coverage: 91.666% (-0.02%) from 91.689%
1856

push

Evergreen

web-flow
Fix duplication of recoverable list changes after unrecoverable changes (#7155)

After we copy a list in client reset recovery we need to not apply any changes
to that list in subsequent transactions as we copy directly to the final state
of the list and applying changes would just duplicate those changes.

This is only applicable to flexible sync when there's a subscription change in
between the write with unrecoverable changes and the write with recoverable
changes.

92274 of 169124 branches covered (0.0%)

77 of 77 new or added lines in 3 files covered. (100.0%)

120 existing lines in 16 files now uncovered.

231289 of 252317 relevant lines covered (91.67%)

6368856.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.58
/src/realm/sync/network/default_socket.cpp
1
#include <realm/sync/network/default_socket.hpp>
2

3
#include <realm/sync/binding_callback_thread_observer.hpp>
4
#include <realm/sync/network/network.hpp>
5
#include <realm/sync/network/network_ssl.hpp>
6
#include <realm/sync/network/websocket.hpp>
7
#include <realm/util/basic_system_errors.hpp>
8
#include <realm/util/random.hpp>
9
#include <realm/util/scope_exit.hpp>
10

11
namespace realm::sync::websocket {
12

13
namespace {
14

15
///
16
/// DefaultWebSocketImpl - websocket implementation for the default socket provider
17
///
18
class DefaultWebSocketImpl final : public DefaultWebSocket, public Config {
19
public:
20
    DefaultWebSocketImpl(const std::shared_ptr<util::Logger>& logger_ptr, network::Service& service,
21
                         std::mt19937_64& random, const std::string user_agent,
22
                         std::unique_ptr<WebSocketObserver> observer, WebSocketEndpoint&& endpoint)
23
        : m_logger_ptr{logger_ptr}
24
        , m_logger{*m_logger_ptr}
25
        , m_random{random}
26
        , m_service{service}
27
        , m_user_agent{user_agent}
28
        , m_observer{std::move(observer)}
29
        , m_endpoint{std::move(endpoint)}
30
        , m_websocket(*this)
31
    {
3,332✔
32
        initiate_resolve();
3,332✔
33
    }
3,332✔
34

35
    virtual ~DefaultWebSocketImpl() = default;
3,328✔
36

37
    void async_write_binary(util::Span<const char> data, SyncSocketProvider::FunctionHandler&& handler) override
38
    {
92,486✔
39
        m_websocket.async_write_binary(data.data(), data.size(),
92,486✔
40
                                       [write_handler = std::move(handler)](std::error_code ec, size_t) {
92,370✔
41
                                           write_handler(DefaultWebSocketImpl::get_status_from_util_error(ec));
92,368✔
42
                                       });
92,368✔
43
    }
92,486✔
44

45
    std::string_view get_appservices_request_id() const noexcept override
46
    {
3,244✔
47
        return m_app_services_coid;
3,244✔
48
    }
3,244✔
49

50
    void force_handshake_response_for_testing(int status_code, std::string body = "") override
51
    {
12✔
52
        m_websocket.force_handshake_response_for_testing(status_code, body);
12✔
53
    }
12✔
54

55
    // public for HTTPClient CRTP, but not on the EZSocket interface, so de-facto private
56
    void async_read(char*, std::size_t, ReadCompletionHandler) override;
57
    void async_read_until(char*, std::size_t, char, ReadCompletionHandler) override;
58
    void async_write(const char*, std::size_t, WriteCompletionHandler) override;
59

60
private:
61
    using milliseconds_type = std::int_fast64_t;
62

63
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept override
64
    {
3,332✔
65
        return m_logger_ptr;
3,332✔
66
    }
3,332✔
67
    std::mt19937_64& websocket_get_random() noexcept override
68
    {
95,790✔
69
        return m_random;
95,790✔
70
    }
95,790✔
71

72
    void websocket_handshake_completion_handler(const HTTPHeaders& headers) override
73
    {
3,248✔
74
        const std::string empty;
3,248✔
75
        if (auto it = headers.find("X-Appservices-Request-Id"); it != headers.end()) {
3,248✔
76
            m_app_services_coid = it->second;
1,350✔
77
        }
1,350✔
78
        auto it = headers.find("Sec-WebSocket-Protocol");
3,248✔
79
        m_observer->websocket_connected_handler(it == headers.end() ? empty : it->second);
3,248✔
80
    }
3,248✔
81
    void websocket_read_error_handler(std::error_code ec) override
82
    {
560✔
83
        m_logger.error("Reading failed: %1", ec.message()); // Throws
560✔
84
        constexpr bool was_clean = false;
560✔
85
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_read_error, ec.message());
560✔
86
    }
560✔
87
    void websocket_write_error_handler(std::error_code ec) override
88
    {
×
89
        m_logger.error("Writing failed: %1", ec.message()); // Throws
×
90
        constexpr bool was_clean = false;
×
91
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_write_error, ec.message());
×
92
    }
×
93
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*,
94
                                           const std::string_view* body) override
95
    {
12✔
96
        WebSocketError error = WebSocketError::websocket_ok;
12✔
97
        bool was_clean = true;
12✔
98

6✔
99
        if (ec == websocket::HttpError::bad_response_301_moved_permanently ||
12✔
100
            ec == websocket::HttpError::bad_response_308_permanent_redirect) {
12✔
101
            error = WebSocketError::websocket_moved_permanently;
12✔
102
        }
12✔
103
        else if (ec == websocket::HttpError::bad_response_3xx_redirection) {
×
104
            error = WebSocketError::websocket_retry_error;
×
105
            was_clean = false;
×
106
        }
×
107
        else if (ec == websocket::HttpError::bad_response_401_unauthorized) {
×
108
            error = WebSocketError::websocket_unauthorized;
×
109
        }
×
110
        else if (ec == websocket::HttpError::bad_response_403_forbidden) {
×
111
            error = WebSocketError::websocket_forbidden;
×
112
        }
×
113
        else if (ec == websocket::HttpError::bad_response_5xx_server_error ||
×
114
                 ec == websocket::HttpError::bad_response_500_internal_server_error ||
×
115
                 ec == websocket::HttpError::bad_response_502_bad_gateway ||
×
116
                 ec == websocket::HttpError::bad_response_503_service_unavailable ||
×
117
                 ec == websocket::HttpError::bad_response_504_gateway_timeout) {
×
118
            error = WebSocketError::websocket_internal_server_error;
×
119
            was_clean = false;
×
120
        }
×
121
        else {
×
122
            error = WebSocketError::websocket_fatal_error;
×
123
            was_clean = false;
×
124
            if (body) {
×
125
                std::string_view identifier = "REALM_SYNC_PROTOCOL_MISMATCH";
×
126
                auto i = body->find(identifier);
×
127
                if (i != std::string_view::npos) {
×
128
                    std::string_view rest = body->substr(i + identifier.size());
×
129
                    // FIXME: Use std::string_view::begins_with() in C++20.
130
                    auto begins_with = [](std::string_view string, std::string_view prefix) {
×
131
                        return (string.size() >= prefix.size() &&
×
132
                                std::equal(string.data(), string.data() + prefix.size(), prefix.data()));
×
133
                    };
×
134
                    if (begins_with(rest, ":CLIENT_TOO_OLD")) {
×
135
                        error = WebSocketError::websocket_client_too_old;
×
136
                    }
×
137
                    else if (begins_with(rest, ":CLIENT_TOO_NEW")) {
×
138
                        error = WebSocketError::websocket_client_too_new;
×
139
                    }
×
140
                    else {
×
141
                        // Other more complicated forms of mismatch
142
                        error = WebSocketError::websocket_protocol_mismatch;
×
143
                    }
×
144
                    was_clean = true;
×
145
                }
×
146
            }
×
147
        }
×
148

6✔
149
        websocket_error_and_close_handler(was_clean, error, ec.message());
12✔
150
    }
12✔
151
    void websocket_protocol_error_handler(std::error_code ec) override
152
    {
×
153
        constexpr bool was_clean = false;
×
154
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_protocol_error, ec.message());
×
155
    }
×
156
    bool websocket_close_message_received(WebSocketError code, std::string_view message) override
157
    {
108✔
158
        constexpr bool was_clean = true;
108✔
159

54✔
160
        return websocket_error_and_close_handler(was_clean, code, message);
108✔
161
    }
108✔
162
    bool websocket_error_and_close_handler(bool was_clean, WebSocketError code, std::string_view reason)
163
    {
694✔
164
        if (!was_clean) {
694✔
165
            m_observer->websocket_error_handler();
574✔
166
        }
574✔
167
        return m_observer->websocket_closed_handler(was_clean, code, reason);
694✔
168
    }
694✔
169
    bool websocket_binary_message_received(const char* ptr, std::size_t size) override
170
    {
73,684✔
171
        return m_observer->websocket_binary_message_received(util::Span<const char>(ptr, size));
73,684✔
172
    }
73,684✔
173

174
    static Status get_status_from_util_error(std::error_code ec)
175
    {
92,372✔
176
        if (!ec) {
92,372✔
177
            return Status::OK();
90,946✔
178
        }
90,946✔
179
        switch (ec.value()) {
1,426✔
180
            case util::error::operation_aborted:
1,422✔
181
                return {ErrorCodes::Error::OperationAborted, "Write operation cancelled"};
1,422✔
182
            case util::error::address_family_not_supported:
✔
183
                [[fallthrough]];
×
184
            case util::error::invalid_argument:
✔
185
                return {ErrorCodes::Error::InvalidArgument, ec.message()};
×
186
            case util::error::no_memory:
✔
187
                return {ErrorCodes::Error::OutOfMemory, ec.message()};
×
188
            case util::error::connection_aborted:
✔
189
                [[fallthrough]];
×
190
            case util::error::connection_reset:
✔
191
                [[fallthrough]];
×
192
            case util::error::broken_pipe:
✔
193
                [[fallthrough]];
×
194
            case util::error::resource_unavailable_try_again:
✔
195
                return {ErrorCodes::Error::ConnectionClosed, ec.message()};
×
196
            default:
✔
197
                return {ErrorCodes::Error::UnknownError, ec.message()};
×
198
        }
1,426✔
199
    }
1,426✔
200

201
    void initiate_resolve();
202
    void handle_resolve(std::error_code, network::Endpoint::List);
203
    void initiate_tcp_connect(network::Endpoint::List, std::size_t);
204
    void handle_tcp_connect(std::error_code, network::Endpoint::List, std::size_t);
205
    void initiate_http_tunnel();
206
    void initiate_websocket_or_ssl_handshake();
207
    void initiate_ssl_handshake();
208
    void handle_ssl_handshake(std::error_code);
209
    void initiate_websocket_handshake();
210
    void handle_connection_established();
211

212
    void schedule_urgent_ping();
213
    void initiate_ping_delay(milliseconds_type now);
214
    void handle_ping_delay();
215
    void initiate_pong_timeout();
216
    void handle_pong_timeout();
217

218
    const std::shared_ptr<util::Logger> m_logger_ptr;
219
    util::Logger& m_logger;
220
    std::mt19937_64& m_random;
221
    network::Service& m_service;
222
    const std::string m_user_agent;
223
    std::string m_app_services_coid;
224

225
    std::unique_ptr<WebSocketObserver> m_observer;
226

227
    const WebSocketEndpoint m_endpoint;
228
    util::Optional<network::Resolver> m_resolver;
229
    util::Optional<network::Socket> m_socket;
230
    util::Optional<network::ssl::Context> m_ssl_context;
231
    util::Optional<network::ssl::Stream> m_ssl_stream;
232
    network::ReadAheadBuffer m_read_ahead_buffer;
233
    websocket::Socket m_websocket;
234
    util::Optional<HTTPClient<DefaultWebSocketImpl>> m_proxy_client;
235
};
236

237
void DefaultWebSocketImpl::async_read(char* buffer, std::size_t size, ReadCompletionHandler handler)
238
{
166,248✔
239
    REALM_ASSERT(m_socket);
166,248✔
240
    if (m_ssl_stream) {
166,248✔
241
        m_ssl_stream->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
154✔
242
    }
154✔
243
    else {
166,094✔
244
        m_socket->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
166,094✔
245
    }
166,094✔
246
}
166,248✔
247

248

249
void DefaultWebSocketImpl::async_read_until(char* buffer, std::size_t size, char delim, ReadCompletionHandler handler)
250
{
29,674✔
251
    REALM_ASSERT(m_socket);
29,674✔
252
    if (m_ssl_stream) {
29,674✔
253
        m_ssl_stream->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
98✔
254
    }
98✔
255
    else {
29,576✔
256
        m_socket->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
29,576✔
257
    }
29,576✔
258
}
29,674✔
259

260

261
void DefaultWebSocketImpl::async_write(const char* data, std::size_t size, WriteCompletionHandler handler)
262
{
95,790✔
263
    REALM_ASSERT(m_socket);
95,790✔
264
    if (m_ssl_stream) {
95,790✔
265
        m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
84✔
266
    }
84✔
267
    else {
95,706✔
268
        m_socket->async_write(data, size, std::move(handler)); // Throws
95,706✔
269
    }
95,706✔
270
}
95,790✔
271

272

273
void DefaultWebSocketImpl::initiate_resolve()
274
{
3,330✔
275
    const std::string& address = m_endpoint.proxy ? m_endpoint.proxy->address : m_endpoint.address;
3,330✔
276
    const port_type& port = m_endpoint.proxy ? m_endpoint.proxy->port : m_endpoint.port;
3,330✔
277

1,658✔
278
    if (m_endpoint.proxy) {
3,330✔
279
        // logger.detail("Using %1 proxy", proxy->type); // Throws
280
    }
×
281

1,658✔
282
    m_logger.detail("Resolving '%1:%2'", address, port); // Throws
3,330✔
283

1,658✔
284
    network::Resolver::Query query(address, util::to_string(port)); // Throws
3,330✔
285
    auto handler = [this](std::error_code ec, network::Endpoint::List endpoints) {
3,328✔
286
        // If the operation is aborted, the connection object may have been
1,656✔
287
        // destroyed.
1,656✔
288
        if (ec != util::error::operation_aborted)
3,326✔
289
            handle_resolve(ec, std::move(endpoints)); // Throws
3,324✔
290
    };
3,326✔
291
    m_resolver.emplace(m_service);                                   // Throws
3,330✔
292
    m_resolver->async_resolve(std::move(query), std::move(handler)); // Throws
3,330✔
293
}
3,330✔
294

295

296
void DefaultWebSocketImpl::handle_resolve(std::error_code ec, network::Endpoint::List endpoints)
297
{
3,324✔
298
    if (ec) {
3,324✔
299
        m_logger.error("Failed to resolve '%1:%2': %3", m_endpoint.address, m_endpoint.port, ec.message()); // Throws
4✔
300
        constexpr bool was_clean = false;
4✔
301
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_resolve_failed,
4✔
302
                                          ec.message()); // Throws
4✔
303
        return;
4✔
304
    }
4✔
305

1,652✔
306
    initiate_tcp_connect(std::move(endpoints), 0); // Throws
3,320✔
307
}
3,320✔
308

309

310
void DefaultWebSocketImpl::initiate_tcp_connect(network::Endpoint::List endpoints, std::size_t i)
311
{
3,320✔
312
    REALM_ASSERT(i < endpoints.size());
3,320✔
313

1,652✔
314
    network::Endpoint ep = *(endpoints.begin() + i);
3,320✔
315
    std::size_t n = endpoints.size();
3,320✔
316
    m_socket.emplace(m_service); // Throws
3,320✔
317
    m_socket->async_connect(ep, [this, endpoints = std::move(endpoints), i](std::error_code ec) mutable {
3,320✔
318
        // If the operation is aborted, the connection object may have been
1,652✔
319
        // destroyed.
1,652✔
320
        if (ec != util::error::operation_aborted)
3,320✔
321
            handle_tcp_connect(ec, std::move(endpoints), i); // Throws
3,316✔
322
    });
3,320✔
323
    m_logger.detail("Connecting to endpoint '%1:%2' (%3/%4)", ep.address(), ep.port(), (i + 1), n); // Throws
3,320✔
324
}
3,320✔
325

326

327
void DefaultWebSocketImpl::handle_tcp_connect(std::error_code ec, network::Endpoint::List endpoints, std::size_t i)
328
{
3,316✔
329
    REALM_ASSERT(i < endpoints.size());
3,316✔
330
    const network::Endpoint& ep = *(endpoints.begin() + i);
3,316✔
331
    if (ec) {
3,316✔
UNCOV
332
        m_logger.error("Failed to connect to endpoint '%1:%2': %3", ep.address(), ep.port(),
×
UNCOV
333
                       ec.message()); // Throws
×
UNCOV
334
        std::size_t i_2 = i + 1;
×
UNCOV
335
        if (i_2 < endpoints.size()) {
×
UNCOV
336
            initiate_tcp_connect(std::move(endpoints), i_2); // Throws
×
UNCOV
337
            return;
×
UNCOV
338
        }
×
339
        // All endpoints failed
UNCOV
340
        m_logger.error("Failed to connect to '%1:%2': All endpoints failed", m_endpoint.address, m_endpoint.port);
×
UNCOV
341
        constexpr bool was_clean = false;
×
UNCOV
342
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
UNCOV
343
                                          ec.message()); // Throws
×
UNCOV
344
        return;
×
UNCOV
345
    }
×
346

1,648✔
347
    REALM_ASSERT(m_socket);
3,316✔
348
    network::Endpoint ep_2 = m_socket->local_endpoint();
3,316✔
349
    m_logger.info("Connected to endpoint '%1:%2' (from '%3:%4')", ep.address(), ep.port(), ep_2.address(),
3,316✔
350
                  ep_2.port()); // Throws
3,316✔
351

1,648✔
352
    // TODO: Handle HTTPS proxies
1,648✔
353
    if (m_endpoint.proxy) {
3,316✔
354
        initiate_http_tunnel(); // Throws
×
355
        return;
×
356
    }
×
357

1,648✔
358
    initiate_websocket_or_ssl_handshake(); // Throws
3,316✔
359
}
3,316✔
360

361
void DefaultWebSocketImpl::initiate_websocket_or_ssl_handshake()
362
{
3,316✔
363
    if (m_endpoint.is_ssl) {
3,316✔
364
        initiate_ssl_handshake(); // Throws
24✔
365
    }
24✔
366
    else {
3,292✔
367
        initiate_websocket_handshake(); // Throws
3,292✔
368
    }
3,292✔
369
}
3,316✔
370

371
void DefaultWebSocketImpl::initiate_http_tunnel()
372
{
×
373
    HTTPRequest req;
×
374
    req.method = HTTPMethod::Connect;
×
375
    req.headers.emplace("Host", util::format("%1:%2", m_endpoint.address, m_endpoint.port));
×
376
    // TODO handle proxy authorization
377

378
    m_proxy_client.emplace(*this, m_logger_ptr);
×
379
    auto handler = [this](HTTPResponse response, std::error_code ec) {
×
380
        if (ec && ec != util::error::operation_aborted) {
×
381
            m_logger.error("Failed to establish HTTP tunnel: %1", ec.message());
×
382
            constexpr bool was_clean = false;
×
383
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
384
                                              ec.message()); // Throws
×
385
            return;
×
386
        }
×
387

388
        if (response.status != HTTPStatus::Ok) {
×
389
            m_logger.error("Proxy server returned response '%1 %2'", response.status, response.reason); // Throws
×
390
            constexpr bool was_clean = false;
×
391
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
392
                                              response.reason); // Throws
×
393
            return;
×
394
        }
×
395

396
        initiate_websocket_or_ssl_handshake(); // Throws
×
397
    };
×
398

399
    m_proxy_client->async_request(req, std::move(handler)); // Throws
×
400
}
×
401

402
void DefaultWebSocketImpl::initiate_ssl_handshake()
403
{
24✔
404
    using namespace network::ssl;
24✔
405

14✔
406
    if (!m_ssl_context) {
24✔
407
        m_ssl_context.emplace(); // Throws
24✔
408
        if (m_endpoint.verify_servers_ssl_certificate) {
24✔
409
            if (m_endpoint.ssl_trust_certificate_path) {
20✔
410
                m_ssl_context->use_verify_file(*m_endpoint.ssl_trust_certificate_path); // Throws
10✔
411
            }
10✔
412
            else if (!m_endpoint.ssl_verify_callback) {
10✔
413
                m_ssl_context->use_default_verify(); // Throws
4✔
414
#if REALM_INCLUDE_CERTS
2✔
415
                // On platforms like Windows or Android where OpenSSL is not normally found
2✔
416
                // `use_default_verify()` won't actually be able to load any default certificates.
2✔
417
                // That's why we bundle a set of trusted certificates ourselves.
2✔
418
                m_ssl_context->use_included_certificate_roots(); // Throws
2✔
419
#endif
2✔
420
            }
4✔
421
        }
20✔
422
    }
24✔
423

14✔
424
    m_ssl_stream.emplace(*m_socket, *m_ssl_context, Stream::client); // Throws
24✔
425
    m_ssl_stream->set_logger(m_logger_ptr.get());
24✔
426
    m_ssl_stream->set_host_name(m_endpoint.address); // Throws
24✔
427
    if (m_endpoint.verify_servers_ssl_certificate) {
24✔
428
        m_ssl_stream->set_verify_mode(VerifyMode::peer); // Throws
20✔
429
        m_ssl_stream->set_server_port(m_endpoint.port);
20✔
430
        if (!m_endpoint.ssl_trust_certificate_path) {
20✔
431
            if (m_endpoint.ssl_verify_callback) {
10✔
432
                m_ssl_stream->use_verify_callback(m_endpoint.ssl_verify_callback);
6✔
433
            }
6✔
434
        }
10✔
435
    }
20✔
436

14✔
437
    auto handler = [this](std::error_code ec) {
24✔
438
        // If the operation is aborted, the connection object may have been
14✔
439
        // destroyed.
14✔
440
        if (ec != util::error::operation_aborted)
24✔
441
            handle_ssl_handshake(ec); // Throws
24✔
442
    };
24✔
443
    m_ssl_stream->async_handshake(std::move(handler)); // Throws
24✔
444

14✔
445
    // FIXME: We also need to perform the SSL shutdown operation somewhere
14✔
446
}
24✔
447

448

449
void DefaultWebSocketImpl::handle_ssl_handshake(std::error_code ec)
450
{
24✔
451
    if (ec) {
24✔
452
        REALM_ASSERT(ec != util::error::operation_aborted);
10✔
453
        constexpr bool was_clean = false;
10✔
454
        WebSocketError parsed_error_code;
10✔
455
        if (ec == network::ssl::Errors::tls_handshake_failed) {
10✔
456
            parsed_error_code = WebSocketError::websocket_tls_handshake_failed;
10✔
457
        }
10✔
458
        else {
×
459
            parsed_error_code = WebSocketError::websocket_connection_failed;
×
460
        }
×
461

6✔
462
        websocket_error_and_close_handler(was_clean, parsed_error_code, ec.message()); // Throws
10✔
463
        return;
10✔
464
    }
10✔
465

8✔
466
    initiate_websocket_handshake(); // Throws
14✔
467
}
14✔
468

469

470
void DefaultWebSocketImpl::initiate_websocket_handshake()
471
{
3,306✔
472
    auto headers = HTTPHeaders(m_endpoint.headers.begin(), m_endpoint.headers.end());
3,306✔
473
    headers["User-Agent"] = m_user_agent;
3,306✔
474

1,642✔
475
    // Compute the value of the "Host" header.
1,642✔
476
    const std::uint_fast16_t default_port = (m_endpoint.is_ssl ? 443 : 80);
3,300✔
477
    auto host = m_endpoint.port == default_port ? m_endpoint.address
1,642✔
478
                                                : util::format("%1:%2", m_endpoint.address, m_endpoint.port);
3,306✔
479

1,642✔
480
    // Convert the list of protocols to a string
1,642✔
481
    std::ostringstream protocol_list;
3,306✔
482
    protocol_list.exceptions(std::ios_base::failbit | std::ios_base::badbit);
3,306✔
483
    protocol_list.imbue(std::locale::classic());
3,306✔
484
    if (m_endpoint.protocols.size() > 1)
3,306✔
485
        std::copy(m_endpoint.protocols.begin(), m_endpoint.protocols.end() - 1,
3,306✔
486
                  std::ostream_iterator<std::string>(protocol_list, ", "));
3,306✔
487
    protocol_list << m_endpoint.protocols.back();
3,306✔
488

1,642✔
489
    m_websocket.initiate_client_handshake(m_endpoint.path, std::move(host), protocol_list.str(),
3,306✔
490
                                          std::move(headers)); // Throws
3,306✔
491
}
3,306✔
492
} // namespace
493

494
///
495
/// DefaultSocketProvider - default socket provider implementation
496
///
497

498
DefaultSocketProvider::DefaultSocketProvider(const std::shared_ptr<util::Logger>& logger,
499
                                             const std::string user_agent,
500
                                             const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr,
501
                                             AutoStart auto_start)
502
    : m_logger_ptr{logger}
503
    , m_observer_ptr{observer_ptr}
504
    , m_service{}
505
    , m_random{}
506
    , m_user_agent{user_agent}
507
    , m_mutex{}
508
    , m_state{State::Stopped}
509
    , m_state_cv{}
510
    , m_thread{}
511
{
8,982✔
512
    REALM_ASSERT(m_logger_ptr);                     // Make sure the logger is valid
8,982✔
513
    util::seed_prng_nondeterministically(m_random); // Throws
8,982✔
514
    if (auto_start) {
8,982✔
515
        start();
8,230✔
516
    }
8,230✔
517
}
8,982✔
518

519
DefaultSocketProvider::~DefaultSocketProvider()
520
{
8,982✔
521
    m_logger_ptr->trace("Default event loop teardown");
8,982✔
522
    // Wait for the thread to stop
4,422✔
523
    stop(true);
8,982✔
524
    // Shutting down - no need to lock mutex before check
4,422✔
525
    REALM_ASSERT(m_state == State::Stopped);
8,982✔
526
}
8,982✔
527

528
void DefaultSocketProvider::start()
529
{
8,982✔
530
    std::unique_lock<std::mutex> lock(m_mutex);
8,982✔
531
    // Has the thread already been started or is running
4,422✔
532
    if (m_state == State::Starting || m_state == State::Running)
8,982✔
533
        return; // early return
×
534

4,422✔
535
    // If the thread has been previously run, make sure it has been joined first
4,422✔
536
    if (m_state == State::Stopping) {
8,982✔
537
        state_wait_for(lock, State::Stopped);
×
538
    }
×
539

4,422✔
540
    m_logger_ptr->trace("Default event loop: start()");
8,982✔
541
    REALM_ASSERT(m_state == State::Stopped);
8,982✔
542

4,422✔
543
    do_state_update(lock, State::Starting);
8,982✔
544
    m_thread = std::thread{&DefaultSocketProvider::event_loop, this};
8,982✔
545
    // Wait for the thread to start before continuing
4,422✔
546
    state_wait_for(lock, State::Running);
8,982✔
547
}
8,982✔
548

549
void DefaultSocketProvider::OnlyForTesting::run_event_loop_on_current_thread(DefaultSocketProvider* provider)
550
{
4✔
551
    {
4✔
552
        std::unique_lock<std::mutex> lk(provider->m_mutex);
4✔
553
        REALM_ASSERT(provider->m_state == State::Stopped);
4✔
554
        provider->do_state_update(lk, State::Starting);
4✔
555
    }
4✔
556

2✔
557
    provider->event_loop();
4✔
558
}
4✔
559

560
void DefaultSocketProvider::OnlyForTesting::prep_event_loop_for_restart(DefaultSocketProvider* provider)
561
{
4✔
562
    std::unique_lock<std::mutex> lk(provider->m_mutex);
4✔
563
    REALM_ASSERT(provider->m_state == State::Stopped);
4✔
564
    provider->m_service.reset();
4✔
565
}
4✔
566

567
void DefaultSocketProvider::event_loop()
568
{
8,986✔
569
    m_logger_ptr->trace("Default event loop: thread running");
8,986✔
570
    // Calls will_destroy_thread() when destroyed
4,424✔
571
    auto will_destroy_thread = util::make_scope_exit([&]() noexcept {
8,986✔
572
        m_logger_ptr->trace("Default event loop: thread exiting");
8,986✔
573
        if (m_observer_ptr)
8,986✔
574
            m_observer_ptr->will_destroy_thread();
×
575

4,424✔
576
        std::unique_lock<std::mutex> lock(m_mutex);
8,986✔
577
        // Did we get here due to an unhandled exception?
4,424✔
578
        if (m_state != State::Stopping) {
8,986✔
579
            m_logger_ptr->error("Default event loop: thread exited unexpectedly");
4✔
580
        }
4✔
581
        m_state = State::Stopped;
8,986✔
582
        lock.unlock();
8,986✔
583
        m_state_cv.notify_all();
8,986✔
584
    });
8,986✔
585

4,424✔
586
    if (m_observer_ptr)
8,986✔
587
        m_observer_ptr->did_create_thread();
×
588

4,424✔
589
    {
8,986✔
590
        std::lock_guard<std::mutex> lock(m_mutex);
8,986✔
591
        REALM_ASSERT(m_state == State::Starting);
8,986✔
592
    }
8,986✔
593

4,424✔
594
    // We update the state to Running from inside the event loop so that start() is blocked until
4,424✔
595
    // the event loop is actually ready to receive work.
4,424✔
596
    m_service.post([this, my_generation = ++m_event_loop_generation](Status status) {
8,986✔
597
        if (status == ErrorCodes::OperationAborted) {
8,986✔
598
            return;
×
599
        }
×
600

4,424✔
601
        REALM_ASSERT(status.is_ok());
8,986✔
602

4,424✔
603
        std::unique_lock<std::mutex> lock(m_mutex);
8,986✔
604
        // This is a callback from a previous generation
4,424✔
605
        if (m_event_loop_generation != my_generation) {
8,986✔
606
            return;
4✔
607
        }
4✔
608
        if (m_state == State::Stopping) {
8,982✔
609
            return;
×
610
        }
×
611
        m_logger_ptr->trace("Default event loop: service run");
8,982✔
612
        REALM_ASSERT(m_state == State::Starting);
8,982✔
613
        do_state_update(lock, State::Running);
8,982✔
614
    });
8,982✔
615

4,424✔
616
    // If there is no event loop observer or handle_error function registered, then just
4,424✔
617
    // allow the exception to bubble to the top so we can get a true stack trace
4,424✔
618
    if (!m_observer_ptr || !m_observer_ptr->has_handle_error()) {
8,986!
619
        m_service.run_until_stopped(); // Throws
8,986✔
620
    }
8,986✔
621
    else {
×
622
        try {
×
623
            m_service.run_until_stopped(); // Throws
×
624
        }
×
625
        catch (const std::exception& e) {
×
626
            REALM_ASSERT(m_observer_ptr); // should not change while event loop is running
×
627
            std::unique_lock<std::mutex> lock(m_mutex);
×
628
            // Service is no longer running, event loop thread is stopping
629
            do_state_update(lock, State::Stopping);
×
630
            lock.unlock();
×
631
            m_logger_ptr->error("Default event loop exception: ", e.what());
×
632
            // If the error was not handled by the thread loop observer, then rethrow
633
            if (!m_observer_ptr->handle_error(e))
×
634
                throw;
×
635
        }
×
636
    }
×
637
}
8,986✔
638

639
void DefaultSocketProvider::stop(bool wait_for_stop)
640
{
8,986✔
641
    std::unique_lock<std::mutex> lock(m_mutex);
8,986✔
642

4,424✔
643
    // Do nothing if the thread is not started or running or stop has already been called
4,424✔
644
    if (m_state == State::Starting || m_state == State::Running) {
8,986✔
645
        m_logger_ptr->trace("Default event loop: stop()");
8,982✔
646
        do_state_update(lock, State::Stopping);
8,982✔
647
        // Updating state to Stopping will free a start() if it is waiting for the thread to
4,422✔
648
        // start and may cause the thread to exit early before calling service.run()
4,422✔
649
        m_service.stop(); // Unblocks m_service.run()
8,982✔
650
    }
8,982✔
651

4,424✔
652
    // Wait until the thread is stopped (exited) if requested
4,424✔
653
    if (wait_for_stop) {
8,986✔
654
        m_logger_ptr->trace("Default event loop: wait for stop");
8,986✔
655
        state_wait_for(lock, State::Stopped);
8,986✔
656
        if (m_thread.joinable()) {
8,986✔
657
            m_thread.join();
8,982✔
658
        }
8,982✔
659
    }
8,986✔
660
}
8,986✔
661

662
//                    +---------------------------------------+
663
//                   \/                                       |
664
// State Machine: Stopped -> Starting -> Running -> Stopping -+
665
//                              |           |          ^
666
//                              +----------------------+
667

668
void DefaultSocketProvider::do_state_update(std::unique_lock<std::mutex>&, State new_state)
669
{
26,950✔
670
    // m_state_mutex should already be locked...
13,268✔
671
    m_state = new_state;
26,950✔
672
    m_state_cv.notify_all(); // Let any waiters check the state
26,950✔
673
}
26,950✔
674

675
void DefaultSocketProvider::state_wait_for(std::unique_lock<std::mutex>& lock, State expected_state)
676
{
17,968✔
677
    // Check for condition already met or superseded
8,846✔
678
    if (m_state >= expected_state)
17,968✔
679
        return;
4✔
680

8,844✔
681
    m_state_cv.wait(lock, [this, expected_state]() {
35,928✔
682
        // are we there yet?
17,688✔
683
        if (m_state < expected_state)
35,928✔
684
            return false;
17,964✔
685
        return true;
17,964✔
686
    });
17,964✔
687
}
17,964✔
688

689
std::unique_ptr<WebSocketInterface> DefaultSocketProvider::connect(std::unique_ptr<WebSocketObserver> observer,
690
                                                                   WebSocketEndpoint&& endpoint)
691
{
3,332✔
692
    return std::make_unique<DefaultWebSocketImpl>(m_logger_ptr, m_service, m_random, m_user_agent,
3,332✔
693
                                                  std::move(observer), std::move(endpoint));
3,332✔
694
}
3,332✔
695

696
} // namespace realm::sync::websocket
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc