• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2213

10 Apr 2024 11:21PM UTC coverage: 91.792% (-0.8%) from 92.623%
2213

push

Evergreen

web-flow
Add missing availability checks for SecCopyErrorMessageString (#7577)

This requires iOS 11.3 and we currently target iOS 11.

94842 of 175770 branches covered (53.96%)

7 of 22 new or added lines in 2 files covered. (31.82%)

1861 existing lines in 82 files now uncovered.

242866 of 264583 relevant lines covered (91.79%)

5593111.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.35
/src/realm/sync/network/default_socket.cpp
1
#include <realm/sync/network/default_socket.hpp>
2

3
#include <realm/sync/binding_callback_thread_observer.hpp>
4
#include <realm/sync/network/network.hpp>
5
#include <realm/sync/network/network_ssl.hpp>
6
#include <realm/sync/network/websocket.hpp>
7
#include <realm/util/basic_system_errors.hpp>
8
#include <realm/util/random.hpp>
9
#include <realm/util/scope_exit.hpp>
10

11
namespace realm::sync::websocket {
12

13
namespace {
14

15
///
16
/// DefaultWebSocketImpl - websocket implementation for the default socket provider
17
///
18
class DefaultWebSocketImpl final : public DefaultWebSocket, public Config {
19
public:
20
    DefaultWebSocketImpl(const std::shared_ptr<util::Logger>& logger_ptr, network::Service& service,
21
                         std::mt19937_64& random, const std::string user_agent,
22
                         std::unique_ptr<WebSocketObserver> observer, WebSocketEndpoint&& endpoint)
23
        : m_logger_ptr{logger_ptr}
24
        , m_network_logger{*m_logger_ptr}
25
        , m_random{random}
26
        , m_service{service}
27
        , m_user_agent{user_agent}
28
        , m_observer{std::move(observer)}
29
        , m_endpoint{std::move(endpoint)}
30
        , m_websocket(*this)
31
    {
3,458✔
32
        initiate_resolve();
3,458✔
33
    }
3,458✔
34

35
    virtual ~DefaultWebSocketImpl() = default;
3,456✔
36

37
    void async_write_binary(util::Span<const char> data, SyncSocketProvider::FunctionHandler&& handler) override
38
    {
96,068✔
39
        m_websocket.async_write_binary(data.data(), data.size(),
96,068✔
40
                                       [write_handler = std::move(handler)](std::error_code ec, size_t) {
95,982✔
41
                                           write_handler(DefaultWebSocketImpl::get_status_from_util_error(ec));
95,978✔
42
                                       });
95,978✔
43
    }
96,068✔
44

45
    std::string_view get_appservices_request_id() const noexcept override
46
    {
3,364✔
47
        return m_app_services_coid;
3,364✔
48
    }
3,364✔
49

50
    void force_handshake_response_for_testing(int status_code, std::string body = "") override
51
    {
12✔
52
        m_websocket.force_handshake_response_for_testing(status_code, body);
12✔
53
    }
12✔
54

55
    // public for HTTPClient CRTP, but not on the EZSocket interface, so de-facto private
56
    void async_read(char*, std::size_t, ReadCompletionHandler) override;
57
    void async_read_until(char*, std::size_t, char, ReadCompletionHandler) override;
58
    void async_write(const char*, std::size_t, WriteCompletionHandler) override;
59

60
private:
61
    using milliseconds_type = std::int_fast64_t;
62

63
    const std::shared_ptr<util::Logger>& websocket_get_logger() noexcept override
64
    {
3,456✔
65
        return m_logger_ptr;
3,456✔
66
    }
3,456✔
67
    std::mt19937_64& websocket_get_random() noexcept override
68
    {
99,492✔
69
        return m_random;
99,492✔
70
    }
99,492✔
71

72
    void websocket_handshake_completion_handler(const HTTPHeaders& headers) override
73
    {
3,368✔
74
        const std::string empty;
3,368✔
75
        if (auto it = headers.find("X-Appservices-Request-Id"); it != headers.end()) {
3,368✔
76
            m_app_services_coid = it->second;
1,490✔
77
        }
1,490✔
78
        auto it = headers.find("Sec-WebSocket-Protocol");
3,368✔
79
        m_observer->websocket_connected_handler(it == headers.end() ? empty : it->second);
3,368✔
80
    }
3,368✔
81
    void websocket_read_error_handler(std::error_code ec) override
82
    {
492✔
83
        m_network_logger.error("Reading failed: %1", ec.message()); // Throws
492✔
84
        constexpr bool was_clean = false;
492✔
85
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_read_error, ec.message());
492✔
86
    }
492✔
87
    void websocket_write_error_handler(std::error_code ec) override
88
    {
×
89
        m_network_logger.error("Writing failed: %1", ec.message()); // Throws
×
90
        constexpr bool was_clean = false;
×
91
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_write_error, ec.message());
×
92
    }
×
93
    void websocket_handshake_error_handler(std::error_code ec, const HTTPHeaders*, std::string_view body) override
94
    {
12✔
95
        WebSocketError error = WebSocketError::websocket_ok;
12✔
96
        bool was_clean = true;
12✔
97

6✔
98
        if (ec == websocket::HttpError::bad_response_301_moved_permanently ||
12✔
99
            ec == websocket::HttpError::bad_response_308_permanent_redirect) {
12✔
100
            error = WebSocketError::websocket_moved_permanently;
12✔
101
        }
12✔
UNCOV
102
        else if (ec == websocket::HttpError::bad_response_3xx_redirection) {
×
103
            error = WebSocketError::websocket_retry_error;
×
104
            was_clean = false;
×
105
        }
×
106
        else if (ec == websocket::HttpError::bad_response_401_unauthorized) {
×
107
            error = WebSocketError::websocket_unauthorized;
×
108
        }
×
109
        else if (ec == websocket::HttpError::bad_response_403_forbidden) {
×
110
            error = WebSocketError::websocket_forbidden;
×
111
        }
×
112
        else if (ec == websocket::HttpError::bad_response_5xx_server_error ||
×
113
                 ec == websocket::HttpError::bad_response_500_internal_server_error ||
×
114
                 ec == websocket::HttpError::bad_response_502_bad_gateway ||
×
115
                 ec == websocket::HttpError::bad_response_503_service_unavailable ||
×
116
                 ec == websocket::HttpError::bad_response_504_gateway_timeout) {
×
117
            error = WebSocketError::websocket_internal_server_error;
×
118
            was_clean = false;
×
119
        }
×
120
        else {
×
121
            error = WebSocketError::websocket_fatal_error;
×
122
            was_clean = false;
×
123
            if (!body.empty()) {
×
124
                std::string_view identifier = "REALM_SYNC_PROTOCOL_MISMATCH";
×
125
                auto i = body.find(identifier);
×
126
                if (i != std::string_view::npos) {
×
127
                    std::string_view rest = body.substr(i + identifier.size());
×
128
                    // FIXME: Use std::string_view::begins_with() in C++20.
UNCOV
129
                    auto begins_with = [](std::string_view string, std::string_view prefix) {
×
130
                        return (string.size() >= prefix.size() &&
×
131
                                std::equal(string.data(), string.data() + prefix.size(), prefix.data()));
×
132
                    };
×
133
                    if (begins_with(rest, ":CLIENT_TOO_OLD")) {
×
134
                        error = WebSocketError::websocket_client_too_old;
×
135
                    }
×
136
                    else if (begins_with(rest, ":CLIENT_TOO_NEW")) {
×
137
                        error = WebSocketError::websocket_client_too_new;
×
138
                    }
×
139
                    else {
×
140
                        // Other more complicated forms of mismatch
UNCOV
141
                        error = WebSocketError::websocket_protocol_mismatch;
×
142
                    }
×
143
                    was_clean = true;
×
144
                }
×
145
            }
×
146
        }
×
147

6✔
148
        websocket_error_and_close_handler(was_clean, error, ec.message());
12✔
149
    }
12✔
150
    void websocket_protocol_error_handler(std::error_code ec) override
UNCOV
151
    {
×
152
        constexpr bool was_clean = false;
×
153
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_protocol_error, ec.message());
×
154
    }
×
155
    bool websocket_close_message_received(WebSocketError code, std::string_view message) override
156
    {
94✔
157
        constexpr bool was_clean = true;
94✔
158

46✔
159
        return websocket_error_and_close_handler(was_clean, code, message);
94✔
160
    }
94✔
161
    bool websocket_error_and_close_handler(bool was_clean, WebSocketError code, std::string_view reason)
162
    {
612✔
163
        if (!was_clean) {
612✔
164
            m_observer->websocket_error_handler();
506✔
165
        }
506✔
166
        return m_observer->websocket_closed_handler(was_clean, code, reason);
612✔
167
    }
612✔
168
    bool websocket_binary_message_received(const char* ptr, std::size_t size) override
169
    {
76,574✔
170
        return m_observer->websocket_binary_message_received(util::Span<const char>(ptr, size));
76,574✔
171
    }
76,574✔
172

173
    static Status get_status_from_util_error(std::error_code ec)
174
    {
95,976✔
175
        if (!ec) {
95,976✔
176
            return Status::OK();
94,540✔
177
        }
94,540✔
178
        switch (ec.value()) {
1,436✔
179
            case util::error::operation_aborted:
1,438✔
180
                return {ErrorCodes::Error::OperationAborted, "Write operation cancelled"};
1,438✔
UNCOV
181
            case util::error::address_family_not_supported:
✔
182
                [[fallthrough]];
×
183
            case util::error::invalid_argument:
✔
184
                return {ErrorCodes::Error::InvalidArgument, ec.message()};
×
185
            case util::error::no_memory:
✔
186
                return {ErrorCodes::Error::OutOfMemory, ec.message()};
×
187
            case util::error::connection_aborted:
✔
188
                [[fallthrough]];
×
189
            case util::error::connection_reset:
✔
190
                [[fallthrough]];
×
191
            case util::error::broken_pipe:
✔
192
                [[fallthrough]];
×
193
            case util::error::resource_unavailable_try_again:
✔
194
                return {ErrorCodes::Error::ConnectionClosed, ec.message()};
×
195
            default:
✔
196
                return {ErrorCodes::Error::UnknownError, ec.message()};
×
197
        }
1,436✔
198
    }
1,436✔
199

200
    void initiate_resolve();
201
    void handle_resolve(std::error_code, network::Endpoint::List);
202
    void initiate_tcp_connect(network::Endpoint::List, std::size_t);
203
    void handle_tcp_connect(std::error_code, network::Endpoint::List, std::size_t);
204
    void initiate_http_tunnel();
205
    void initiate_websocket_or_ssl_handshake();
206
    void initiate_ssl_handshake();
207
    void handle_ssl_handshake(std::error_code);
208
    void initiate_websocket_handshake();
209
    void handle_connection_established();
210

211
    void schedule_urgent_ping();
212
    void initiate_ping_delay(milliseconds_type now);
213
    void handle_ping_delay();
214
    void initiate_pong_timeout();
215
    void handle_pong_timeout();
216

217
    const std::shared_ptr<util::Logger> m_logger_ptr;
218
    util::Logger& m_network_logger;
219
    std::mt19937_64& m_random;
220
    network::Service& m_service;
221
    const std::string m_user_agent;
222
    std::string m_app_services_coid;
223

224
    std::unique_ptr<WebSocketObserver> m_observer;
225

226
    const WebSocketEndpoint m_endpoint;
227
    util::Optional<network::Resolver> m_resolver;
228
    util::Optional<network::Socket> m_socket;
229
    util::Optional<network::ssl::Context> m_ssl_context;
230
    util::Optional<network::ssl::Stream> m_ssl_stream;
231
    network::ReadAheadBuffer m_read_ahead_buffer;
232
    websocket::Socket m_websocket;
233
    util::Optional<HTTPClient<DefaultWebSocketImpl>> m_proxy_client;
234
};
235

236
void DefaultWebSocketImpl::async_read(char* buffer, std::size_t size, ReadCompletionHandler handler)
237
{
172,442✔
238
    REALM_ASSERT(m_socket);
172,442✔
239
    if (m_ssl_stream) {
172,442✔
240
        m_ssl_stream->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
158✔
241
    }
158✔
242
    else {
172,284✔
243
        m_socket->async_read(buffer, size, m_read_ahead_buffer, std::move(handler)); // Throws
172,284✔
244
    }
172,284✔
245
}
172,442✔
246

247

248
void DefaultWebSocketImpl::async_read_until(char* buffer, std::size_t size, char delim, ReadCompletionHandler handler)
249
{
31,076✔
250
    REALM_ASSERT(m_socket);
31,076✔
251
    if (m_ssl_stream) {
31,076✔
252
        m_ssl_stream->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
98✔
253
    }
98✔
254
    else {
30,978✔
255
        m_socket->async_read_until(buffer, size, delim, m_read_ahead_buffer, std::move(handler)); // Throws
30,978✔
256
    }
30,978✔
257
}
31,076✔
258

259

260
void DefaultWebSocketImpl::async_write(const char* data, std::size_t size, WriteCompletionHandler handler)
261
{
99,484✔
262
    REALM_ASSERT(m_socket);
99,484✔
263
    if (m_ssl_stream) {
99,484✔
264
        m_ssl_stream->async_write(data, size, std::move(handler)); // Throws
84✔
265
    }
84✔
266
    else {
99,400✔
267
        m_socket->async_write(data, size, std::move(handler)); // Throws
99,400✔
268
    }
99,400✔
269
}
99,484✔
270

271

272
void DefaultWebSocketImpl::initiate_resolve()
273
{
3,454✔
274
    const std::string& address = m_endpoint.proxy ? m_endpoint.proxy->address : m_endpoint.address;
3,454✔
275
    const port_type& port = m_endpoint.proxy ? m_endpoint.proxy->port : m_endpoint.port;
3,454✔
276

1,740✔
277
    if (m_endpoint.proxy) {
3,454✔
278
        // logger.detail("Using %1 proxy", proxy->type); // Throws
UNCOV
279
    }
×
280

1,740✔
281
    m_network_logger.detail("Resolving '%1:%2'", address, port); // Throws
3,454✔
282

1,740✔
283
    network::Resolver::Query query(address, util::to_string(port)); // Throws
3,454✔
284
    auto handler = [this](std::error_code ec, network::Endpoint::List endpoints) {
3,452✔
285
        // If the operation is aborted, the connection object may have been
1,740✔
286
        // destroyed.
1,740✔
287
        if (ec != util::error::operation_aborted)
3,452✔
288
            handle_resolve(ec, std::move(endpoints)); // Throws
3,452✔
289
    };
3,452✔
290
    m_resolver.emplace(m_service);                                   // Throws
3,454✔
291
    m_resolver->async_resolve(std::move(query), std::move(handler)); // Throws
3,454✔
292
}
3,454✔
293

294

295
void DefaultWebSocketImpl::handle_resolve(std::error_code ec, network::Endpoint::List endpoints)
296
{
3,452✔
297
    if (ec) {
3,452✔
298
        m_network_logger.error("Failed to resolve '%1:%2': %3", m_endpoint.address, m_endpoint.port,
4✔
299
                               ec.message()); // Throws
4✔
300
        constexpr bool was_clean = false;
4✔
301
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_resolve_failed,
4✔
302
                                          ec.message()); // Throws
4✔
303
        return;
4✔
304
    }
4✔
305

1,738✔
306
    initiate_tcp_connect(std::move(endpoints), 0); // Throws
3,448✔
307
}
3,448✔
308

309

310
// Starts an asynchronous TCP connect to the i'th entry of `endpoints`. The
// endpoint list is carried along in the completion handler so that the next
// entry can be tried if this one fails.
void DefaultWebSocketImpl::initiate_tcp_connect(network::Endpoint::List endpoints, std::size_t i)
{
    REALM_ASSERT(i < endpoints.size());

    // Capture what is needed for logging before the list is moved into the handler
    const std::size_t endpoint_count = endpoints.size();
    network::Endpoint target = *(endpoints.begin() + i);

    auto on_connect = [this, endpoints = std::move(endpoints), i](std::error_code ec) mutable {
        // If the operation is aborted, the connection object may have been
        // destroyed.
        if (ec == util::error::operation_aborted)
            return;
        handle_tcp_connect(ec, std::move(endpoints), i); // Throws
    };

    m_socket.emplace(m_service); // Throws
    m_socket->async_connect(target, std::move(on_connect));
    m_network_logger.detail("Connecting to endpoint '%1:%2' (%3/%4)", target.address(), target.port(), (i + 1),
                            endpoint_count); // Throws
}
3,448✔
325

326
void DefaultWebSocketImpl::handle_tcp_connect(std::error_code ec, network::Endpoint::List endpoints, std::size_t i)
327
{
3,440✔
328
    REALM_ASSERT(i < endpoints.size());
3,440✔
329
    const network::Endpoint& ep = *(endpoints.begin() + i);
3,440✔
330
    if (ec) {
3,440✔
UNCOV
331
        m_network_logger.error("Failed to connect to endpoint '%1:%2': %3", ep.address(), ep.port(),
×
332
                               ec.message()); // Throws
×
333
        std::size_t i_2 = i + 1;
×
334
        if (i_2 < endpoints.size()) {
×
335
            initiate_tcp_connect(std::move(endpoints), i_2); // Throws
×
336
            return;
×
337
        }
×
338
        // All endpoints failed
UNCOV
339
        m_network_logger.error("Failed to connect to '%1:%2': All endpoints failed", m_endpoint.address,
×
340
                               m_endpoint.port);
×
341
        constexpr bool was_clean = false;
×
342
        websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
343
                                          ec.message()); // Throws
×
344
        return;
×
345
    }
×
346

1,734✔
347
    REALM_ASSERT(m_socket);
3,440✔
348
    network::Endpoint ep_2 = m_socket->local_endpoint();
3,440✔
349
    m_network_logger.info("Connected to endpoint '%1:%2' (from '%3:%4')", ep.address(), ep.port(), ep_2.address(),
3,440✔
350
                          ep_2.port()); // Throws
3,440✔
351

1,734✔
352
    // TODO: Handle HTTPS proxies
1,734✔
353
    if (m_endpoint.proxy) {
3,440✔
UNCOV
354
        initiate_http_tunnel(); // Throws
×
355
        return;
×
356
    }
×
357

1,734✔
358
    initiate_websocket_or_ssl_handshake(); // Throws
3,440✔
359
}
3,440✔
360

361
void DefaultWebSocketImpl::initiate_websocket_or_ssl_handshake()
362
{
3,440✔
363
    if (m_endpoint.is_ssl) {
3,440✔
364
        initiate_ssl_handshake(); // Throws
24✔
365
    }
24✔
366
    else {
3,416✔
367
        initiate_websocket_handshake(); // Throws
3,416✔
368
    }
3,416✔
369
}
3,440✔
370

371
void DefaultWebSocketImpl::initiate_http_tunnel()
UNCOV
372
{
×
373
    HTTPRequest req;
×
374
    req.method = HTTPMethod::Connect;
×
375
    req.headers.emplace("Host", util::format("%1:%2", m_endpoint.address, m_endpoint.port));
×
376
    // TODO handle proxy authorization
377

UNCOV
378
    m_proxy_client.emplace(*this, m_logger_ptr);
×
379
    auto handler = [this](HTTPResponse response, std::error_code ec) {
×
380
        if (ec && ec != util::error::operation_aborted) {
×
381
            m_network_logger.error("Failed to establish HTTP tunnel: %1", ec.message());
×
382
            constexpr bool was_clean = false;
×
383
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
384
                                              ec.message()); // Throws
×
385
            return;
×
386
        }
×
387

UNCOV
388
        if (response.status != HTTPStatus::Ok) {
×
389
            m_network_logger.error("Proxy server returned response '%1 %2'", response.status,
×
390
                                   response.reason); // Throws
×
391
            constexpr bool was_clean = false;
×
392
            websocket_error_and_close_handler(was_clean, WebSocketError::websocket_connection_failed,
×
393
                                              response.reason); // Throws
×
394
            return;
×
395
        }
×
396

UNCOV
397
        initiate_websocket_or_ssl_handshake(); // Throws
×
398
    };
×
399

UNCOV
400
    m_proxy_client->async_request(req, std::move(handler)); // Throws
×
401
}
×
402

403
void DefaultWebSocketImpl::initiate_ssl_handshake()
404
{
24✔
405
    using namespace network::ssl;
24✔
406

14✔
407
    if (!m_ssl_context) {
24✔
408
        m_ssl_context.emplace(); // Throws
24✔
409
        if (m_endpoint.verify_servers_ssl_certificate) {
24✔
410
            if (m_endpoint.ssl_trust_certificate_path) {
20✔
411
                m_ssl_context->use_verify_file(*m_endpoint.ssl_trust_certificate_path); // Throws
10✔
412
            }
10✔
413
            else if (!m_endpoint.ssl_verify_callback) {
10✔
414
                m_ssl_context->use_default_verify(); // Throws
4✔
415
#if REALM_INCLUDE_CERTS
2✔
416
                // On platforms like Windows or Android where OpenSSL is not normally found
2✔
417
                // `use_default_verify()` won't actually be able to load any default certificates.
2✔
418
                // That's why we bundle a set of trusted certificates ourselves.
2✔
419
                m_ssl_context->use_included_certificate_roots(); // Throws
2✔
420
#endif
2✔
421
            }
4✔
422
        }
20✔
423
    }
24✔
424

14✔
425
    m_ssl_stream.emplace(*m_socket, *m_ssl_context, Stream::client); // Throws
24✔
426
    m_ssl_stream->set_logger(m_logger_ptr.get());
24✔
427
    m_ssl_stream->set_host_name(m_endpoint.address); // Throws
24✔
428
    if (m_endpoint.verify_servers_ssl_certificate) {
24✔
429
        m_ssl_stream->set_verify_mode(VerifyMode::peer); // Throws
20✔
430
        m_ssl_stream->set_server_port(m_endpoint.port);
20✔
431
        if (!m_endpoint.ssl_trust_certificate_path) {
20✔
432
            if (m_endpoint.ssl_verify_callback) {
10✔
433
                m_ssl_stream->use_verify_callback(m_endpoint.ssl_verify_callback);
6✔
434
            }
6✔
435
        }
10✔
436
    }
20✔
437

14✔
438
    auto handler = [this](std::error_code ec) {
24✔
439
        // If the operation is aborted, the connection object may have been
14✔
440
        // destroyed.
14✔
441
        if (ec != util::error::operation_aborted)
24✔
442
            handle_ssl_handshake(ec); // Throws
24✔
443
    };
24✔
444
    m_ssl_stream->async_handshake(std::move(handler)); // Throws
24✔
445

14✔
446
    // FIXME: We also need to perform the SSL shutdown operation somewhere
14✔
447
}
24✔
448

449

450
void DefaultWebSocketImpl::handle_ssl_handshake(std::error_code ec)
451
{
24✔
452
    if (ec) {
24✔
453
        REALM_ASSERT(ec != util::error::operation_aborted);
10✔
454
        constexpr bool was_clean = false;
10✔
455
        WebSocketError parsed_error_code;
10✔
456
        if (ec == network::ssl::Errors::tls_handshake_failed) {
10✔
457
            parsed_error_code = WebSocketError::websocket_tls_handshake_failed;
10✔
458
        }
10✔
UNCOV
459
        else {
×
460
            parsed_error_code = WebSocketError::websocket_connection_failed;
×
461
        }
×
462

6✔
463
        websocket_error_and_close_handler(was_clean, parsed_error_code, ec.message()); // Throws
10✔
464
        return;
10✔
465
    }
10✔
466

8✔
467
    initiate_websocket_handshake(); // Throws
14✔
468
}
14✔
469

470

471
void DefaultWebSocketImpl::initiate_websocket_handshake()
472
{
3,430✔
473
    auto headers = HTTPHeaders(m_endpoint.headers.begin(), m_endpoint.headers.end());
3,430✔
474
    headers["User-Agent"] = m_user_agent;
3,430✔
475

1,728✔
476
    // Compute the value of the "Host" header.
1,728✔
477
    const std::uint_fast16_t default_port = (m_endpoint.is_ssl ? 443 : 80);
3,424✔
478
    auto host = m_endpoint.port == default_port ? m_endpoint.address
1,728✔
479
                                                : util::format("%1:%2", m_endpoint.address, m_endpoint.port);
3,430✔
480

1,728✔
481
    // Convert the list of protocols to a string
1,728✔
482
    std::ostringstream protocol_list;
3,430✔
483
    protocol_list.exceptions(std::ios_base::failbit | std::ios_base::badbit);
3,430✔
484
    protocol_list.imbue(std::locale::classic());
3,430✔
485
    if (m_endpoint.protocols.size() > 1)
3,430✔
486
        std::copy(m_endpoint.protocols.begin(), m_endpoint.protocols.end() - 1,
3,430✔
487
                  std::ostream_iterator<std::string>(protocol_list, ", "));
3,430✔
488
    protocol_list << m_endpoint.protocols.back();
3,430✔
489

1,728✔
490
    m_websocket.initiate_client_handshake(m_endpoint.path, std::move(host), protocol_list.str(),
3,430✔
491
                                          std::move(headers)); // Throws
3,430✔
492
}
3,430✔
493
} // namespace
494

495
///
496
/// DefaultSocketProvider - default socket provider implementation
497
///
498

499
// Wraps `logger` in a network-category logger, seeds the random generator and,
// when `auto_start` is set, starts the event loop thread right away.
DefaultSocketProvider::DefaultSocketProvider(const std::shared_ptr<util::Logger>& logger,
                                             const std::string& user_agent,
                                             const std::shared_ptr<BindingCallbackThreadObserver>& observer_ptr,
                                             AutoStart auto_start)
    : m_logger_ptr(std::make_shared<util::CategoryLogger>(util::LogCategory::network, logger))
    , m_observer_ptr(observer_ptr)
    , m_user_agent(user_agent)
    , m_state(State::Stopped)
{
    // Make sure the logger is valid
    REALM_ASSERT(m_logger_ptr);
    // Throws
    util::seed_prng_nondeterministically(m_random);
    if (auto_start)
        start();
}
9,626✔
514

515
// Stops the event loop and blocks until its thread has exited.
DefaultSocketProvider::~DefaultSocketProvider()
{
    m_logger_ptr->trace("Default event loop teardown");

    // Wait for the thread to stop
    constexpr bool wait_for_stop = true;
    stop(wait_for_stop);

    // Shutting down - no need to lock mutex before check
    REALM_ASSERT(m_state == State::Stopped);
}
9,626✔
523

524
void DefaultSocketProvider::start()
525
{
9,626✔
526
    std::unique_lock<std::mutex> lock(m_mutex);
9,626✔
527
    // Has the thread already been started or is running
4,744✔
528
    if (m_state == State::Starting || m_state == State::Running)
9,626✔
UNCOV
529
        return; // early return
×
530

4,744✔
531
    // If the thread has been previously run, make sure it has been joined first
4,744✔
532
    if (m_state == State::Stopping) {
9,626✔
UNCOV
533
        state_wait_for(lock, State::Stopped);
×
UNCOV
534
    }
×
535

4,744✔
536
    m_logger_ptr->trace("Default event loop: start()");
9,626✔
537
    REALM_ASSERT(m_state == State::Stopped);
9,626✔
538

4,744✔
539
    do_state_update(lock, State::Starting);
9,626✔
540
    m_thread = std::thread{&DefaultSocketProvider::event_loop, this};
9,626✔
541
    // Wait for the thread to start before continuing
4,744✔
542
    state_wait_for(lock, State::Running);
9,626✔
543
}
9,626✔
544

545
// Test helper: runs the provider's event loop on the calling thread instead of
// on a dedicated thread. The provider must currently be stopped.
void DefaultSocketProvider::OnlyForTesting::run_event_loop_on_current_thread(DefaultSocketProvider* provider)
{
    std::unique_lock<std::mutex> state_lock(provider->m_mutex);
    REALM_ASSERT(provider->m_state == State::Stopped);
    provider->do_state_update(state_lock, State::Starting);
    // The event loop takes the mutex itself, so release it before entering
    state_lock.unlock();

    provider->event_loop();
}
4✔
555

556
// Test helper: resets the provider's service while the event loop is stopped.
void DefaultSocketProvider::OnlyForTesting::prep_event_loop_for_restart(DefaultSocketProvider* provider)
{
    std::lock_guard<std::mutex> state_guard(provider->m_mutex);
    REALM_ASSERT(provider->m_state == State::Stopped);
    provider->m_service.reset();
}
4✔
562

563
void DefaultSocketProvider::event_loop()
564
{
9,630✔
565
    m_logger_ptr->trace("Default event loop: thread running");
9,630✔
566
    // Calls will_destroy_thread() when destroyed
4,746✔
567
    auto will_destroy_thread = util::make_scope_exit([&]() noexcept {
9,630✔
568
        m_logger_ptr->trace("Default event loop: thread exiting");
9,630✔
569
        if (m_observer_ptr)
9,630✔
UNCOV
570
            m_observer_ptr->will_destroy_thread();
×
571

4,746✔
572
        std::unique_lock<std::mutex> lock(m_mutex);
9,630✔
573
        // Did we get here due to an unhandled exception?
4,746✔
574
        if (m_state != State::Stopping) {
9,630✔
575
            m_logger_ptr->error("Default event loop: thread exited unexpectedly");
4✔
576
        }
4✔
577
        m_state = State::Stopped;
9,630✔
578
        lock.unlock();
9,630✔
579
        m_state_cv.notify_all();
9,630✔
580
    });
9,630✔
581

4,746✔
582
    if (m_observer_ptr)
9,630✔
UNCOV
583
        m_observer_ptr->did_create_thread();
×
584

4,746✔
585
    {
9,630✔
586
        std::lock_guard<std::mutex> lock(m_mutex);
9,630✔
587
        REALM_ASSERT(m_state == State::Starting);
9,630✔
588
    }
9,630✔
589

4,746✔
590
    // We update the state to Running from inside the event loop so that start() is blocked until
4,746✔
591
    // the event loop is actually ready to receive work.
4,746✔
592
    m_service.post([this, my_generation = ++m_event_loop_generation](Status status) {
9,630✔
593
        if (status == ErrorCodes::OperationAborted) {
9,630✔
UNCOV
594
            return;
×
UNCOV
595
        }
×
596

4,746✔
597
        REALM_ASSERT(status.is_ok());
9,630✔
598

4,746✔
599
        std::unique_lock<std::mutex> lock(m_mutex);
9,630✔
600
        // This is a callback from a previous generation
4,746✔
601
        if (m_event_loop_generation != my_generation) {
9,630✔
602
            return;
4✔
603
        }
4✔
604
        if (m_state == State::Stopping) {
9,626✔
UNCOV
605
            return;
×
UNCOV
606
        }
×
607
        m_logger_ptr->trace("Default event loop: service run");
9,626✔
608
        REALM_ASSERT(m_state == State::Starting);
9,626✔
609
        do_state_update(lock, State::Running);
9,626✔
610
    });
9,626✔
611

4,746✔
612
    // If there is no event loop observer or handle_error function registered, then just
4,746✔
613
    // allow the exception to bubble to the top so we can get a true stack trace
4,746✔
614
    if (!m_observer_ptr || !m_observer_ptr->has_handle_error()) {
9,630!
615
        m_service.run_until_stopped(); // Throws
9,630✔
616
    }
9,630✔
UNCOV
617
    else {
×
UNCOV
618
        try {
×
UNCOV
619
            m_service.run_until_stopped(); // Throws
×
UNCOV
620
        }
×
UNCOV
621
        catch (const std::exception& e) {
×
UNCOV
622
            REALM_ASSERT(m_observer_ptr); // should not change while event loop is running
×
623
            std::unique_lock<std::mutex> lock(m_mutex);
×
624
            // Service is no longer running, event loop thread is stopping
625
            do_state_update(lock, State::Stopping);
×
626
            lock.unlock();
×
627
            m_logger_ptr->error("Default event loop exception: ", e.what());
×
628
            // If the error was not handled by the thread loop observer, then rethrow
629
            if (!m_observer_ptr->handle_error(e))
×
UNCOV
630
                throw;
×
631
        }
×
632
    }
×
633
}
9,630✔
634

635
void DefaultSocketProvider::stop(bool wait_for_stop)
636
{
9,630✔
637
    std::unique_lock<std::mutex> lock(m_mutex);
9,630✔
638

4,746✔
639
    // Do nothing if the thread is not started or running or stop has already been called
4,746✔
640
    if (m_state == State::Starting || m_state == State::Running) {
9,630✔
641
        m_logger_ptr->trace("Default event loop: stop()");
9,626✔
642
        do_state_update(lock, State::Stopping);
9,626✔
643
        // Updating state to Stopping will free a start() if it is waiting for the thread to
4,744✔
644
        // start and may cause the thread to exit early before calling service.run()
4,744✔
645
        m_service.stop(); // Unblocks m_service.run()
9,626✔
646
    }
9,626✔
647

4,746✔
648
    // Wait until the thread is stopped (exited) if requested
4,746✔
649
    if (wait_for_stop) {
9,630✔
650
        m_logger_ptr->trace("Default event loop: wait for stop");
9,630✔
651
        state_wait_for(lock, State::Stopped);
9,630✔
652
        if (m_thread.joinable()) {
9,630✔
653
            m_thread.join();
9,626✔
654
        }
9,626✔
655
    }
9,630✔
656
}
9,630✔
657

658
//                    +---------------------------------------+
659
//                   \/                                       |
660
// State Machine: Stopped -> Starting -> Running -> Stopping -+
661
//                              |           |          ^
662
//                              +----------------------+
663

664
void DefaultSocketProvider::do_state_update(std::unique_lock<std::mutex>&, State new_state)
665
{
28,882✔
666
    // m_state_mutex should already be locked...
14,234✔
667
    m_state = new_state;
28,882✔
668
    m_state_cv.notify_all(); // Let any waiters check the state
28,882✔
669
}
28,882✔
670

671
void DefaultSocketProvider::state_wait_for(std::unique_lock<std::mutex>& lock, State expected_state)
672
{
19,256✔
673
    // Check for condition already met or superseded
9,490✔
674
    if (m_state >= expected_state)
19,256✔
675
        return;
4✔
676

9,488✔
677
    m_state_cv.wait(lock, [this, expected_state]() {
38,504✔
678
        // are we there yet?
18,976✔
679
        if (m_state < expected_state)
38,504✔
680
            return false;
19,252✔
681
        return true;
19,252✔
682
    });
19,252✔
683
}
19,252✔
684

685
std::unique_ptr<WebSocketInterface> DefaultSocketProvider::connect(std::unique_ptr<WebSocketObserver> observer,
686
                                                                   WebSocketEndpoint&& endpoint)
687
{
3,458✔
688
    return std::make_unique<DefaultWebSocketImpl>(m_logger_ptr, m_service, m_random, m_user_agent,
3,458✔
689
                                                  std::move(observer), std::move(endpoint));
3,458✔
690
}
3,458✔
691

692
} // namespace realm::sync::websocket
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc