• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1842

16 Nov 2023 09:09AM UTC coverage: 91.7% (+0.02%) from 91.678%
1842

push

Evergreen

web-flow
Disable the network tests (#7145)

* Disable the network tests

* Update to 'debug' log level for network tests

* Added comment about the  tag

92134 of 168858 branches covered (0.0%)

231216 of 252143 relevant lines covered (91.7%)

6673850.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.6
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/compact_changesets.hpp>
10
#include <realm/sync/noinst/client_reset_operation.hpp>
11
#include <realm/sync/protocol.hpp>
12
#include <realm/util/assert.hpp>
13
#include <realm/util/basic_system_errors.hpp>
14
#include <realm/util/memory_stream.hpp>
15
#include <realm/util/platform_info.hpp>
16
#include <realm/util/random.hpp>
17
#include <realm/util/safe_int_ops.hpp>
18
#include <realm/util/scope_exit.hpp>
19
#include <realm/util/to_string.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/version.hpp>
22

23
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
24

25
#include <system_error>
26
#include <sstream>
27

28
// NOTE: The protocol specification is in `/doc/protocol.md`
29

30
using namespace realm;
31
using namespace _impl;
32
using namespace realm::util;
33
using namespace realm::sync;
34
using namespace realm::sync::websocket;
35

36
// clang-format off
37
using Connection      = ClientImpl::Connection;
38
using Session         = ClientImpl::Session;
39
using UploadChangeset = ClientHistory::UploadChangeset;
40

41
// These are a work-around for a bug in MSVC. It cannot find in-class types
42
// mentioned in signature of out-of-line member function definitions.
43
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
44
using OutputBuffer                = ClientImpl::OutputBuffer;
45
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
46
// clang-format on
47

48
void ClientImpl::ReconnectInfo::reset() noexcept
49
{
1,568✔
50
    m_backoff_state.reset();
1,568✔
51
    scheduled_reset = false;
1,568✔
52
}
1,568✔
53

54

55
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
56
                                       std::optional<ResumptionDelayInfo> new_delay_info)
57
{
3,258✔
58
    m_backoff_state.update(new_reason, new_delay_info);
3,258✔
59
}
3,258✔
60

61

62
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
63
{
5,106✔
64
    if (scheduled_reset) {
5,106✔
65
        reset();
4✔
66
    }
4✔
67

2,632✔
68
    if (!m_backoff_state.triggering_error) {
5,106✔
69
        return std::chrono::milliseconds::zero();
3,964✔
70
    }
3,964✔
71

618✔
72
    switch (*m_backoff_state.triggering_error) {
1,142✔
73
        case ConnectionTerminationReason::closed_voluntarily:
82✔
74
            return std::chrono::milliseconds::zero();
82✔
75
        case ConnectionTerminationReason::server_said_do_not_reconnect:
20✔
76
            return std::chrono::milliseconds::max();
20✔
77
        default:
1,038✔
78
            if (m_reconnect_mode == ReconnectMode::testing) {
1,038✔
79
                return std::chrono::milliseconds::max();
814✔
80
            }
814✔
81

112✔
82
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
224✔
83
            return m_backoff_state.delay_interval();
224✔
84
    }
1,142✔
85
}
1,142✔
86

87

88
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
89
                                      port_type& port, std::string& path) const
90
{
3,378✔
91
    util::Uri uri(url); // Throws
3,378✔
92
    uri.canonicalize(); // Throws
3,378✔
93
    std::string userinfo, address_2, port_2;
3,378✔
94
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
3,378✔
95
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
3,378✔
96
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
3,378✔
97
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
3,378✔
98
    if (REALM_UNLIKELY(!good))
3,378✔
99
        return false;
1,514✔
100
    ProtocolEnvelope protocol_2;
3,378✔
101
    port_type port_3;
3,378✔
102
    if (realm_scheme) {
3,378✔
103
        if (uri.get_scheme() == "realm:") {
×
104
            protocol_2 = ProtocolEnvelope::realm;
×
105
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
106
        }
×
107
        else {
×
108
            protocol_2 = ProtocolEnvelope::realms;
×
109
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
110
        }
×
111
    }
×
112
    else {
3,378✔
113
        REALM_ASSERT(ws_scheme);
3,378✔
114
        if (uri.get_scheme() == "ws:") {
3,378✔
115
            protocol_2 = ProtocolEnvelope::ws;
3,374✔
116
            port_3 = 80;
3,374✔
117
        }
3,374✔
118
        else {
4✔
119
            protocol_2 = ProtocolEnvelope::wss;
4✔
120
            port_3 = 443;
4✔
121
        }
4✔
122
    }
3,378✔
123
    if (!port_2.empty()) {
3,378✔
124
        std::istringstream in(port_2);    // Throws
3,378✔
125
        in.imbue(std::locale::classic()); // Throws
3,378✔
126
        in >> port_3;
3,378✔
127
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,378✔
128
            return false;
1,514✔
129
    }
3,378✔
130
    std::string path_2 = uri.get_path(); // Throws (copy)
3,378✔
131

1,514✔
132
    protocol = protocol_2;
3,378✔
133
    address = std::move(address_2);
3,378✔
134
    port = port_3;
3,378✔
135
    path = std::move(path_2);
3,378✔
136
    return true;
3,378✔
137
}
3,378✔
138

139

140
ClientImpl::ClientImpl(ClientConfig config)
141
    : logger_ptr{config.logger ? std::move(config.logger) : util::Logger::get_default_logger()}
142
    , logger{*logger_ptr}
143
    , m_reconnect_mode{config.reconnect_mode}
144
    , m_connect_timeout{config.connect_timeout}
145
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
146
    , m_ping_keepalive_period{config.ping_keepalive_period}
147
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
148
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
149
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
150
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
151
    , m_dry_run{config.dry_run}
152
    , m_enable_default_port_hack{config.enable_default_port_hack}
153
    , m_disable_upload_compaction{config.disable_upload_compaction}
154
    , m_fix_up_object_ids{config.fix_up_object_ids}
155
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
156
    , m_socket_provider{std::move(config.socket_provider)}
157
    , m_client_protocol{} // Throws
158
    , m_one_connection_per_session{config.one_connection_per_session}
159
    , m_random{}
160
{
8,978✔
161
    // FIXME: Would be better if seeding was up to the application.
4,420✔
162
    util::seed_prng_nondeterministically(m_random); // Throws
8,978✔
163

4,420✔
164
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
8,978✔
165
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
8,978✔
166
                 get_current_protocol_version()); // Throws
8,978✔
167
    logger.info("Platform: %1", util::get_platform_info());
8,978✔
168
    const char* build_mode;
8,978✔
169
#if REALM_DEBUG
8,978✔
170
    build_mode = "Debug";
8,978✔
171
#else
172
    build_mode = "Release";
173
#endif
174
    logger.debug("Build mode: %1", build_mode);
8,978✔
175
    logger.debug("Config param: one_connection_per_session = %1",
8,978✔
176
                 config.one_connection_per_session); // Throws
8,978✔
177
    logger.debug("Config param: connect_timeout = %1 ms",
8,978✔
178
                 config.connect_timeout); // Throws
8,978✔
179
    logger.debug("Config param: connection_linger_time = %1 ms",
8,978✔
180
                 config.connection_linger_time); // Throws
8,978✔
181
    logger.debug("Config param: ping_keepalive_period = %1 ms",
8,978✔
182
                 config.ping_keepalive_period); // Throws
8,978✔
183
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
8,978✔
184
                 config.pong_keepalive_timeout); // Throws
8,978✔
185
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
8,978✔
186
                 config.fast_reconnect_limit); // Throws
8,978✔
187
    logger.debug("Config param: disable_upload_compaction = %1",
8,978✔
188
                 config.disable_upload_compaction); // Throws
8,978✔
189
    logger.debug("Config param: disable_sync_to_disk = %1",
8,978✔
190
                 config.disable_sync_to_disk); // Throws
8,978✔
191
    logger.debug("Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3",
8,978✔
192
                 m_reconnect_backoff_info.max_resumption_delay_interval.count(),
8,978✔
193
                 m_reconnect_backoff_info.resumption_delay_interval.count(),
8,978✔
194
                 m_reconnect_backoff_info.resumption_delay_backoff_multiplier);
8,978✔
195

4,420✔
196
    if (config.reconnect_mode != ReconnectMode::normal) {
8,978✔
197
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
772✔
198
                    "never do this in production!");
772✔
199
    }
772✔
200

4,420✔
201
    if (config.dry_run) {
8,978✔
202
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
203
                    "never do this in production!");
×
204
    }
×
205

4,420✔
206
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
8,978✔
207

4,420✔
208
    if (m_one_connection_per_session) {
8,978✔
209
        // FIXME: Re-enable this warning when the load balancer is able to handle
2✔
210
        // multiplexing.
2✔
211
        //        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
2✔
212
        //            "never do this in production");
2✔
213
    }
4✔
214

4,420✔
215
    if (config.disable_upload_activation_delay) {
8,978✔
216
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
217
                    "never do this in production");
×
218
    }
×
219

4,420✔
220
    if (config.disable_sync_to_disk) {
8,978✔
221
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
222
                    "never do this in production");
×
223
    }
×
224

4,420✔
225
    m_actualize_and_finalize = create_trigger([this](Status status) {
14,246✔
226
        if (status == ErrorCodes::OperationAborted)
14,246✔
227
            return;
×
228
        else if (!status.is_ok())
14,246✔
229
            throw Exception(status);
×
230
        actualize_and_finalize_session_wrappers(); // Throws
14,246✔
231
    });
14,246✔
232
}
8,978✔
233

234

235
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
236
{
161,904✔
237
    REALM_ASSERT(m_socket_provider);
161,904✔
238
    {
161,904✔
239
        std::lock_guard lock(m_drain_mutex);
161,904✔
240
        ++m_outstanding_posts;
161,904✔
241
        m_drained = false;
161,904✔
242
    }
161,904✔
243
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
161,900✔
244
        auto decr_guard = util::make_scope_exit([&]() noexcept {
161,902✔
245
            std::lock_guard lock(m_drain_mutex);
161,902✔
246
            REALM_ASSERT(m_outstanding_posts);
161,902✔
247
            --m_outstanding_posts;
161,902✔
248
            m_drain_cv.notify_all();
161,902✔
249
        });
161,902✔
250
        handler(status);
161,900✔
251
    });
161,900✔
252
}
161,904✔
253

254

255
void ClientImpl::drain_connections()
256
{
8,978✔
257
    logger.debug("Draining connections during sync client shutdown");
8,978✔
258
    for (auto& server_slot_pair : m_server_slots) {
5,678✔
259
        auto& server_slot = server_slot_pair.second;
2,380✔
260

1,122✔
261
        if (server_slot.connection) {
2,380✔
262
            auto& conn = server_slot.connection;
2,282✔
263
            conn->force_close();
2,282✔
264
        }
2,282✔
265
        else {
98✔
266
            for (auto& conn_pair : server_slot.alt_connections) {
48✔
267
                conn_pair.second->force_close();
6✔
268
            }
6✔
269
        }
98✔
270
    }
2,380✔
271
}
8,978✔
272

273

274
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
275
                                                       SyncSocketProvider::FunctionHandler&& handler)
276
{
16,050✔
277
    REALM_ASSERT(m_socket_provider);
16,050✔
278
    {
16,050✔
279
        std::lock_guard lock(m_drain_mutex);
16,050✔
280
        ++m_outstanding_posts;
16,050✔
281
        m_drained = false;
16,050✔
282
    }
16,050✔
283
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
16,052✔
284
        handler(status);
16,052✔
285

7,850✔
286
        std::lock_guard lock(m_drain_mutex);
16,052✔
287
        REALM_ASSERT(m_outstanding_posts);
16,052✔
288
        --m_outstanding_posts;
16,052✔
289
        m_drain_cv.notify_all();
16,052✔
290
    });
16,052✔
291
}
16,050✔
292

293

294
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
295
{
11,476✔
296
    REALM_ASSERT(m_socket_provider);
11,476✔
297
    return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
11,476✔
298
}
11,476✔
299

300
Connection::~Connection()
301
{
2,498✔
302
    if (m_websocket_sentinel) {
2,498✔
303
        m_websocket_sentinel->destroyed = true;
×
304
        m_websocket_sentinel.reset();
×
305
    }
×
306
}
2,498✔
307

308
void Connection::activate()
309
{
2,494✔
310
    REALM_ASSERT(m_on_idle);
2,494✔
311
    m_activated = true;
2,494✔
312
    if (m_num_active_sessions == 0)
2,494✔
313
        m_on_idle->trigger();
×
314
    // We cannot in general connect immediately, because a prior failure to
1,174✔
315
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
1,174✔
316
    initiate_reconnect_wait(); // Throws
2,494✔
317
}
2,494✔
318

319

320
void Connection::activate_session(std::unique_ptr<Session> sess)
321
{
9,660✔
322
    REALM_ASSERT(sess);
9,660✔
323
    REALM_ASSERT(&sess->m_conn == this);
9,660✔
324
    REALM_ASSERT(!m_force_closed);
9,660✔
325
    Session& sess_2 = *sess;
9,660✔
326
    session_ident_type ident = sess->m_ident;
9,660✔
327
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
9,660✔
328
    bool was_inserted = p.second;
9,660✔
329
    REALM_ASSERT(was_inserted);
9,660✔
330
    // Save the session ident to the historical list of session idents
4,652✔
331
    m_session_history.insert(ident);
9,660✔
332
    sess_2.activate(); // Throws
9,660✔
333
    if (m_state == ConnectionState::connected) {
9,660✔
334
        bool fast_reconnect = false;
6,770✔
335
        sess_2.connection_established(fast_reconnect); // Throws
6,770✔
336
    }
6,770✔
337
    ++m_num_active_sessions;
9,660✔
338
}
9,660✔
339

340

341
void Connection::initiate_session_deactivation(Session* sess)
342
{
9,662✔
343
    REALM_ASSERT(sess);
9,662✔
344
    REALM_ASSERT(&sess->m_conn == this);
9,662✔
345
    REALM_ASSERT(m_num_active_sessions);
9,662✔
346
    // Since the client may be waiting for m_num_active_sessions to reach 0
4,654✔
347
    // in stop_and_wait() (on a separate thread), deactivate Session before
4,654✔
348
    // decrementing the num active sessions value.
4,654✔
349
    sess->initiate_deactivation(); // Throws
9,662✔
350
    if (sess->m_state == Session::Deactivated) {
9,662✔
351
        finish_session_deactivation(sess);
936✔
352
    }
936✔
353
    if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
9,662✔
354
        if (m_activated && m_state == ConnectionState::disconnected)
4,006✔
355
            m_on_idle->trigger();
326✔
356
    }
4,006✔
357
}
9,662✔
358

359

360
void Connection::cancel_reconnect_delay()
361
{
1,772✔
362
    REALM_ASSERT(m_activated);
1,772✔
363

990✔
364
    if (m_reconnect_delay_in_progress) {
1,772✔
365
        if (m_nonzero_reconnect_delay)
1,560✔
366
            logger.detail("Canceling reconnect delay"); // Throws
784✔
367

884✔
368
        // Cancel the in-progress wait operation by destroying the timer
884✔
369
        // object. Destruction is needed in this case, because a new wait
884✔
370
        // operation might have to be initiated before the previous one
884✔
371
        // completes (its completion handler starts to execute), so the new wait
884✔
372
        // operation must be done on a new timer object.
884✔
373
        m_reconnect_disconnect_timer.reset();
1,560✔
374
        m_reconnect_delay_in_progress = false;
1,560✔
375
        m_reconnect_info.reset();
1,560✔
376
        initiate_reconnect_wait(); // Throws
1,560✔
377
        return;
1,560✔
378
    }
1,560✔
379

106✔
380
    // If we are not disconnected, then we need to make sure the next time we get disconnected
106✔
381
    // that we are allowed to re-connect as quickly as possible.
106✔
382
    //
106✔
383
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
106✔
384
    // backoff/delay state before calculating the next delay, unless a PONG message is received
106✔
385
    // for the urgent PING message we send below.
106✔
386
    //
106✔
387
    // If we get a PONG message for the urgent PING message sent below, then the connection is
106✔
388
    // healthy and we can calculate the next delay normally.
106✔
389
    if (m_state != ConnectionState::disconnected) {
212✔
390
        m_reconnect_info.scheduled_reset = true;
212✔
391
        m_ping_after_scheduled_reset_of_reconnect_info = false;
212✔
392

106✔
393
        schedule_urgent_ping(); // Throws
212✔
394
        return;
212✔
395
    }
212✔
396
    // Nothing to do in this case. The next reconnect attemp will be made as
106✔
397
    // soon as there are any sessions that are both active and unsuspended.
106✔
398
}
212✔
399

400
void ClientImpl::Connection::finish_session_deactivation(Session* sess)
401
{
7,870✔
402
    REALM_ASSERT(sess->m_state == Session::Deactivated);
7,870✔
403
    auto ident = sess->m_ident;
7,870✔
404
    m_sessions.erase(ident);
7,870✔
405
    m_session_history.erase(ident);
7,870✔
406
}
7,870✔
407

408
void Connection::force_close()
409
{
2,288✔
410
    if (m_force_closed) {
2,288✔
411
        return;
×
412
    }
×
413

1,080✔
414
    m_force_closed = true;
2,288✔
415

1,080✔
416
    if (m_state != ConnectionState::disconnected) {
2,288✔
417
        voluntary_disconnect();
2,206✔
418
    }
2,206✔
419

1,080✔
420
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,288✔
421
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,288✔
422
        m_reconnect_disconnect_timer.reset();
82✔
423
        m_reconnect_delay_in_progress = false;
82✔
424
        m_disconnect_delay_in_progress = false;
82✔
425
    }
82✔
426

1,080✔
427
    // We must copy any session pointers we want to close to a vector because force_closing
1,080✔
428
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
1,080✔
429
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
1,080✔
430
    std::vector<Session*> to_close;
2,288✔
431
    for (auto& session_pair : m_sessions) {
1,162✔
432
        if (session_pair.second->m_state == Session::State::Active) {
156✔
433
            to_close.push_back(session_pair.second.get());
156✔
434
        }
156✔
435
    }
156✔
436

1,080✔
437
    for (auto& sess : to_close) {
1,162✔
438
        sess->force_close();
156✔
439
    }
156✔
440

1,080✔
441
    logger.debug("Force closed idle connection");
2,288✔
442
}
2,288✔
443

444

445
void Connection::websocket_connected_handler(const std::string& protocol)
446
{
3,170✔
447
    if (!protocol.empty()) {
3,170✔
448
        std::string_view expected_prefix =
3,170✔
449
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
2,886✔
450
        // FIXME: Use std::string_view::begins_with() in C++20.
1,568✔
451
        auto prefix_matches = [&](std::string_view other) {
3,170✔
452
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,170✔
453
        };
3,170✔
454
        if (prefix_matches(expected_prefix)) {
3,170✔
455
            util::MemoryInputStream in;
3,170✔
456
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,170✔
457
            in.imbue(std::locale::classic());
3,170✔
458
            in.unsetf(std::ios_base::skipws);
3,170✔
459
            int value_2 = 0;
3,170✔
460
            in >> value_2;
3,170✔
461
            if (in && in.eof() && value_2 >= 0) {
3,170✔
462
                bool good_version =
3,170✔
463
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,170✔
464
                if (good_version) {
3,170✔
465
                    logger.detail("Negotiated protocol version: %1", value_2);
3,170✔
466
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
1,568✔
467
                    // will provide the appservices connection ID via a log message.
1,568✔
468
                    // TODO: Remove once the server starts sending the connection ID
1,568✔
469
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,170✔
470
                    m_negotiated_protocol_version = value_2;
3,170✔
471
                    handle_connection_established(); // Throws
3,170✔
472
                    return;
3,170✔
473
                }
3,170✔
474
            }
×
475
        }
3,170✔
476
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
477
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
478
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
479
    }
×
480
    else {
×
481
        close_due_to_client_side_error(
×
482
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
483
            ConnectionTerminationReason::bad_headers_in_http_response);
×
484
    }
×
485
}
3,170✔
486

487

488
bool Connection::websocket_binary_message_received(util::Span<const char> data)
489
{
81,454✔
490
    if (m_force_closed) {
81,454✔
491
        logger.debug("Received binary message after connection was force closed");
×
492
        return false;
×
493
    }
×
494

38,662✔
495
    using sf = SimulatedFailure;
81,454✔
496
    if (sf::check_trigger(sf::sync_client__read_head)) {
81,454✔
497
        close_due_to_client_side_error(
370✔
498
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
370✔
499
            ConnectionTerminationReason::read_or_write_error);
370✔
500
        return bool(m_websocket);
370✔
501
    }
370✔
502

38,436✔
503
    handle_message_received(data);
81,084✔
504
    return bool(m_websocket);
81,084✔
505
}
81,084✔
506

507

508
void Connection::websocket_error_handler()
509
{
494✔
510
    m_websocket_error_received = true;
494✔
511
}
494✔
512

513
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
514
{
614✔
515
    if (m_force_closed) {
614✔
516
        logger.debug("Received websocket close message after connection was force closed");
×
517
        return false;
×
518
    }
×
519
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
614✔
520

318✔
521
    switch (error_code) {
614✔
522
        case WebSocketError::websocket_ok:
68✔
523
            break;
68✔
524
        case WebSocketError::websocket_resolve_failed:
4✔
525
            [[fallthrough]];
4✔
526
        case WebSocketError::websocket_connection_failed: {
6✔
527
            SessionErrorInfo error_info(
6✔
528
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
6✔
529
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
6✔
530
            break;
6✔
531
        }
4✔
532
        case WebSocketError::websocket_read_error:
478✔
533
            [[fallthrough]];
478✔
534
        case WebSocketError::websocket_write_error: {
478✔
535
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
478✔
536
                                         ConnectionTerminationReason::read_or_write_error);
478✔
537
            break;
478✔
538
        }
478✔
539
        case WebSocketError::websocket_going_away:
250✔
540
            [[fallthrough]];
×
541
        case WebSocketError::websocket_protocol_error:
✔
542
            [[fallthrough]];
×
543
        case WebSocketError::websocket_unsupported_data:
✔
544
            [[fallthrough]];
×
545
        case WebSocketError::websocket_invalid_payload_data:
✔
546
            [[fallthrough]];
×
547
        case WebSocketError::websocket_policy_violation:
✔
548
            [[fallthrough]];
×
549
        case WebSocketError::websocket_reserved:
✔
550
            [[fallthrough]];
×
551
        case WebSocketError::websocket_no_status_received:
✔
552
            [[fallthrough]];
×
553
        case WebSocketError::websocket_invalid_extension: {
✔
554
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
555
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
556
            break;
×
557
        }
×
558
        case WebSocketError::websocket_message_too_big: {
4✔
559
            auto message = util::format(
4✔
560
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
561
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
562
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
563
            involuntary_disconnect(std::move(error_info),
4✔
564
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
565
            break;
4✔
566
        }
×
567
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
568
            close_due_to_client_side_error(
10✔
569
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
570
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
571
            break;
10✔
572
        }
×
573
        case WebSocketError::websocket_client_too_old:
✔
574
            [[fallthrough]];
×
575
        case WebSocketError::websocket_client_too_new:
✔
576
            [[fallthrough]];
×
577
        case WebSocketError::websocket_protocol_mismatch: {
✔
578
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
579
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
580
            break;
×
581
        }
×
582
        case WebSocketError::websocket_fatal_error: {
✔
583
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{true}),
×
584
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
585
            break;
×
586
        }
×
587
        case WebSocketError::websocket_forbidden: {
✔
588
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
589
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
590
            involuntary_disconnect(std::move(error_info),
×
591
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
592
            break;
×
593
        }
×
594
        case WebSocketError::websocket_unauthorized: {
36✔
595
            SessionErrorInfo error_info(
36✔
596
                {ErrorCodes::AuthError,
36✔
597
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
36✔
598
                IsFatal{false});
36✔
599
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
36✔
600
            involuntary_disconnect(std::move(error_info),
36✔
601
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
36✔
602
            break;
36✔
603
        }
×
604
        case WebSocketError::websocket_moved_permanently: {
12✔
605
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
606
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
607
            involuntary_disconnect(std::move(error_info),
12✔
608
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
609
            break;
12✔
610
        }
×
611
        case WebSocketError::websocket_abnormal_closure: {
✔
612
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
613
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
614
            involuntary_disconnect(std::move(error_info),
×
615
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
616
            break;
×
617
        }
×
618
        case WebSocketError::websocket_internal_server_error:
✔
619
            [[fallthrough]];
×
620
        case WebSocketError::websocket_retry_error: {
✔
621
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
622
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
623
            break;
×
624
        }
614✔
625
    }
614✔
626

318✔
627
    return bool(m_websocket);
614✔
628
}
614✔
629

630
// Guarantees that handle_reconnect_wait() is never called from within the
631
// execution of initiate_reconnect_wait() (no callback reentrance).
632
void Connection::initiate_reconnect_wait()
633
{
7,314✔
634
    REALM_ASSERT(m_activated);
7,314✔
635
    REALM_ASSERT(!m_reconnect_delay_in_progress);
7,314✔
636
    REALM_ASSERT(!m_disconnect_delay_in_progress);
7,314✔
637

3,672✔
638
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
3,672✔
639
    if (m_force_closed) {
7,314✔
640
        return;
2,204✔
641
    }
2,204✔
642

2,636✔
643
    m_reconnect_delay_in_progress = true;
5,110✔
644
    auto delay = m_reconnect_info.delay_interval();
5,110✔
645
    if (delay == std::chrono::milliseconds::max()) {
5,110✔
646
        logger.detail("Reconnection delayed indefinitely"); // Throws
834✔
647
        // Not actually starting a timer corresponds to an infinite wait
468✔
648
        m_nonzero_reconnect_delay = true;
834✔
649
        return;
834✔
650
    }
834✔
651

2,168✔
652
    if (delay == std::chrono::milliseconds::zero()) {
4,276✔
653
        m_nonzero_reconnect_delay = false;
4,050✔
654
    }
4,050✔
655
    else {
226✔
656
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
226✔
657
        m_nonzero_reconnect_delay = true;
226✔
658
    }
226✔
659

2,168✔
660
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
2,168✔
661
    // we need it to be cancelable in case the connection is terminated before the timer
2,168✔
662
    // callback is run.
2,168✔
663
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
4,278✔
664
        // If the operation is aborted, the connection object may have been
2,170✔
665
        // destroyed.
2,170✔
666
        if (status != ErrorCodes::OperationAborted)
4,278✔
667
            handle_reconnect_wait(status); // Throws
3,256✔
668
    });                                    // Throws
4,278✔
669
}
4,276✔
670

671

672
void Connection::handle_reconnect_wait(Status status)
673
{
3,256✔
674
    if (!status.is_ok()) {
3,256✔
675
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
676
        throw Exception(status);
×
677
    }
×
678

1,610✔
679
    REALM_ASSERT(m_reconnect_delay_in_progress);
3,256✔
680
    m_reconnect_delay_in_progress = false;
3,256✔
681

1,610✔
682
    if (m_num_active_unsuspended_sessions > 0)
3,256✔
683
        initiate_reconnect(); // Throws
3,256✔
684
}
3,256✔
685

686
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
687
    explicit WebSocketObserverShim(Connection* conn)
688
        : conn(conn)
689
        , sentinel(conn->m_websocket_sentinel)
690
    {
3,258✔
691
    }
3,258✔
692

693
    Connection* conn;
694
    util::bind_ptr<LifecycleSentinel> sentinel;
695

696
    void websocket_connected_handler(const std::string& protocol) override
697
    {
3,170✔
698
        if (sentinel->destroyed) {
3,170✔
699
            return;
×
700
        }
×
701

1,568✔
702
        return conn->websocket_connected_handler(protocol);
3,170✔
703
    }
3,170✔
704

705
    void websocket_error_handler() override
706
    {
494✔
707
        if (sentinel->destroyed) {
494✔
708
            return;
×
709
        }
×
710

258✔
711
        conn->websocket_error_handler();
494✔
712
    }
494✔
713

714
    bool websocket_binary_message_received(util::Span<const char> data) override
715
    {
81,454✔
716
        if (sentinel->destroyed) {
81,454✔
717
            return false;
×
718
        }
×
719

38,664✔
720
        return conn->websocket_binary_message_received(data);
81,454✔
721
    }
81,454✔
722

723
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
724
    {
614✔
725
        if (sentinel->destroyed) {
614✔
726
            return true;
×
727
        }
×
728

318✔
729
        return conn->websocket_closed_handler(was_clean, error_code, msg);
614✔
730
    }
614✔
731
};
732

733
void Connection::initiate_reconnect()
734
{
3,258✔
735
    REALM_ASSERT(m_activated);
3,258✔
736

1,612✔
737
    m_state = ConnectionState::connecting;
3,258✔
738
    report_connection_state_change(ConnectionState::connecting); // Throws
3,258✔
739
    if (m_websocket_sentinel) {
3,258✔
740
        m_websocket_sentinel->destroyed = true;
×
741
    }
×
742
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,258✔
743
    m_websocket.reset();
3,258✔
744

1,612✔
745
    // Watchdog
1,612✔
746
    initiate_connect_wait(); // Throws
3,258✔
747

1,612✔
748
    std::vector<std::string> sec_websocket_protocol;
3,258✔
749
    {
3,258✔
750
        auto protocol_prefix =
3,258✔
751
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
2,968✔
752
        int min = get_oldest_supported_protocol_version();
3,258✔
753
        int max = get_current_protocol_version();
3,258✔
754
        REALM_ASSERT_3(min, <=, max);
3,258✔
755
        // List protocol version in descending order to ensure that the server
1,612✔
756
        // selects the highest possible version.
1,612✔
757
        for (int version = max; version >= min; --version) {
32,564✔
758
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
29,306✔
759
        }
29,306✔
760
    }
3,258✔
761

1,612✔
762
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,258✔
763
                m_server_endpoint.port, m_http_request_path_prefix);
3,258✔
764

1,612✔
765
    m_websocket_error_received = false;
3,258✔
766
    m_websocket =
3,258✔
767
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,258✔
768
                                            WebSocketEndpoint{
3,258✔
769
                                                m_server_endpoint.address,
3,258✔
770
                                                m_server_endpoint.port,
3,258✔
771
                                                get_http_request_path(),
3,258✔
772
                                                std::move(sec_websocket_protocol),
3,258✔
773
                                                is_ssl(m_server_endpoint.envelope),
3,258✔
774
                                                /// DEPRECATED - The following will be removed in a future release
1,612✔
775
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,258✔
776
                                                m_verify_servers_ssl_certificate,
3,258✔
777
                                                m_ssl_trust_certificate_path,
3,258✔
778
                                                m_ssl_verify_callback,
3,258✔
779
                                                m_proxy_config,
3,258✔
780
                                            });
3,258✔
781
}
3,258✔
782

783

784
void Connection::initiate_connect_wait()
785
{
3,256✔
786
    // Deploy a watchdog to enforce an upper bound on the time it can take to
1,610✔
787
    // fully establish the connection (including SSL and WebSocket
1,610✔
788
    // handshakes). Without such a watchdog, connect operations could take very
1,610✔
789
    // long, or even indefinite time.
1,610✔
790
    milliseconds_type time = m_client.m_connect_timeout;
3,256✔
791

1,610✔
792
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,258✔
793
        // If the operation is aborted, the connection object may have been
1,612✔
794
        // destroyed.
1,612✔
795
        if (status != ErrorCodes::OperationAborted)
3,258✔
796
            handle_connect_wait(status); // Throws
×
797
    });                                  // Throws
3,258✔
798
}
3,256✔
799

800

801
void Connection::handle_connect_wait(Status status)
802
{
×
803
    if (!status.is_ok()) {
×
804
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
805
        throw Exception(status);
×
806
    }
×
807

808
    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
×
809
    logger.info("Connect timeout"); // Throws
×
810
    involuntary_disconnect(
×
811
        SessionErrorInfo{Status{ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
×
812
                         IsFatal{false}},
×
813
        ConnectionTerminationReason::sync_connect_timeout); // Throws
×
814
}
×
815

816

817
void Connection::handle_connection_established()
818
{
3,168✔
819
    // Cancel connect timeout watchdog
1,566✔
820
    m_connect_timer.reset();
3,168✔
821

1,566✔
822
    m_state = ConnectionState::connected;
3,168✔
823

1,566✔
824
    milliseconds_type now = monotonic_clock_now();
3,168✔
825
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,168✔
826
    initiate_ping_delay(now);     // Throws
3,168✔
827

1,566✔
828
    bool fast_reconnect = false;
3,168✔
829
    if (m_disconnect_has_occurred) {
3,168✔
830
        milliseconds_type time = now - m_disconnect_time;
858✔
831
        if (time <= m_client.m_fast_reconnect_limit)
858✔
832
            fast_reconnect = true;
858✔
833
    }
858✔
834

1,566✔
835
    for (auto& p : m_sessions) {
4,126✔
836
        Session& sess = *p.second;
4,126✔
837
        sess.connection_established(fast_reconnect); // Throws
4,126✔
838
    }
4,126✔
839

1,566✔
840
    report_connection_state_change(ConnectionState::connected); // Throws
3,168✔
841
}
3,168✔
842

843

844
void Connection::schedule_urgent_ping()
845
{
212✔
846
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
212✔
847
    if (m_ping_delay_in_progress) {
212✔
848
        m_heartbeat_timer.reset();
194✔
849
        m_ping_delay_in_progress = false;
194✔
850
        m_minimize_next_ping_delay = true;
194✔
851
        milliseconds_type now = monotonic_clock_now();
194✔
852
        initiate_ping_delay(now); // Throws
194✔
853
        return;
194✔
854
    }
194✔
855
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
18!
856
    if (!m_send_ping)
18!
857
        m_minimize_next_ping_delay = true;
18✔
858
}
18✔
859

860

861
void Connection::initiate_ping_delay(milliseconds_type now)
862
{
3,558✔
863
    REALM_ASSERT(!m_ping_delay_in_progress);
3,558✔
864
    REALM_ASSERT(!m_waiting_for_pong);
3,558✔
865
    REALM_ASSERT(!m_send_ping);
3,558✔
866

1,730✔
867
    milliseconds_type delay = 0;
3,558✔
868
    if (!m_minimize_next_ping_delay) {
3,558✔
869
        delay = m_client.m_ping_keepalive_period;
3,362✔
870
        // Make a randomized deduction of up to 10%, or up to 100% if this is
1,640✔
871
        // the first PING message to be sent since the connection was
1,640✔
872
        // established. The purpose of this randomized deduction is to reduce
1,640✔
873
        // the risk of many connections sending PING messages simultaneously to
1,640✔
874
        // the server.
1,640✔
875
        milliseconds_type max_deduction = (m_ping_sent ? delay / 10 : delay);
3,242✔
876
        auto distr = std::uniform_int_distribution<milliseconds_type>(0, max_deduction);
3,362✔
877
        milliseconds_type randomized_deduction = distr(m_client.get_random());
3,362✔
878
        delay -= randomized_deduction;
3,362✔
879
        // Deduct the time spent waiting for PONG
1,640✔
880
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
3,362✔
881
        milliseconds_type spent_time = now - m_pong_wait_started_at;
3,362✔
882
        if (spent_time < delay) {
3,362✔
883
            delay -= spent_time;
3,354✔
884
        }
3,354✔
885
        else {
8✔
886
            delay = 0;
8✔
887
        }
8✔
888
    }
3,362✔
889
    else {
196✔
890
        m_minimize_next_ping_delay = false;
196✔
891
    }
196✔
892

1,730✔
893

1,730✔
894
    m_ping_delay_in_progress = true;
3,558✔
895

1,730✔
896
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
3,558✔
897
        if (status == ErrorCodes::OperationAborted)
3,558✔
898
            return;
3,344✔
899
        else if (!status.is_ok())
214✔
900
            throw Exception(status);
×
901

86✔
902
        handle_ping_delay();                                    // Throws
214✔
903
    });                                                         // Throws
214✔
904
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
3,558✔
905
}
3,558✔
906

907

908
void Connection::handle_ping_delay()
909
{
214✔
910
    REALM_ASSERT(m_ping_delay_in_progress);
214✔
911
    m_ping_delay_in_progress = false;
214✔
912
    m_send_ping = true;
214✔
913

86✔
914
    initiate_pong_timeout(); // Throws
214✔
915

86✔
916
    if (m_state == ConnectionState::connected && !m_sending)
214✔
917
        send_next_message(); // Throws
190✔
918
}
214✔
919

920

921
void Connection::initiate_pong_timeout()
922
{
214✔
923
    REALM_ASSERT(!m_ping_delay_in_progress);
214✔
924
    REALM_ASSERT(!m_waiting_for_pong);
214✔
925
    REALM_ASSERT(m_send_ping);
214✔
926

86✔
927
    m_waiting_for_pong = true;
214✔
928
    m_pong_wait_started_at = monotonic_clock_now();
214✔
929

86✔
930
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
214✔
931
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
214✔
932
        if (status == ErrorCodes::OperationAborted)
214✔
933
            return;
202✔
934
        else if (!status.is_ok())
12✔
935
            throw Exception(status);
×
936

6✔
937
        handle_pong_timeout(); // Throws
12✔
938
    });                        // Throws
12✔
939
}
214✔
940

941

942
void Connection::handle_pong_timeout()
943
{
12✔
944
    REALM_ASSERT(m_waiting_for_pong);
12✔
945
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
946
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
947
                                 ConnectionTerminationReason::pong_timeout);
12✔
948
}
12✔
949

950

951
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
952
{
101,650✔
953
    // Stop sending messages if an websocket error was received.
47,174✔
954
    if (m_websocket_error_received)
101,650✔
955
        return;
×
956

47,174✔
957
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
101,650✔
958
        if (sentinel->destroyed) {
101,560✔
959
            return;
1,392✔
960
        }
1,392✔
961
        if (!status.is_ok()) {
100,168✔
962
            if (status != ErrorCodes::Error::OperationAborted) {
×
963
                // Write errors will be handled by the websocket_write_error_handler() callback
964
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
965
            }
×
966
            return;
×
967
        }
×
968
        handle_write_message(); // Throws
100,168✔
969
    });                         // Throws
100,168✔
970
    m_sending_session = sess;
101,650✔
971
    m_sending = true;
101,650✔
972
}
101,650✔
973

974

975
void Connection::handle_write_message()
976
{
100,164✔
977
    m_sending_session->message_sent(); // Throws
100,164✔
978
    if (m_sending_session->m_state == Session::Deactivated) {
100,164✔
979
        finish_session_deactivation(m_sending_session);
80✔
980
    }
80✔
981
    m_sending_session = nullptr;
100,164✔
982
    m_sending = false;
100,164✔
983
    send_next_message(); // Throws
100,164✔
984
}
100,164✔
985

986

987
void Connection::send_next_message()
988
{
160,564✔
989
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
160,564✔
990
    REALM_ASSERT(!m_sending_session);
160,564✔
991
    REALM_ASSERT(!m_sending);
160,564✔
992
    if (m_send_ping) {
160,564✔
993
        send_ping(); // Throws
202✔
994
        return;
202✔
995
    }
202✔
996
    while (!m_sessions_enlisted_to_send.empty()) {
221,370✔
997
        // The state of being connected is not supposed to be able to change
75,864✔
998
        // across this loop thanks to the "no callback reentrance" guarantee
75,864✔
999
        // provided by Websocket::async_write_text(), and friends.
75,864✔
1000
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
162,848✔
1001

75,864✔
1002
        Session& sess = *m_sessions_enlisted_to_send.front();
162,848✔
1003
        m_sessions_enlisted_to_send.pop_front();
162,848✔
1004
        sess.send_message(); // Throws
162,848✔
1005

75,864✔
1006
        if (sess.m_state == Session::Deactivated) {
162,848✔
1007
            finish_session_deactivation(&sess);
1,124✔
1008
        }
1,124✔
1009

75,864✔
1010
        // An enlisted session may choose to not send a message. In that case,
75,864✔
1011
        // we should pass the opportunity to the next enlisted session.
75,864✔
1012
        if (m_sending)
162,848✔
1013
            break;
101,840✔
1014
    }
162,848✔
1015
}
160,362✔
1016

1017

1018
void Connection::send_ping()
1019
{
202✔
1020
    REALM_ASSERT(!m_ping_delay_in_progress);
202✔
1021
    REALM_ASSERT(m_waiting_for_pong);
202✔
1022
    REALM_ASSERT(m_send_ping);
202✔
1023

80✔
1024
    m_send_ping = false;
202✔
1025
    if (m_reconnect_info.scheduled_reset)
202✔
1026
        m_ping_after_scheduled_reset_of_reconnect_info = true;
164✔
1027

80✔
1028
    m_last_ping_sent_at = monotonic_clock_now();
202✔
1029
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
202✔
1030
                 m_previous_ping_rtt); // Throws
202✔
1031

80✔
1032
    ClientProtocol& protocol = get_client_protocol();
202✔
1033
    OutputBuffer& out = get_output_buffer();
202✔
1034
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
202✔
1035
    initiate_write_ping(out);                                          // Throws
202✔
1036
    m_ping_sent = true;
202✔
1037
}
202✔
1038

1039

1040
void Connection::initiate_write_ping(const OutputBuffer& out)
1041
{
202✔
1042
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
202✔
1043
        if (sentinel->destroyed) {
202✔
1044
            return;
×
1045
        }
×
1046
        if (!status.is_ok()) {
202✔
1047
            if (status != ErrorCodes::Error::OperationAborted) {
×
1048
                // Write errors will be handled by the websocket_write_error_handler() callback
1049
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1050
            }
×
1051
            return;
×
1052
        }
×
1053
        handle_write_ping(); // Throws
202✔
1054
    });                      // Throws
202✔
1055
    m_sending = true;
202✔
1056
}
202✔
1057

1058

1059
void Connection::handle_write_ping()
1060
{
202✔
1061
    REALM_ASSERT(m_sending);
202✔
1062
    REALM_ASSERT(!m_sending_session);
202✔
1063
    m_sending = false;
202✔
1064
    send_next_message(); // Throws
202✔
1065
}
202✔
1066

1067

1068
void Connection::handle_message_received(util::Span<const char> data)
1069
{
81,090✔
1070
    // parse_message_received() parses the message and calls the proper handler
38,438✔
1071
    // on the Connection object (this).
38,438✔
1072
    get_client_protocol().parse_message_received<Connection>(*this, std::string_view(data.data(), data.size()));
81,090✔
1073
}
81,090✔
1074

1075

1076
void Connection::initiate_disconnect_wait()
1077
{
4,350✔
1078
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,350✔
1079

2,050✔
1080
    if (m_disconnect_delay_in_progress) {
4,350✔
1081
        m_reconnect_disconnect_timer.reset();
2,080✔
1082
        m_disconnect_delay_in_progress = false;
2,080✔
1083
    }
2,080✔
1084

2,050✔
1085
    milliseconds_type time = m_client.m_connection_linger_time;
4,350✔
1086

2,050✔
1087
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,350✔
1088
        // If the operation is aborted, the connection object may have been
2,050✔
1089
        // destroyed.
2,050✔
1090
        if (status != ErrorCodes::OperationAborted)
4,350✔
1091
            handle_disconnect_wait(status); // Throws
10✔
1092
    });                                     // Throws
4,350✔
1093
    m_disconnect_delay_in_progress = true;
4,350✔
1094
}
4,350✔
1095

1096

1097
void Connection::handle_disconnect_wait(Status status)
1098
{
10✔
1099
    if (!status.is_ok()) {
10✔
1100
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
1101
        throw Exception(status);
×
1102
    }
×
1103

2✔
1104
    m_disconnect_delay_in_progress = false;
10✔
1105

2✔
1106
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
10✔
1107
    if (m_num_active_unsuspended_sessions == 0) {
10✔
1108
        if (m_client.m_connection_linger_time > 0)
10✔
1109
            logger.detail("Linger time expired"); // Throws
4✔
1110
        voluntary_disconnect();                   // Throws
10✔
1111
        logger.info("Disconnected");              // Throws
10✔
1112
    }
10✔
1113
}
10✔
1114

1115

1116
void Connection::close_due_to_protocol_error(Status status)
1117
{
6✔
1118
    SessionErrorInfo error_info(std::move(status), IsFatal{true});
6✔
1119
    error_info.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;
6✔
1120
    involuntary_disconnect(std::move(error_info),
6✔
1121
                           ConnectionTerminationReason::sync_protocol_violation); // Throws
6✔
1122
}
6✔
1123

1124

1125
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
1126
{
380✔
1127
    logger.info("Connection closed due to error: %1", status); // Throws
380✔
1128

232✔
1129
    involuntary_disconnect(SessionErrorInfo{std::move(status), is_fatal}, reason); // Throw
380✔
1130
}
380✔
1131

1132

1133
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
1134
{
490✔
1135
    logger.info("Connection closed due to transient error: %1", status); // Throws
490✔
1136
    SessionErrorInfo error_info{std::move(status), IsFatal{false}};
490✔
1137
    error_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
490✔
1138

256✔
1139
    involuntary_disconnect(std::move(error_info), reason); // Throw
490✔
1140
}
490✔
1141

1142

1143
// Close connection due to error discovered on the server-side, and then
1144
// reported to the client by way of a connection-level ERROR message.
1145
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1146
{
72✔
1147
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
72✔
1148
                int(error_code)); // Throws
72✔
1149

34✔
1150
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
46✔
1151
                                      : ConnectionTerminationReason::server_said_try_again_later;
60✔
1152
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
72✔
1153
                           reason); // Throws
72✔
1154
}
72✔
1155

1156

1157
void Connection::disconnect(const SessionErrorInfo& info)
1158
{
3,258✔
1159
    // Cancel connect timeout watchdog
1,612✔
1160
    m_connect_timer.reset();
3,258✔
1161

1,612✔
1162
    if (m_state == ConnectionState::connected) {
3,258✔
1163
        m_disconnect_time = monotonic_clock_now();
3,168✔
1164
        m_disconnect_has_occurred = true;
3,168✔
1165

1,568✔
1166
        // Sessions that are in the Deactivating state at this time can be
1,568✔
1167
        // immediately discarded, in part because they are no longer enlisted to
1,568✔
1168
        // send. Such sessions will be taken to the Deactivated state by
1,568✔
1169
        // Session::connection_lost(), and then they will be removed from
1,568✔
1170
        // `m_sessions`.
1,568✔
1171
        auto i = m_sessions.begin(), end = m_sessions.end();
3,168✔
1172
        while (i != end) {
6,792✔
1173
            // Prevent invalidation of the main iterator when erasing elements
2,012✔
1174
            auto j = i++;
3,624✔
1175
            Session& sess = *j->second;
3,624✔
1176
            sess.connection_lost(); // Throws
3,624✔
1177
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
3,624✔
1178
                m_sessions.erase(j);
1,792✔
1179
        }
3,624✔
1180
    }
3,168✔
1181

1,612✔
1182
    change_state_to_disconnected();
3,258✔
1183

1,612✔
1184
    m_ping_delay_in_progress = false;
3,258✔
1185
    m_waiting_for_pong = false;
3,258✔
1186
    m_send_ping = false;
3,258✔
1187
    m_minimize_next_ping_delay = false;
3,258✔
1188
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,258✔
1189
    m_ping_sent = false;
3,258✔
1190
    m_heartbeat_timer.reset();
3,258✔
1191
    m_previous_ping_rtt = 0;
3,258✔
1192

1,612✔
1193
    m_websocket_sentinel->destroyed = true;
3,258✔
1194
    m_websocket_sentinel.reset();
3,258✔
1195
    m_websocket.reset();
3,258✔
1196
    m_input_body_buffer.reset();
3,258✔
1197
    m_sending_session = nullptr;
3,258✔
1198
    m_sessions_enlisted_to_send.clear();
3,258✔
1199
    m_sending = false;
3,258✔
1200

1,612✔
1201
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,258✔
1202
    initiate_reconnect_wait();                                           // Throws
3,258✔
1203
}
3,258✔
1204

1205
bool Connection::is_flx_sync_connection() const noexcept
1206
{
107,616✔
1207
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
107,616✔
1208
}
107,616✔
1209

1210
void Connection::receive_pong(milliseconds_type timestamp)
1211
{
194✔
1212
    logger.debug("Received: PONG(timestamp=%1)", timestamp);
194✔
1213

74✔
1214
    bool legal_at_this_time = (m_waiting_for_pong && !m_send_ping);
194✔
1215
    if (REALM_UNLIKELY(!legal_at_this_time)) {
194✔
1216
        close_due_to_protocol_error(
×
1217
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
×
1218
        return;
×
1219
    }
×
1220

74✔
1221
    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
194✔
1222
        close_due_to_protocol_error(
×
1223
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1224
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
×
1225
                          m_last_ping_sent_at, timestamp)}); // Throws
×
1226
        return;
×
1227
    }
×
1228

74✔
1229
    milliseconds_type now = monotonic_clock_now();
194✔
1230
    milliseconds_type round_trip_time = now - timestamp;
194✔
1231
    logger.debug("Round trip time was %1 milliseconds", round_trip_time);
194✔
1232
    m_previous_ping_rtt = round_trip_time;
194✔
1233

74✔
1234
    // If this PONG message is a response to a PING mesage that was sent after
74✔
1235
    // the last invocation of cancel_reconnect_delay(), then the connection is
74✔
1236
    // still good, and we do not have to skip the next reconnect delay.
74✔
1237
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
194✔
1238
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
162✔
1239
        m_ping_after_scheduled_reset_of_reconnect_info = false;
162✔
1240
        m_reconnect_info.scheduled_reset = false;
162✔
1241
    }
162✔
1242

74✔
1243
    m_heartbeat_timer.reset();
194✔
1244
    m_waiting_for_pong = false;
194✔
1245

74✔
1246
    initiate_ping_delay(now); // Throws
194✔
1247

74✔
1248
    if (m_client.m_roundtrip_time_handler)
194✔
1249
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
×
1250
}
194✔
1251

1252
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
1253
{
73,284✔
1254
    if (session_ident == 0) {
73,284✔
1255
        return nullptr;
×
1256
    }
×
1257

35,538✔
1258
    auto* sess = get_session(session_ident);
73,284✔
1259
    if (REALM_LIKELY(sess)) {
73,290✔
1260
        return sess;
73,290✔
1261
    }
73,290✔
1262
    // Check the history to see if the message received was for a previous session
1263
    if (auto it = m_session_history.find(session_ident); it == m_session_history.end()) {
2,147,483,647✔
1264
        logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
×
1265
        close_due_to_protocol_error(
×
1266
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1267
             util::format("Received message %1 for session iden %2 when that session never existed", message,
×
1268
                          session_ident)});
×
1269
    }
×
1270
    else {
2,147,483,647✔
1271
        logger.error("Received %1 message for closed session, session_ident = %2", message,
2,147,483,647✔
1272
                     session_ident); // Throws
2,147,483,647✔
1273
    }
2,147,483,647✔
1274
    return nullptr;
2,147,483,647✔
1275
}
2,147,483,647✔
1276

1277
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1278
{
990✔
1279
    Session* sess = nullptr;
990✔
1280
    if (session_ident != 0) {
990✔
1281
        sess = find_and_validate_session(session_ident, "ERROR");
914✔
1282
        if (REALM_UNLIKELY(!sess)) {
914✔
1283
            return;
×
1284
        }
×
1285
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
914✔
1286
            close_due_to_protocol_error(std::move(status)); // Throws
×
1287
            return;
×
1288
        }
×
1289

458✔
1290
        if (sess->m_state == Session::Deactivated) {
914✔
1291
            finish_session_deactivation(sess);
2✔
1292
        }
2✔
1293
        return;
914✔
1294
    }
914✔
1295

36✔
1296
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
76✔
1297
                info.message, info.raw_error_code, info.is_fatal, session_ident,
76✔
1298
                info.server_requests_action); // Throws
76✔
1299

36✔
1300
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
76✔
1301
    if (REALM_LIKELY(known_error_code)) {
76✔
1302
        ProtocolError error_code = ProtocolError(info.raw_error_code);
72✔
1303
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
72✔
1304
            close_due_to_server_side_error(error_code, info); // Throws
72✔
1305
            return;
72✔
1306
        }
72✔
1307
        close_due_to_protocol_error(
×
1308
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1309
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1310
                          info.raw_error_code)});
×
1311
    }
×
1312
    else {
4✔
1313
        close_due_to_protocol_error(
4✔
1314
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1315
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1316
    }
4✔
1317
}
76✔
1318

1319

1320
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1321
                                             session_ident_type session_ident)
1322
{
16✔
1323
    if (session_ident == 0) {
16✔
1324
        return close_due_to_protocol_error(
×
1325
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1326
    }
×
1327

8✔
1328
    if (!is_flx_sync_connection()) {
16✔
1329
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1330
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1331
    }
×
1332

8✔
1333
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
16✔
1334
    if (REALM_UNLIKELY(!sess)) {
16✔
1335
        return;
×
1336
    }
×
1337

8✔
1338
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
16✔
1339
        close_due_to_protocol_error(std::move(status));
×
1340
    }
×
1341
}
16✔
1342

1343

1344
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
1345
{
3,238✔
1346
    Session* sess = find_and_validate_session(session_ident, "IDENT");
3,238✔
1347
    if (REALM_UNLIKELY(!sess)) {
3,238✔
1348
        return;
×
1349
    }
×
1350

1,532✔
1351
    if (auto status = sess->receive_ident_message(client_file_ident); !status.is_ok())
3,238✔
1352
        close_due_to_protocol_error(std::move(status)); // Throws
×
1353
}
3,238✔
1354

1355
void Connection::receive_download_message(session_ident_type session_ident, const SyncProgress& progress,
1356
                                          std::uint_fast64_t downloadable_bytes, int64_t query_version,
1357
                                          DownloadBatchState batch_state,
1358
                                          const ReceivedChangesets& received_changesets)
1359
{
45,756✔
1360
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
45,756✔
1361
    if (REALM_UNLIKELY(!sess)) {
45,756✔
1362
        return;
×
1363
    }
×
1364

23,700✔
1365
    if (auto status = sess->receive_download_message(progress, downloadable_bytes, batch_state, query_version,
45,756✔
1366
                                                     received_changesets);
45,756✔
1367
        !status.is_ok()) {
45,756✔
1368
        close_due_to_protocol_error(std::move(status));
×
1369
    }
×
1370
}
45,756✔
1371

1372
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
1373
{
17,592✔
1374
    Session* sess = find_and_validate_session(session_ident, "MARK");
17,592✔
1375
    if (REALM_UNLIKELY(!sess)) {
17,592✔
1376
        return;
×
1377
    }
×
1378

7,714✔
1379
    if (auto status = sess->receive_mark_message(request_ident); !status.is_ok())
17,592✔
1380
        close_due_to_protocol_error(std::move(status)); // Throws
2✔
1381
}
17,592✔
1382

1383

1384
void Connection::receive_unbound_message(session_ident_type session_ident)
1385
{
5,728✔
1386
    Session* sess = find_and_validate_session(session_ident, "UNBOUND");
5,728✔
1387
    if (REALM_UNLIKELY(!sess)) {
5,728✔
1388
        return;
×
1389
    }
×
1390

2,104✔
1391
    if (auto status = sess->receive_unbound_message(); !status.is_ok()) {
5,728✔
1392
        close_due_to_protocol_error(std::move(status)); // Throws
×
1393
        return;
×
1394
    }
×
1395

2,104✔
1396
    if (sess->m_state == Session::Deactivated) {
5,728✔
1397
        finish_session_deactivation(sess);
5,728✔
1398
    }
5,728✔
1399
}
5,728✔
1400

1401

1402
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1403
                                               std::string_view body)
1404
{
44✔
1405
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
44✔
1406
    if (REALM_UNLIKELY(!sess)) {
44✔
1407
        return;
×
1408
    }
×
1409

22✔
1410
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
44✔
1411
        close_due_to_protocol_error(std::move(status));
×
1412
    }
×
1413
}
44✔
1414

1415

1416
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1417
                                            std::string_view message)
1418
{
7,530✔
1419
    std::string prefix;
7,530✔
1420
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
7,530✔
1421
        prefix = util::format("Server[%1]", m_appservices_coid);
7,530✔
1422
    }
7,530✔
1423
    else {
×
1424
        prefix = "Server";
×
1425
    }
×
1426

2,790✔
1427
    if (session_ident != 0) {
7,530✔
1428
        if (auto sess = get_session(session_ident)) {
5,740✔
1429
            sess->logger.log(level, "%1 log: %2", prefix, message);
5,740✔
1430
            return;
5,740✔
1431
        }
5,740✔
1432

1433
        logger.log(level, "%1 log for unknown session %2: %3", prefix, session_ident, message);
×
1434
        return;
×
1435
    }
×
1436

890✔
1437
    logger.log(level, "%1 log: %2", prefix, message);
1,790✔
1438
}
1,790✔
1439

1440

1441
void Connection::receive_appservices_request_id(std::string_view coid)
1442
{
4,958✔
1443
    // Only set once per connection
2,456✔
1444
    if (!coid.empty() && m_appservices_coid.empty()) {
4,958✔
1445
        m_appservices_coid = coid;
2,294✔
1446
        logger.info("Connected to app services with request id: \"%1\"", m_appservices_coid);
2,294✔
1447
    }
2,294✔
1448
}
4,958✔
1449

1450

1451
void Connection::handle_protocol_error(Status status)
1452
{
×
1453
    close_due_to_protocol_error(std::move(status));
×
1454
}
×
1455

1456

1457
// Sessions are guaranteed to be granted the opportunity to send a message in
1458
// the order that they enlist. Note that this is important to ensure
1459
// nonoverlapping communication with the server for consecutive sessions
1460
// associated with the same Realm file.
1461
//
1462
// CAUTION: The specified session may get destroyed before this function
1463
// returns, but only if its Session::send_message() puts it into the Deactivated
1464
// state.
1465
void Connection::enlist_to_send(Session* sess)
1466
{
164,330✔
1467
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
164,330✔
1468
    m_sessions_enlisted_to_send.push_back(sess); // Throws
164,330✔
1469
    if (!m_sending)
164,330✔
1470
        send_next_message(); // Throws
60,012✔
1471
}
164,330✔
1472

1473

1474
std::string Connection::get_active_appservices_connection_id()
1475
{
72✔
1476
    return m_appservices_coid;
72✔
1477
}
72✔
1478

1479
void Session::cancel_resumption_delay()
1480
{
3,712✔
1481
    REALM_ASSERT_EX(m_state == Active, m_state);
3,712✔
1482

2,068✔
1483
    if (!m_suspended)
3,712✔
1484
        return;
3,312✔
1485

204✔
1486
    m_suspended = false;
400✔
1487

204✔
1488
    logger.debug("Resumed"); // Throws
400✔
1489

204✔
1490
    if (unbind_process_complete())
400✔
1491
        initiate_rebind(); // Throws
392✔
1492

204✔
1493
    m_conn.one_more_active_unsuspended_session(); // Throws
400✔
1494

204✔
1495
    on_resumed(); // Throws
400✔
1496
}
400✔
1497

1498

1499
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1500
                                                 std::vector<ProtocolErrorInfo>* out)
1501
{
21,230✔
1502
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
21,230✔
1503
        return;
21,186✔
1504
    }
21,186✔
1505

22✔
1506
#ifdef REALM_DEBUG
44✔
1507
    REALM_ASSERT_DEBUG(
44✔
1508
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
44✔
1509
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
44✔
1510
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
44✔
1511
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
44✔
1512
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
44✔
1513
                       }));
44✔
1514
#endif
44✔
1515

22✔
1516
    while (!m_pending_compensating_write_errors.empty() &&
88✔
1517
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
66✔
1518
               changesets.back().version) {
44✔
1519
        auto& cur_error = m_pending_compensating_write_errors.front();
44✔
1520
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
44✔
1521
        out->push_back(std::move(cur_error));
44✔
1522
        m_pending_compensating_write_errors.pop_front();
44✔
1523
    }
44✔
1524
}
44✔
1525

1526

1527
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1528
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1529
                                   DownloadBatchState download_batch_state)
1530
{
43,592✔
1531
    auto& history = get_history();
43,592✔
1532
    if (received_changesets.empty()) {
43,592✔
1533
        if (download_batch_state == DownloadBatchState::MoreToCome) {
22,338✔
1534
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1535
                                       "received empty download message that was not the last in batch",
×
1536
                                       ProtocolError::bad_progress);
×
1537
        }
×
1538
        history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws
22,338✔
1539
        return;
22,338✔
1540
    }
22,338✔
1541

11,826✔
1542
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
21,254✔
1543
    auto transact = get_db()->start_read();
21,254✔
1544
    history.integrate_server_changesets(
21,254✔
1545
        progress, &downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
21,254✔
1546
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
21,242✔
1547
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
21,230✔
1548
        }); // Throws
21,230✔
1549
    if (received_changesets.size() == 1) {
21,254✔
1550
        logger.debug("1 remote changeset integrated, producing client version %1",
14,530✔
1551
                     version_info.sync_version.version); // Throws
14,530✔
1552
    }
14,530✔
1553
    else {
6,724✔
1554
        logger.debug("%2 remote changesets integrated, producing client version %1",
6,724✔
1555
                     version_info.sync_version.version, received_changesets.size()); // Throws
6,724✔
1556
    }
6,724✔
1557

11,826✔
1558
    for (const auto& pending_error : pending_compensating_write_errors) {
11,848✔
1559
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
44✔
1560
                    pending_error.compensating_write_rejected_client_version,
44✔
1561
                    *pending_error.compensating_write_server_version, pending_error.message);
44✔
1562
        try {
44✔
1563
            on_connection_state_changed(
44✔
1564
                m_conn.get_state(),
44✔
1565
                SessionErrorInfo{pending_error,
44✔
1566
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
44✔
1567
                                                          pending_error.message)});
44✔
1568
        }
44✔
1569
        catch (...) {
22✔
1570
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1571
        }
×
1572
    }
44✔
1573
}
21,254✔
1574

1575

1576
void Session::on_integration_failure(const IntegrationException& error)
1577
{
32✔
1578
    REALM_ASSERT_EX(m_state == Active, m_state);
32✔
1579
    REALM_ASSERT(!m_client_error && !m_error_to_send);
32✔
1580
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
32✔
1581

16✔
1582
    m_client_error = util::make_optional<IntegrationException>(error);
32✔
1583
    m_error_to_send = true;
32✔
1584

16✔
1585
    // Surface the error to the user otherwise is lost.
16✔
1586
    on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{error.to_status(), IsFatal{false}});
32✔
1587

16✔
1588
    // Since the deactivation process has not been initiated, the UNBIND
16✔
1589
    // message cannot have been sent unless an ERROR message was received.
16✔
1590
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
32✔
1591
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
32✔
1592
        ensure_enlisted_to_send(); // Throws
32✔
1593
    }
32✔
1594
}
32✔
1595

1596
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
1597
{
45,034✔
1598
    REALM_ASSERT_EX(m_state == Active, m_state);
45,034✔
1599
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
45,034✔
1600
    m_download_progress = progress.download;
45,034✔
1601
    bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version);
45,034✔
1602
    m_progress = progress;
45,034✔
1603
    if (upload_progressed) {
45,034✔
1604
        if (progress.upload.client_version > m_last_version_selected_for_upload) {
32,840✔
1605
            if (progress.upload.client_version > m_upload_progress.client_version)
14,552✔
1606
                m_upload_progress = progress.upload;
1,840✔
1607
            m_last_version_selected_for_upload = progress.upload.client_version;
14,552✔
1608
        }
14,552✔
1609

16,862✔
1610
        check_for_upload_completion();
32,840✔
1611
    }
32,840✔
1612

23,290✔
1613
    do_recognize_sync_version(client_version); // Allows upload process to resume
45,034✔
1614
    check_for_download_completion();           // Throws
45,034✔
1615

23,290✔
1616
    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
23,290✔
1617
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
45,034✔
1618
        auto& flx_subscription_store = *get_flx_subscription_store();
2,360✔
1619
        get_migration_store()->create_subscriptions(flx_subscription_store);
2,360✔
1620
    }
2,360✔
1621

23,290✔
1622
    // Since the deactivation process has not been initiated, the UNBIND
23,290✔
1623
    // message cannot have been sent unless an ERROR message was received.
23,290✔
1624
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
45,034✔
1625
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
45,034✔
1626
        ensure_enlisted_to_send(); // Throws
45,030✔
1627
    }
45,030✔
1628
}
45,034✔
1629

1630

1631
Session::~Session()
1632
{
9,662✔
1633
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
4,654✔
1634
}
9,662✔
1635

1636

1637
std::string Session::make_logger_prefix(session_ident_type ident)
1638
{
9,660✔
1639
    std::ostringstream out;
9,660✔
1640
    out.imbue(std::locale::classic());
9,660✔
1641
    out << "Session[" << ident << "]: "; // Throws
9,660✔
1642
    return out.str();                    // Throws
9,660✔
1643
}
9,660✔
1644

1645

1646
void Session::activate()
1647
{
9,662✔
1648
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
9,662✔
1649

4,654✔
1650
    logger.debug("Activating"); // Throws
9,662✔
1651

4,654✔
1652
    bool has_pending_client_reset = false;
9,662✔
1653
    if (REALM_LIKELY(!get_client().is_dry_run())) {
9,662✔
1654
        bool file_exists = util::File::exists(get_realm_path());
9,660✔
1655
        m_performing_client_reset = get_client_reset_config().has_value();
9,660✔
1656

4,652✔
1657
        logger.info("client_reset_config = %1, Realm exists = %2 ", m_performing_client_reset, file_exists);
9,660✔
1658
        if (!m_performing_client_reset) {
9,660✔
1659
            get_history().get_status(m_last_version_available, m_client_file_ident, m_progress,
9,326✔
1660
                                     &has_pending_client_reset); // Throws
9,326✔
1661
        }
9,326✔
1662
    }
9,660✔
1663
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
9,662✔
1664
                 m_client_file_ident.salt); // Throws
9,662✔
1665
    m_upload_progress = m_progress.upload;
9,662✔
1666
    m_last_version_selected_for_upload = m_upload_progress.client_version;
9,662✔
1667
    m_download_progress = m_progress.download;
9,662✔
1668
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
9,662✔
1669

4,654✔
1670
    logger.debug("last_version_available  = %1", m_last_version_available);           // Throws
9,662✔
1671
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
9,662✔
1672
    logger.debug("progress_download_client_version = %1",
9,662✔
1673
                 m_progress.download.last_integrated_client_version);                                      // Throws
9,662✔
1674
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
9,662✔
1675
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
9,662✔
1676

4,654✔
1677
    reset_protocol_state();
9,662✔
1678
    m_state = Active;
9,662✔
1679

4,654✔
1680
    REALM_ASSERT(!m_suspended);
9,662✔
1681
    m_conn.one_more_active_unsuspended_session(); // Throws
9,662✔
1682

4,654✔
1683
    try {
9,662✔
1684
        process_pending_flx_bootstrap();
9,662✔
1685
    }
9,662✔
1686
    catch (const IntegrationException& error) {
4,656✔
1687
        logger.error("Error integrating bootstrap changesets: %1", error.what());
4✔
1688
        m_suspended = true;
4✔
1689
        m_conn.one_less_active_unsuspended_session(); // Throws
4✔
1690
        on_suspended(SessionErrorInfo{Status{error.code(), error.what()}, IsFatal{true}});
4✔
1691
    }
4✔
1692

4,654✔
1693
    if (has_pending_client_reset) {
9,662✔
1694
        handle_pending_client_reset_acknowledgement();
18✔
1695
    }
18✔
1696
}
9,656✔
1697

1698

1699
// The caller (Connection) must discard the session if the session has become
1700
// deactivated upon return.
1701
void Session::initiate_deactivation()
1702
{
9,662✔
1703
    REALM_ASSERT_EX(m_state == Active, m_state);
9,662✔
1704

4,654✔
1705
    logger.debug("Initiating deactivation"); // Throws
9,662✔
1706

4,654✔
1707
    m_state = Deactivating;
9,662✔
1708

4,654✔
1709
    if (!m_suspended)
9,662✔
1710
        m_conn.one_less_active_unsuspended_session(); // Throws
9,132✔
1711

4,654✔
1712
    if (m_enlisted_to_send) {
9,662✔
1713
        REALM_ASSERT(!unbind_process_complete());
4,440✔
1714
        return;
4,440✔
1715
    }
4,440✔
1716

2,484✔
1717
    // Deactivate immediately if the BIND message has not yet been sent and the
2,484✔
1718
    // session is not enlisted to send, or if the unbinding process has already
2,484✔
1719
    // completed.
2,484✔
1720
    if (!m_bind_message_sent || unbind_process_complete()) {
5,222✔
1721
        complete_deactivation(); // Throws
936✔
1722
        // Life cycle state is now Deactivated
414✔
1723
        return;
936✔
1724
    }
936✔
1725

2,070✔
1726
    // Ready to send the UNBIND message, if it has not already been sent
2,070✔
1727
    if (!m_unbind_message_sent) {
4,286✔
1728
        enlist_to_send(); // Throws
4,138✔
1729
        return;
4,138✔
1730
    }
4,138✔
1731
}
4,286✔
1732

1733

1734
void Session::complete_deactivation()
1735
{
9,662✔
1736
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
9,662✔
1737
    m_state = Deactivated;
9,662✔
1738

4,654✔
1739
    logger.debug("Deactivation completed"); // Throws
9,662✔
1740
}
9,662✔
1741

1742

1743
// Called by the associated Connection object when this session is granted an
1744
// opportunity to send a message.
1745
//
1746
// The caller (Connection) must discard the session if the session has become
1747
// deactivated upon return.
1748
void Session::send_message()
1749
{
162,854✔
1750
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
162,854✔
1751
    REALM_ASSERT(m_enlisted_to_send);
162,854✔
1752
    m_enlisted_to_send = false;
162,854✔
1753
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
162,854✔
1754
        // Deactivation has been initiated. If the UNBIND message has not been
4,360✔
1755
        // sent yet, there is no point in sending it. Instead, we can let the
4,360✔
1756
        // deactivation process complete.
4,360✔
1757
        if (!m_bind_message_sent) {
9,052✔
1758
            return complete_deactivation(); // Throws
1,124✔
1759
            // Life cycle state is now Deactivated
1,104✔
1760
        }
1,124✔
1761

3,256✔
1762
        // Session life cycle state is Deactivating or the unbinding process has
3,256✔
1763
        // been initiated by a session specific ERROR message
3,256✔
1764
        if (!m_unbind_message_sent)
7,928✔
1765
            send_unbind_message(); // Throws
7,928✔
1766
        return;
7,928✔
1767
    }
7,928✔
1768

71,504✔
1769
    // Session life cycle state is Active and the unbinding process has
71,504✔
1770
    // not been initiated
71,504✔
1771
    REALM_ASSERT(!m_unbind_message_sent);
153,802✔
1772

71,504✔
1773
    if (!m_bind_message_sent)
153,802✔
1774
        return send_bind_message(); // Throws
10,144✔
1775

66,998✔
1776
    if (!m_ident_message_sent) {
143,658✔
1777
        if (have_client_file_ident())
8,458✔
1778
            send_ident_message(); // Throws
8,458✔
1779
        return;
8,458✔
1780
    }
8,458✔
1781

63,740✔
1782
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
135,200✔
1783
                                                      [](const PendingTestCommand& command) {
63,788✔
1784
                                                          return command.pending;
96✔
1785
                                                      });
96✔
1786
    if (has_pending_test_command) {
135,200✔
1787
        return send_test_command_message();
44✔
1788
    }
44✔
1789

63,718✔
1790
    if (m_error_to_send)
135,156✔
1791
        return send_json_error_message(); // Throws
26✔
1792

63,706✔
1793
    // Stop sending upload, mark and query messages when the client detects an error.
63,706✔
1794
    if (m_client_error) {
135,130✔
1795
        return;
12✔
1796
    }
12✔
1797

63,700✔
1798
    if (m_target_download_mark > m_last_download_mark_sent)
135,118✔
1799
        return send_mark_message(); // Throws
18,292✔
1800

55,632✔
1801
    auto is_upload_allowed = [&]() -> bool {
116,830✔
1802
        if (!m_is_flx_sync_session) {
116,830✔
1803
            return true;
105,674✔
1804
        }
105,674✔
1805

5,832✔
1806
        auto migration_store = get_migration_store();
11,156✔
1807
        if (!migration_store) {
11,156✔
1808
            return true;
×
1809
        }
×
1810

5,832✔
1811
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
11,156✔
1812
        if (!sentinel_query_version) {
11,156✔
1813
            return true;
11,130✔
1814
        }
11,130✔
1815

14✔
1816
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
14✔
1817
        return m_last_sent_flx_query_version != *sentinel_query_version;
26✔
1818
    };
26✔
1819

55,632✔
1820
    if (!is_upload_allowed()) {
116,826✔
1821
        return;
12✔
1822
    }
12✔
1823

55,626✔
1824
    auto check_pending_flx_version = [&]() -> bool {
116,818✔
1825
        if (!m_is_flx_sync_session) {
116,816✔
1826
            return false;
105,674✔
1827
        }
105,674✔
1828

5,824✔
1829
        if (!m_allow_upload) {
11,142✔
1830
            return false;
2,208✔
1831
        }
2,208✔
1832

4,590✔
1833
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
8,934✔
1834

4,590✔
1835
        if (!m_pending_flx_sub_set) {
8,934✔
1836
            return false;
7,388✔
1837
        }
7,388✔
1838

768✔
1839
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
1,546✔
1840
    };
1,546✔
1841

55,626✔
1842
    if (check_pending_flx_version()) {
116,814✔
1843
        return send_query_change_message(); // throws
888✔
1844
    }
888✔
1845

55,184✔
1846
    if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
115,926✔
1847
        return send_upload_message(); // Throws
56,060✔
1848
    }
56,060✔
1849
}
115,926✔
1850

1851

1852
void Session::send_bind_message()
1853
{
10,144✔
1854
    REALM_ASSERT_EX(m_state == Active, m_state);
10,144✔
1855

4,506✔
1856
    session_ident_type session_ident = m_ident;
10,144✔
1857
    bool need_client_file_ident = !have_client_file_ident();
10,144✔
1858
    const bool is_subserver = false;
10,144✔
1859

4,506✔
1860

4,506✔
1861
    ClientProtocol& protocol = m_conn.get_client_protocol();
10,144✔
1862
    int protocol_version = m_conn.get_negotiated_protocol_version();
10,144✔
1863
    OutputBuffer& out = m_conn.get_output_buffer();
10,144✔
1864
    // Discard the token since it's ignored by the server.
4,506✔
1865
    std::string empty_access_token;
10,144✔
1866
    if (m_is_flx_sync_session) {
10,144✔
1867
        nlohmann::json bind_json_data;
1,414✔
1868
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,414✔
1869
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1870
        }
60✔
1871
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,414✔
1872
        if (logger.would_log(util::Logger::Level::debug)) {
1,414✔
1873
            std::string json_data_dump;
1,414✔
1874
            if (!bind_json_data.empty()) {
1,414✔
1875
                json_data_dump = bind_json_data.dump();
1,414✔
1876
            }
1,414✔
1877
            logger.debug(
1,414✔
1878
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,414✔
1879
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,414✔
1880
        }
1,414✔
1881
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,414✔
1882
                                       need_client_file_ident, is_subserver); // Throws
1,414✔
1883
    }
1,414✔
1884
    else {
8,730✔
1885
        std::string server_path = get_virt_path();
8,730✔
1886
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
8,730✔
1887
                     session_ident, need_client_file_ident, is_subserver, server_path);
8,730✔
1888
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
8,730✔
1889
                                       need_client_file_ident, is_subserver); // Throws
8,730✔
1890
    }
8,730✔
1891
    m_conn.initiate_write_message(out, this); // Throws
10,144✔
1892

4,506✔
1893
    m_bind_message_sent = true;
10,144✔
1894

4,506✔
1895
    // Ready to send the IDENT message if the file identifier pair is already
4,506✔
1896
    // available.
4,506✔
1897
    if (!need_client_file_ident)
10,144✔
1898
        enlist_to_send(); // Throws
5,328✔
1899
}
10,144✔
1900

1901

1902
void Session::send_ident_message()
1903
{
8,458✔
1904
    REALM_ASSERT_EX(m_state == Active, m_state);
8,458✔
1905
    REALM_ASSERT(m_bind_message_sent);
8,458✔
1906
    REALM_ASSERT(!m_unbind_message_sent);
8,458✔
1907
    REALM_ASSERT(have_client_file_ident());
8,458✔
1908

3,258✔
1909

3,258✔
1910
    ClientProtocol& protocol = m_conn.get_client_protocol();
8,458✔
1911
    OutputBuffer& out = m_conn.get_output_buffer();
8,458✔
1912
    session_ident_type session_ident = m_ident;
8,458✔
1913

3,258✔
1914
    if (m_is_flx_sync_session) {
8,458✔
1915
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,008✔
1916
        const auto active_query_body = active_query_set.to_ext_json();
1,008✔
1917
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,008✔
1918
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,008✔
1919
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,008✔
1920
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,008✔
1921
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,008✔
1922
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,008✔
1923
                     active_query_body); // Throws
1,008✔
1924
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,008✔
1925
                                        active_query_set.version(), active_query_body); // Throws
1,008✔
1926
        m_last_sent_flx_query_version = active_query_set.version();
1,008✔
1927
    }
1,008✔
1928
    else {
7,450✔
1929
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
7,450✔
1930
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
7,450✔
1931
                     "latest_server_version_salt=%6)",
7,450✔
1932
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
7,450✔
1933
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
7,450✔
1934
                     m_progress.latest_server_version.salt);                                  // Throws
7,450✔
1935
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
7,450✔
1936
    }
7,450✔
1937
    m_conn.initiate_write_message(out, this); // Throws
8,458✔
1938

3,258✔
1939
    m_ident_message_sent = true;
8,458✔
1940

3,258✔
1941
    // Other messages may be waiting to be sent
3,258✔
1942
    enlist_to_send(); // Throws
8,458✔
1943
}
8,458✔
1944

1945
void Session::send_query_change_message()
1946
{
888✔
1947
    REALM_ASSERT_EX(m_state == Active, m_state);
888✔
1948
    REALM_ASSERT(m_ident_message_sent);
888✔
1949
    REALM_ASSERT(!m_unbind_message_sent);
888✔
1950
    REALM_ASSERT(m_pending_flx_sub_set);
888✔
1951
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
888✔
1952

442✔
1953
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
888✔
1954
        return;
×
1955
    }
×
1956

442✔
1957
    auto sub_store = get_flx_subscription_store();
888✔
1958
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
888✔
1959
    auto latest_queries = latest_sub_set.to_ext_json();
888✔
1960
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
888✔
1961
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
888✔
1962

442✔
1963
    OutputBuffer& out = m_conn.get_output_buffer();
888✔
1964
    session_ident_type session_ident = get_ident();
888✔
1965
    ClientProtocol& protocol = m_conn.get_client_protocol();
888✔
1966
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
888✔
1967
    m_conn.initiate_write_message(out, this);
888✔
1968

442✔
1969
    m_last_sent_flx_query_version = latest_sub_set.version();
888✔
1970

442✔
1971
    request_download_completion_notification();
888✔
1972
}
888✔
1973

1974
void Session::send_upload_message()
1975
{
56,060✔
1976
    REALM_ASSERT_EX(m_state == Active, m_state);
56,060✔
1977
    REALM_ASSERT(m_ident_message_sent);
56,060✔
1978
    REALM_ASSERT(!m_unbind_message_sent);
56,060✔
1979

27,704✔
1980
    if (REALM_UNLIKELY(get_client().is_dry_run()))
56,060✔
1981
        return;
27,704✔
1982

27,704✔
1983
    version_type target_upload_version = m_last_version_available;
56,060✔
1984
    if (m_pending_flx_sub_set) {
56,060✔
1985
        REALM_ASSERT(m_is_flx_sync_session);
660✔
1986
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
660✔
1987
    }
660✔
1988

27,704✔
1989
    std::vector<UploadChangeset> uploadable_changesets;
56,060✔
1990
    version_type locked_server_version = 0;
56,060✔
1991
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
56,060✔
1992
                                             locked_server_version); // Throws
56,060✔
1993

27,704✔
1994
    if (uploadable_changesets.empty()) {
56,060✔
1995
        // Nothing more to upload right now
13,832✔
1996
        check_for_upload_completion(); // Throws
29,370✔
1997
        // If we need to limit upload up to some version other than the last client version available and there are no
13,832✔
1998
        // changes to upload, then there is no need to send an empty message.
13,832✔
1999
        if (m_pending_flx_sub_set) {
29,370✔
2000
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
190✔
2001
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
190✔
2002
            // Other messages may be waiting to be sent
94✔
2003
            return enlist_to_send(); // Throws
190✔
2004
        }
190✔
2005
    }
26,690✔
2006
    else {
26,690✔
2007
        m_last_version_selected_for_upload = uploadable_changesets.back().progress.client_version;
26,690✔
2008
    }
26,690✔
2009

27,704✔
2010
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
55,964✔
2011
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
470✔
2012
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
470✔
2013
    }
470✔
2014

27,610✔
2015
    version_type progress_client_version = m_upload_progress.client_version;
55,870✔
2016
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
55,870✔
2017

27,610✔
2018
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
55,870✔
2019
                 "locked_server_version=%3, num_changesets=%4)",
55,870✔
2020
                 progress_client_version, progress_server_version, locked_server_version,
55,870✔
2021
                 uploadable_changesets.size()); // Throws
55,870✔
2022

27,610✔
2023
    ClientProtocol& protocol = m_conn.get_client_protocol();
55,870✔
2024
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
55,870✔
2025

27,610✔
2026
    for (const UploadChangeset& uc : uploadable_changesets) {
49,026✔
2027
        logger.debug("Fetching changeset for upload (client_version=%1, server_version=%2, "
41,628✔
2028
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
41,628✔
2029
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
41,628✔
2030
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
41,628✔
2031
        if (logger.would_log(util::Logger::Level::trace)) {
41,628✔
2032
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2033
            if (changeset_data.size() < 1024) {
×
2034
                logger.trace("Changeset: %1",
×
2035
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2036
            }
×
2037
            else {
×
2038
                logger.trace("Changeset(comp): %1 %2", changeset_data.size(),
×
2039
                             protocol.compressed_hex_dump(changeset_data));
×
2040
            }
×
2041

2042
#if REALM_DEBUG
×
2043
            ChunkedBinaryInputStream in{changeset_data};
×
2044
            Changeset log;
×
2045
            try {
×
2046
                parse_changeset(in, log);
×
2047
                std::stringstream ss;
×
2048
                log.print(ss);
×
2049
                logger.trace("Changeset (parsed):\n%1", ss.str());
×
2050
            }
×
2051
            catch (const BadChangesetError& err) {
×
2052
                logger.error("Unable to parse changeset: %1", err.what());
×
2053
            }
×
2054
#endif
×
2055
        }
×
2056

20,212✔
2057
#if 0 // Upload log compaction is currently not implemented
2058
        if (!get_client().m_disable_upload_compaction) {
2059
            ChangesetEncoder::Buffer encode_buffer;
2060

2061
            {
2062
                // Upload compaction only takes place within single changesets to
2063
                // avoid another client seeing inconsistent snapshots.
2064
                ChunkedBinaryInputStream stream{uc.changeset};
2065
                Changeset changeset;
2066
                parse_changeset(stream, changeset); // Throws
2067
                // FIXME: What is the point of setting these? How can compaction care about them?
2068
                changeset.version = uc.progress.client_version;
2069
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2070
                changeset.origin_timestamp = uc.origin_timestamp;
2071
                changeset.origin_file_ident = uc.origin_file_ident;
2072

2073
                compact_changesets(&changeset, 1);
2074
                encode_changeset(changeset, encode_buffer);
2075

2076
                logger.debug("Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2077
                             encode_buffer.size()); // Throws
2078
            }
2079

2080
            upload_message_builder.add_changeset(
2081
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
2082
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
2083
        }
2084
        else
2085
#endif
2086
        {
41,628✔
2087
            upload_message_builder.add_changeset(uc.progress.client_version,
41,628✔
2088
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
41,628✔
2089
                                                 uc.origin_file_ident,
41,628✔
2090
                                                 uc.changeset); // Throws
41,628✔
2091
        }
41,628✔
2092
    }
41,628✔
2093

27,610✔
2094
    int protocol_version = m_conn.get_negotiated_protocol_version();
55,870✔
2095
    OutputBuffer& out = m_conn.get_output_buffer();
55,870✔
2096
    session_ident_type session_ident = get_ident();
55,870✔
2097
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
55,870✔
2098
                                               progress_server_version,
55,870✔
2099
                                               locked_server_version); // Throws
55,870✔
2100
    m_conn.initiate_write_message(out, this);                          // Throws
55,870✔
2101

27,610✔
2102
    // Other messages may be waiting to be sent
27,610✔
2103
    enlist_to_send(); // Throws
55,870✔
2104
}
55,870✔
2105

2106

2107
void Session::send_mark_message()
2108
{
18,292✔
2109
    REALM_ASSERT_EX(m_state == Active, m_state);
18,292✔
2110
    REALM_ASSERT(m_ident_message_sent);
18,292✔
2111
    REALM_ASSERT(!m_unbind_message_sent);
18,292✔
2112
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
18,292✔
2113

8,068✔
2114
    request_ident_type request_ident = m_target_download_mark;
18,292✔
2115
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
18,292✔
2116

8,068✔
2117
    ClientProtocol& protocol = m_conn.get_client_protocol();
18,292✔
2118
    OutputBuffer& out = m_conn.get_output_buffer();
18,292✔
2119
    session_ident_type session_ident = get_ident();
18,292✔
2120
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
18,292✔
2121
    m_conn.initiate_write_message(out, this);                      // Throws
18,292✔
2122

8,068✔
2123
    m_last_download_mark_sent = request_ident;
18,292✔
2124

8,068✔
2125
    // Other messages may be waiting to be sent
8,068✔
2126
    enlist_to_send(); // Throws
18,292✔
2127
}
18,292✔
2128

2129

2130
void Session::send_unbind_message()
2131
{
7,928✔
2132
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
7,928✔
2133
    REALM_ASSERT(m_bind_message_sent);
7,928✔
2134
    REALM_ASSERT(!m_unbind_message_sent);
7,928✔
2135

3,256✔
2136
    logger.debug("Sending: UNBIND"); // Throws
7,928✔
2137

3,256✔
2138
    ClientProtocol& protocol = m_conn.get_client_protocol();
7,928✔
2139
    OutputBuffer& out = m_conn.get_output_buffer();
7,928✔
2140
    session_ident_type session_ident = get_ident();
7,928✔
2141
    protocol.make_unbind_message(out, session_ident); // Throws
7,928✔
2142
    m_conn.initiate_write_message(out, this);         // Throws
7,928✔
2143

3,256✔
2144
    m_unbind_message_sent = true;
7,928✔
2145
}
7,928✔
2146

2147

2148
void Session::send_json_error_message()
2149
{
26✔
2150
    REALM_ASSERT_EX(m_state == Active, m_state);
26✔
2151
    REALM_ASSERT(m_ident_message_sent);
26✔
2152
    REALM_ASSERT(!m_unbind_message_sent);
26✔
2153
    REALM_ASSERT(m_error_to_send);
26✔
2154
    REALM_ASSERT(m_client_error);
26✔
2155

12✔
2156
    ClientProtocol& protocol = m_conn.get_client_protocol();
26✔
2157
    OutputBuffer& out = m_conn.get_output_buffer();
26✔
2158
    session_ident_type session_ident = get_ident();
26✔
2159
    auto protocol_error = m_client_error->error_for_server;
26✔
2160

12✔
2161
    auto message = util::format("%1", m_client_error->to_status());
26✔
2162
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
26✔
2163
                session_ident); // Throws
26✔
2164

12✔
2165
    nlohmann::json error_body_json;
26✔
2166
    error_body_json["message"] = std::move(message);
26✔
2167
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
26✔
2168
                                     error_body_json.dump()); // Throws
26✔
2169
    m_conn.initiate_write_message(out, this);                 // Throws
26✔
2170

12✔
2171
    m_error_to_send = false;
26✔
2172
    enlist_to_send(); // Throws
26✔
2173
}
26✔
2174

2175

2176
void Session::send_test_command_message()
2177
{
44✔
2178
    REALM_ASSERT_EX(m_state == Active, m_state);
44✔
2179

22✔
2180
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
44✔
2181
                           [](const PendingTestCommand& command) {
44✔
2182
                               return command.pending;
44✔
2183
                           });
44✔
2184
    REALM_ASSERT(it != m_pending_test_commands.end());
44✔
2185

22✔
2186
    ClientProtocol& protocol = m_conn.get_client_protocol();
44✔
2187
    OutputBuffer& out = m_conn.get_output_buffer();
44✔
2188
    auto session_ident = get_ident();
44✔
2189

22✔
2190
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
44✔
2191
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
44✔
2192

22✔
2193
    m_conn.initiate_write_message(out, this); // Throws;
44✔
2194
    it->pending = false;
44✔
2195

22✔
2196
    enlist_to_send();
44✔
2197
}
44✔
2198

2199
bool Session::client_reset_if_needed()
2200
{
3,218✔
2201
    // Regardless of what happens, once we return from this function we will
1,526✔
2202
    // no longer be in the middle of a client reset
1,526✔
2203
    m_performing_client_reset = false;
3,218✔
2204

1,526✔
2205
    // Even if we end up not actually performing a client reset, consume the
1,526✔
2206
    // config to ensure that the resources it holds are released
1,526✔
2207
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
3,218✔
2208
    if (!client_reset_config) {
3,218✔
2209
        return false;
2,882✔
2210
    }
2,882✔
2211

168✔
2212
    auto on_flx_version_complete = [this](int64_t version) {
336✔
2213
        this->on_flx_sync_version_complete(version);
232✔
2214
    };
232✔
2215
    bool did_reset = client_reset::perform_client_reset(
336✔
2216
        logger, *get_db(), *client_reset_config->fresh_copy, client_reset_config->mode,
336✔
2217
        std::move(client_reset_config->notify_before_client_reset),
336✔
2218
        std::move(client_reset_config->notify_after_client_reset), m_client_file_ident, get_flx_subscription_store(),
336✔
2219
        on_flx_version_complete, client_reset_config->recovery_is_allowed);
336✔
2220
    if (!did_reset) {
336✔
2221
        return false;
×
2222
    }
×
2223

168✔
2224
    // The fresh Realm has been used to reset the state
168✔
2225
    logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
336✔
2226

168✔
2227
    SaltedFileIdent client_file_ident;
336✔
2228
    bool has_pending_client_reset = false;
336✔
2229
    get_history().get_status(m_last_version_available, client_file_ident, m_progress,
336✔
2230
                             &has_pending_client_reset); // Throws
336✔
2231
    REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
336✔
2232
    REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
336✔
2233
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
336✔
2234
                    m_progress.download.last_integrated_client_version);
336✔
2235
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
336✔
2236
    REALM_ASSERT_EX(m_progress.upload.last_integrated_server_version == 0,
336✔
2237
                    m_progress.upload.last_integrated_server_version);
336✔
2238
    logger.trace("last_version_available  = %1", m_last_version_available); // Throws
336✔
2239

168✔
2240
    m_upload_progress = m_progress.upload;
336✔
2241
    m_download_progress = m_progress.download;
336✔
2242
    // In recovery mode, there may be new changesets to upload and nothing left to download.
168✔
2243
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
168✔
2244
    // For both, we want to allow uploads again without needing external changes to download first.
168✔
2245
    m_allow_upload = true;
336✔
2246
    REALM_ASSERT_EX(m_last_version_selected_for_upload == 0, m_last_version_selected_for_upload);
336✔
2247

168✔
2248
    if (has_pending_client_reset) {
336✔
2249
        handle_pending_client_reset_acknowledgement();
268✔
2250
    }
268✔
2251

168✔
2252
    update_subscription_version_info();
336✔
2253

168✔
2254
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
168✔
2255
    if (auto migration_store = get_migration_store()) {
336✔
2256
        migration_store->complete_migration_or_rollback();
240✔
2257
    }
240✔
2258

168✔
2259
    return true;
336✔
2260
}
336✔
2261

2262
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
2263
{
3,238✔
2264
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
3,238✔
2265
                 client_file_ident.salt); // Throws
3,238✔
2266

1,532✔
2267
    // Ignore the message if the deactivation process has been initiated,
1,532✔
2268
    // because in that case, the associated Realm and SessionWrapper must
1,532✔
2269
    // not be accessed any longer.
1,532✔
2270
    if (m_state != Active)
3,238✔
2271
        return Status::OK(); // Success
20✔
2272

1,526✔
2273
    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
3,218✔
2274
                               !m_unbound_message_received);
3,218✔
2275
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,218✔
2276
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
×
2277
    }
×
2278
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
3,218✔
2279
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
×
2280
    }
×
2281
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
3,218✔
2282
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
×
2283
    }
×
2284

1,526✔
2285
    m_client_file_ident = client_file_ident;
3,218✔
2286

1,526✔
2287
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
3,218✔
2288
        // Ready to send the IDENT message
2289
        ensure_enlisted_to_send(); // Throws
×
2290
        return Status::OK();       // Success
×
2291
    }
×
2292

1,526✔
2293
    // if a client reset happens, it will take care of setting the file ident
1,526✔
2294
    // and if not, we do it here
1,526✔
2295
    bool did_client_reset = false;
3,218✔
2296
    try {
3,218✔
2297
        did_client_reset = client_reset_if_needed();
3,218✔
2298
    }
3,218✔
2299
    catch (const std::exception& e) {
1,560✔
2300
        auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
68✔
2301
        logger.error(err_msg.c_str());
68✔
2302
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
68✔
2303
        suspend(err_info);
68✔
2304
        return Status::OK();
68✔
2305
    }
68✔
2306
    if (!did_client_reset) {
3,150✔
2307
        get_history().set_client_file_ident(client_file_ident,
2,882✔
2308
                                            m_fix_up_object_ids); // Throws
2,882✔
2309
        m_progress.download.last_integrated_client_version = 0;
2,882✔
2310
        m_progress.upload.client_version = 0;
2,882✔
2311
        m_last_version_selected_for_upload = 0;
2,882✔
2312
    }
2,882✔
2313

1,492✔
2314
    // Ready to send the IDENT message
1,492✔
2315
    ensure_enlisted_to_send(); // Throws
3,150✔
2316
    return Status::OK();       // Success
3,150✔
2317
}
3,150✔
2318

2319
Status Session::receive_download_message(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
2320
                                         DownloadBatchState batch_state, int64_t query_version,
2321
                                         const ReceivedChangesets& received_changesets)
2322
{
45,758✔
2323
    REALM_ASSERT_EX(query_version >= 0, query_version);
45,758✔
2324
    // Ignore the message if the deactivation process has been initiated,
23,700✔
2325
    // because in that case, the associated Realm and SessionWrapper must
23,700✔
2326
    // not be accessed any longer.
23,700✔
2327
    if (m_state != Active)
45,758✔
2328
        return Status::OK();
512✔
2329

23,398✔
2330
    if (is_steady_state_download_message(batch_state, query_version)) {
45,246✔
2331
        batch_state = DownloadBatchState::SteadyState;
43,598✔
2332
    }
43,598✔
2333

23,398✔
2334
    logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
45,246✔
2335
                 "latest_server_version=%3, latest_server_version_salt=%4, "
45,246✔
2336
                 "upload_client_version=%5, upload_server_version=%6, downloadable_bytes=%7, "
45,246✔
2337
                 "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
45,246✔
2338
                 progress.download.server_version, progress.download.last_integrated_client_version,
45,246✔
2339
                 progress.latest_server_version.version, progress.latest_server_version.salt,
45,246✔
2340
                 progress.upload.client_version, progress.upload.last_integrated_server_version, downloadable_bytes,
45,246✔
2341
                 batch_state != DownloadBatchState::MoreToCome, query_version, received_changesets.size()); // Throws
45,246✔
2342

23,398✔
2343
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
23,398✔
2344
    // changeset over and over again.
23,398✔
2345
    if (m_client_error) {
45,246✔
2346
        logger.debug("Ignoring download message because the client detected an integration error");
×
2347
        return Status::OK();
×
2348
    }
×
2349

23,398✔
2350
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
45,246✔
2351
    if (REALM_UNLIKELY(!legal_at_this_time)) {
45,246✔
2352
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
×
2353
    }
×
2354
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
45,246✔
2355
        logger.error("Bad sync progress received (%1)", status);
×
2356
        return status;
×
2357
    }
×
2358

23,398✔
2359
    version_type server_version = m_progress.download.server_version;
45,246✔
2360
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
45,246✔
2361
    for (const RemoteChangeset& changeset : received_changesets) {
44,462✔
2362
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
22,570✔
2363
        // version must be increasing, but can stay the same during bootstraps.
22,570✔
2364
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
23,430✔
2365
                                                         : (changeset.remote_version > server_version);
42,774✔
2366
        // Each server version cannot be greater than the one in the header of the download message.
22,570✔
2367
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
43,634✔
2368
        if (!good_server_version) {
43,634✔
2369
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2370
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2371
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2372
        }
×
2373
        server_version = changeset.remote_version;
43,634✔
2374
        // Check that per-changeset last integrated client version is "weakly"
22,570✔
2375
        // increasing.
22,570✔
2376
        bool good_client_version =
43,634✔
2377
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
43,634✔
2378
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
43,634✔
2379
        if (!good_client_version) {
43,634✔
2380
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2381
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2382
                                 "(%1, %2, %3)",
×
2383
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2384
                                 progress.download.last_integrated_client_version)};
×
2385
        }
×
2386
        last_integrated_client_version = changeset.last_integrated_local_version;
43,634✔
2387
        // Server shouldn't send our own changes, and zero is not a valid client
22,570✔
2388
        // file identifier.
22,570✔
2389
        bool good_file_ident =
43,634✔
2390
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
43,634✔
2391
        if (!good_file_ident) {
43,634✔
2392
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2393
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2394
                                 changeset.origin_file_ident)};
×
2395
        }
×
2396
    }
43,634✔
2397

23,398✔
2398
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
45,246✔
2399
                                       batch_state, received_changesets.size());
45,246✔
2400
    if (hook_action == SyncClientHookAction::EarlyReturn) {
45,246✔
2401
        return Status::OK();
12✔
2402
    }
12✔
2403
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
45,234✔
2404

23,392✔
2405
    if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) {
45,234✔
2406
        clear_resumption_delay_state();
1,640✔
2407
        return Status::OK();
1,640✔
2408
    }
1,640✔
2409

22,572✔
2410
    initiate_integrate_changesets(downloadable_bytes, batch_state, progress, received_changesets); // Throws
43,594✔
2411

22,572✔
2412
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
43,594✔
2413
                                  batch_state, received_changesets.size());
43,594✔
2414
    if (hook_action == SyncClientHookAction::EarlyReturn) {
43,594✔
2415
        return Status::OK();
×
2416
    }
×
2417
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
43,594✔
2418

22,572✔
2419
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
22,572✔
2420
    // after a retryable session error.
22,572✔
2421
    clear_resumption_delay_state();
43,594✔
2422
    return Status::OK();
43,594✔
2423
}
43,594✔
2424

2425
Status Session::receive_mark_message(request_ident_type request_ident)
2426
{
17,592✔
2427
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws
17,592✔
2428

7,714✔
2429
    // Ignore the message if the deactivation process has been initiated,
7,714✔
2430
    // because in that case, the associated Realm and SessionWrapper must
7,714✔
2431
    // not be accessed any longer.
7,714✔
2432
    if (m_state != Active)
17,592✔
2433
        return Status::OK(); // Success
708✔
2434

7,684✔
2435
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
16,884✔
2436
    if (REALM_UNLIKELY(!legal_at_this_time)) {
16,884✔
2437
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
2✔
2438
    }
2✔
2439
    bool good_request_ident =
16,882✔
2440
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
16,882✔
2441
    if (REALM_UNLIKELY(!good_request_ident)) {
16,882✔
2442
        return {
×
2443
            ErrorCodes::SyncProtocolInvariantFailed,
×
2444
            util::format(
×
2445
                "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
×
2446
                m_last_download_mark_sent, m_last_download_mark_received)};
×
2447
    }
×
2448

7,682✔
2449
    m_server_version_at_last_download_mark = m_progress.download.server_version;
16,882✔
2450
    m_last_download_mark_received = request_ident;
16,882✔
2451
    check_for_download_completion(); // Throws
16,882✔
2452

7,682✔
2453
    return Status::OK(); // Success
16,882✔
2454
}
16,882✔
2455

2456

2457
// The caller (Connection) must discard the session if the session has become
2458
// deactivated upon return.
2459
Status Session::receive_unbound_message()
2460
{
5,728✔
2461
    logger.debug("Received: UNBOUND");
5,728✔
2462

2,104✔
2463
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
5,728✔
2464
    if (REALM_UNLIKELY(!legal_at_this_time)) {
5,728✔
2465
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2466
    }
×
2467

2,104✔
2468
    // The fact that the UNBIND message has been sent, but an ERROR message has
2,104✔
2469
    // not been received, implies that the deactivation process must have been
2,104✔
2470
    // initiated, so this session must be in the Deactivating state or the session
2,104✔
2471
    // has been suspended because of a client side error.
2,104✔
2472
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
5,728!
2473

2,104✔
2474
    m_unbound_message_received = true;
5,728✔
2475

2,104✔
2476
    // Detect completion of the unbinding process
2,104✔
2477
    if (m_unbind_message_send_complete && m_state == Deactivating) {
5,728✔
2478
        // The deactivation process completes when the unbinding process
2,104✔
2479
        // completes.
2,104✔
2480
        complete_deactivation(); // Throws
5,728✔
2481
        // Life cycle state is now Deactivated
2,104✔
2482
    }
5,728✔
2483

2,104✔
2484
    return Status::OK(); // Success
5,728✔
2485
}
5,728✔
2486

2487

2488
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
2489
{
16✔
2490
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);
16✔
2491
    // Ignore the message if the deactivation process has been initiated,
8✔
2492
    // because in that case, the associated Realm and SessionWrapper must
8✔
2493
    // not be accessed any longer.
8✔
2494
    if (m_state == Active) {
16✔
2495
        on_flx_sync_error(query_version, message); // throws
16✔
2496
    }
16✔
2497
    return Status::OK();
16✔
2498
}
16✔
2499

2500
// The caller (Connection) must discard the session if the session has become
2501
// deactivated upon return.
2502
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2503
{
914✔
2504
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
914✔
2505
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
914✔
2506

458✔
2507
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
914✔
2508
    if (REALM_UNLIKELY(!legal_at_this_time)) {
914✔
2509
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2510
    }
×
2511

458✔
2512
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
914✔
2513
    auto status = protocol_error_to_status(protocol_error, info.message);
914✔
2514
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
914✔
2515
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2516
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2517
                             info.raw_error_code)};
×
2518
    }
×
2519

458✔
2520
    // Can't process debug hook actions once the Session is undergoing deactivation, since
458✔
2521
    // the SessionWrapper may not be available
458✔
2522
    if (m_state == Active) {
914✔
2523
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
910✔
2524
        if (debug_action == SyncClientHookAction::EarlyReturn) {
910✔
2525
            return Status::OK();
8✔
2526
        }
8✔
2527
    }
906✔
2528

454✔
2529
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
454✔
2530
    // containing the compensating write has appeared in a download message.
454✔
2531
    if (status == ErrorCodes::SyncCompensatingWrite) {
906✔
2532
        // If the client is not active, the compensating writes will not be processed now, but will be
22✔
2533
        // sent again the next time the client connects
22✔
2534
        if (m_state == Active) {
44✔
2535
            REALM_ASSERT(info.compensating_write_server_version.has_value());
44✔
2536
            m_pending_compensating_write_errors.push_back(info);
44✔
2537
        }
44✔
2538
        return Status::OK();
44✔
2539
    }
44✔
2540

432✔
2541
    m_error_message_received = true;
862✔
2542
    suspend(SessionErrorInfo{info, std::move(status)});
862✔
2543
    return Status::OK();
862✔
2544
}
862✔
2545

2546
void Session::suspend(const SessionErrorInfo& info)
2547
{
930✔
2548
    REALM_ASSERT(!m_suspended);
930✔
2549
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
930✔
2550
    logger.debug("Suspended"); // Throws
930✔
2551

466✔
2552
    m_suspended = true;
930✔
2553

466✔
2554
    // Detect completion of the unbinding process
466✔
2555
    if (m_unbind_message_send_complete && m_error_message_received) {
930!
2556
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2✔
2557
        // we received an ERROR message implies that the deactivation process must
2✔
2558
        // have been initiated, so this session must be in the Deactivating state.
2✔
2559
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
2!
2560

2✔
2561
        // The deactivation process completes when the unbinding process
2✔
2562
        // completes.
2✔
2563
        complete_deactivation(); // Throws
2✔
2564
        // Life cycle state is now Deactivated
2✔
2565
    }
2✔
2566

466✔
2567
    // Notify the application of the suspension of the session if the session is
466✔
2568
    // still in the Active state
466✔
2569
    if (m_state == Active) {
930✔
2570
        m_conn.one_less_active_unsuspended_session(); // Throws
926✔
2571
        on_suspended(info);                           // Throws
926✔
2572
    }
926✔
2573

466✔
2574
    if (!info.is_fatal) {
930✔
2575
        begin_resumption_delay(info);
396✔
2576
    }
396✔
2577

466✔
2578
    // Ready to send the UNBIND message, if it has not been sent already
466✔
2579
    if (!m_unbind_message_sent)
930✔
2580
        ensure_enlisted_to_send(); // Throws
926✔
2581
}
930✔
2582

2583
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
2584
{
44✔
2585
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);
44✔
2586
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
44✔
2587
                           [&](const PendingTestCommand& command) {
44✔
2588
                               return command.id == ident;
44✔
2589
                           });
44✔
2590
    if (it == m_pending_test_commands.end()) {
44✔
2591
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2592
                util::format("Received test command response for a non-existent ident %1", ident)};
×
2593
    }
×
2594

22✔
2595
    it->promise.emplace_value(std::string{body});
44✔
2596
    m_pending_test_commands.erase(it);
44✔
2597

22✔
2598
    return Status::OK();
44✔
2599
}
44✔
2600

2601
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2602
{
396✔
2603
    REALM_ASSERT(!m_try_again_activation_timer);
396✔
2604

202✔
2605
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
396✔
2606
                                  error_info.resumption_delay_interval);
396✔
2607
    auto try_again_interval = m_try_again_delay_info.delay_interval();
396✔
2608
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
396✔
2609
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
10✔
2610
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
10✔
2611
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
10✔
2612
        try_again_interval = std::chrono::milliseconds{1000};
22✔
2613
    }
22✔
2614
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
396✔
2615
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
396✔
2616
        if (status == ErrorCodes::OperationAborted)
396✔
2617
            return;
4✔
2618
        else if (!status.is_ok())
392✔
2619
            throw Exception(status);
×
2620

200✔
2621
        m_try_again_activation_timer.reset();
392✔
2622
        cancel_resumption_delay();
392✔
2623
    });
392✔
2624
}
396✔
2625

2626
void Session::clear_resumption_delay_state()
2627
{
45,228✔
2628
    if (m_try_again_activation_timer) {
45,228✔
2629
        logger.debug("Clearing resumption delay state after successful download");
×
2630
        m_try_again_delay_info.reset();
×
2631
    }
×
2632
}
45,228✔
2633

2634
Status ClientImpl::Session::check_received_sync_progress(const SyncProgress& progress) noexcept
2635
{
45,246✔
2636
    const SyncProgress& a = m_progress;
45,246✔
2637
    const SyncProgress& b = progress;
45,246✔
2638
    std::string message;
45,246✔
2639
    if (b.latest_server_version.version < a.latest_server_version.version) {
45,246✔
2640
        message = util::format("Latest server version in download messages must be weakly increasing throughout a "
×
2641
                               "session (current: %1, received: %2)",
×
2642
                               a.latest_server_version.version, b.latest_server_version.version);
×
2643
    }
×
2644
    if (b.upload.client_version < a.upload.client_version) {
45,246✔
2645
        message = util::format("Last integrated client version in download messages must be weakly increasing "
×
2646
                               "throughout a session (current: %1, received: %2)",
×
2647
                               a.upload.client_version, b.upload.client_version);
×
2648
    }
×
2649
    if (b.upload.client_version > m_last_version_available) {
45,246✔
2650
        message = util::format("Last integrated client version on server cannot be greater than the latest client "
×
2651
                               "version in existence (current: %1, received: %2)",
×
2652
                               m_last_version_available, b.upload.client_version);
×
2653
    }
×
2654
    if (b.download.server_version < a.download.server_version) {
45,246✔
2655
        message =
×
2656
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2657
                         a.download.server_version, b.download.server_version);
×
2658
    }
×
2659
    if (b.download.server_version > b.latest_server_version.version) {
45,246✔
2660
        message = util::format(
×
2661
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
×
2662
            b.download.server_version, b.latest_server_version.version);
×
2663
    }
×
2664
    if (b.download.last_integrated_client_version < a.download.last_integrated_client_version) {
45,246✔
2665
        message = util::format(
×
2666
            "Last integrated client version on the server at the position in the server's history of the download "
×
2667
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2668
            a.download.last_integrated_client_version, b.download.last_integrated_client_version);
×
2669
    }
×
2670
    if (b.download.last_integrated_client_version > b.upload.client_version) {
45,246✔
2671
        message = util::format("Last integrated client version on the server in the position at the server's history "
×
2672
                               "of the download cursor cannot be greater than the latest client version integrated "
×
2673
                               "on the server (download: %1, upload: %2)",
×
2674
                               b.download.last_integrated_client_version, b.upload.client_version);
×
2675
    }
×
2676
    if (b.download.server_version < b.upload.last_integrated_server_version) {
45,246✔
2677
        message = util::format(
×
2678
            "The server version of the download cursor cannot be less than the server version integrated in the "
×
2679
            "latest client version acknowledged by the server (download: %1, upload: %2)",
×
2680
            b.download.server_version, b.upload.last_integrated_server_version);
×
2681
    }
×
2682

23,398✔
2683
    if (message.empty()) {
45,246✔
2684
        return Status::OK();
45,246✔
2685
    }
45,246✔
2686
    return {ErrorCodes::SyncProtocolInvariantFailed, std::move(message)};
×
2687
}
×
2688

2689

2690
void Session::check_for_upload_completion()
2691
{
77,446✔
2692
    REALM_ASSERT_EX(m_state == Active, m_state);
77,446✔
2693
    if (!m_upload_completion_notification_requested) {
77,446✔
2694
        return;
45,236✔
2695
    }
45,236✔
2696

14,946✔
2697
    // during an ongoing client reset operation, we never upload anything
14,946✔
2698
    if (m_performing_client_reset)
32,210✔
2699
        return;
236✔
2700

14,828✔
2701
    // Upload process must have reached end of history
14,828✔
2702
    REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available);
31,974✔
2703
    bool scan_complete = (m_upload_progress.client_version == m_last_version_available);
31,974✔
2704
    if (!scan_complete)
31,974✔
2705
        return;
6,964✔
2706

12,372✔
2707
    // All uploaded changesets must have been acknowledged by the server
12,372✔
2708
    REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload);
25,010✔
2709
    bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload);
25,010✔
2710
    if (!all_uploads_accepted)
25,010✔
2711
        return;
10,366✔
2712

7,232✔
2713
    m_upload_completion_notification_requested = false;
14,644✔
2714
    on_upload_completion(); // Throws
14,644✔
2715
}
14,644✔
2716

2717

2718
void Session::check_for_download_completion()
2719
{
61,918✔
2720
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
61,918✔
2721
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
61,918✔
2722
    if (m_last_download_mark_received == m_last_triggering_download_mark)
61,918✔
2723
        return;
44,866✔
2724
    if (m_last_download_mark_received < m_target_download_mark)
17,052✔
2725
        return;
312✔
2726
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
16,740✔
2727
        return;
×
2728
    m_last_triggering_download_mark = m_target_download_mark;
16,740✔
2729
    if (REALM_UNLIKELY(!m_allow_upload)) {
16,740✔
2730
        // Activate the upload process now, and enable immediate reactivation
2,006✔
2731
        // after a subsequent fast reconnect.
2,006✔
2732
        m_allow_upload = true;
5,522✔
2733
        ensure_enlisted_to_send(); // Throws
5,522✔
2734
    }
5,522✔
2735
    on_download_completion(); // Throws
16,740✔
2736
}
16,740✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc