• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / jorgen.edelbo_391

13 Aug 2024 07:57AM UTC coverage: 91.091% (-0.02%) from 91.107%
jorgen.edelbo_391

Pull #7826

Evergreen

web-flow
Merge pull request #7979 from realm/test-upgrade-files

Create test file in file-format 24
Pull Request #7826: Merge Next major

103486 of 182216 branches covered (56.79%)

3157 of 3519 new or added lines in 54 files covered. (89.71%)

191 existing lines in 22 files now uncovered.

219971 of 241486 relevant lines covered (91.09%)

6652643.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.78
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/client_reset_operation.hpp>
10
#include <realm/sync/noinst/sync_schema_migration.hpp>
11
#include <realm/sync/protocol.hpp>
12
#include <realm/util/assert.hpp>
13
#include <realm/util/basic_system_errors.hpp>
14
#include <realm/util/memory_stream.hpp>
15
#include <realm/util/platform_info.hpp>
16
#include <realm/util/random.hpp>
17
#include <realm/util/safe_int_ops.hpp>
18
#include <realm/util/scope_exit.hpp>
19
#include <realm/util/to_string.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/version.hpp>
22

23
#include <system_error>
24
#include <sstream>
25

26
// NOTE: The protocol specification is in `/doc/protocol.md`
27

28
using namespace realm;
29
using namespace _impl;
30
using namespace realm::util;
31
using namespace realm::sync;
32
using namespace realm::sync::websocket;
33

34
// clang-format off
35
using Connection      = ClientImpl::Connection;
36
using Session         = ClientImpl::Session;
37
using UploadChangeset = ClientHistory::UploadChangeset;
38

39
// These are a work-around for a bug in MSVC. It cannot find in-class types
40
// mentioned in signature of out-of-line member function definitions.
41
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
42
using OutputBuffer                = ClientImpl::OutputBuffer;
43
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
44
// clang-format on
45

46
void ClientImpl::ReconnectInfo::reset() noexcept
47
{
2,122✔
48
    m_backoff_state.reset();
2,122✔
49
    scheduled_reset = false;
2,122✔
50
}
2,122✔
51

52

53
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
54
                                       std::optional<ResumptionDelayInfo> new_delay_info)
55
{
3,936✔
56
    m_backoff_state.update(new_reason, new_delay_info);
3,936✔
57
}
3,936✔
58

59

60
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
61
{
6,412✔
62
    if (scheduled_reset) {
6,412✔
63
        reset();
8✔
64
    }
8✔
65

66
    if (!m_backoff_state.triggering_error) {
6,412✔
67
        return std::chrono::milliseconds::zero();
4,874✔
68
    }
4,874✔
69

70
    switch (*m_backoff_state.triggering_error) {
1,538✔
71
        case ConnectionTerminationReason::closed_voluntarily:
80✔
72
            return std::chrono::milliseconds::zero();
80✔
73
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
74
            return std::chrono::milliseconds::max();
18✔
75
        default:
1,440✔
76
            if (m_reconnect_mode == ReconnectMode::testing) {
1,440✔
77
                return std::chrono::milliseconds::max();
1,082✔
78
            }
1,082✔
79

80
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
358✔
81
            return m_backoff_state.delay_interval();
358✔
82
    }
1,538✔
83
}
1,538✔
84

85

86
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
87
                                      port_type& port, std::string& path) const
88
{
4,402✔
89
    util::Uri uri(url); // Throws
4,402✔
90
    uri.canonicalize(); // Throws
4,402✔
91
    std::string userinfo, address_2, port_2;
4,402✔
92
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
4,402✔
93
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
4,402✔
94
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
4,402✔
95
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
4,402✔
96
    if (REALM_UNLIKELY(!good))
4,402✔
97
        return false;
×
98
    ProtocolEnvelope protocol_2;
4,402✔
99
    port_type port_3;
4,402✔
100
    if (realm_scheme) {
4,402✔
101
        if (uri.get_scheme() == "realm:") {
×
102
            protocol_2 = ProtocolEnvelope::realm;
×
103
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
104
        }
×
105
        else {
×
106
            protocol_2 = ProtocolEnvelope::realms;
×
107
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
108
        }
×
109
    }
×
110
    else {
4,402✔
111
        REALM_ASSERT(ws_scheme);
4,402✔
112
        if (uri.get_scheme() == "ws:") {
4,402✔
113
            protocol_2 = ProtocolEnvelope::ws;
4,322✔
114
            port_3 = 80;
4,322✔
115
        }
4,322✔
116
        else {
80✔
117
            protocol_2 = ProtocolEnvelope::wss;
80✔
118
            port_3 = 443;
80✔
119
        }
80✔
120
    }
4,402✔
121
    if (!port_2.empty()) {
4,402✔
122
        std::istringstream in(port_2);    // Throws
4,298✔
123
        in.imbue(std::locale::classic()); // Throws
4,298✔
124
        in >> port_3;
4,298✔
125
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
4,298✔
126
            return false;
×
127
    }
4,298✔
128
    std::string path_2 = uri.get_path(); // Throws (copy)
4,402✔
129

130
    protocol = protocol_2;
4,402✔
131
    address = std::move(address_2);
4,402✔
132
    port = port_3;
4,402✔
133
    path = std::move(path_2);
4,402✔
134
    return true;
4,402✔
135
}
4,402✔
136

137
ClientImpl::ClientImpl(ClientConfig config)
138
    : logger(std::make_shared<util::CategoryLogger>(util::LogCategory::session, std::move(config.logger)))
4,918✔
139
    , m_reconnect_mode{config.reconnect_mode}
4,918✔
140
    , m_connect_timeout{config.connect_timeout}
4,918✔
141
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
4,918✔
142
    , m_ping_keepalive_period{config.ping_keepalive_period}
4,918✔
143
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
4,918✔
144
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
4,918✔
145
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
4,918✔
146
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
4,918✔
147
    , m_dry_run{config.dry_run}
4,918✔
148
    , m_enable_default_port_hack{config.enable_default_port_hack}
4,918✔
149
    , m_fix_up_object_ids{config.fix_up_object_ids}
4,918✔
150
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
4,918✔
151
    , m_socket_provider{std::move(config.socket_provider)}
4,918✔
152
    , m_client_protocol{} // Throws
4,918✔
153
    , m_one_connection_per_session{config.one_connection_per_session}
4,918✔
154
    , m_random{}
4,918✔
155
{
9,974✔
156
    // FIXME: Would be better if seeding was up to the application.
157
    util::seed_prng_nondeterministically(m_random); // Throws
9,974✔
158

159
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,974✔
160
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,974✔
161
                 get_current_protocol_version()); // Throws
9,974✔
162
    logger.info("Platform: %1", util::get_platform_info());
9,974✔
163
    const char* build_mode;
9,974✔
164
#if REALM_DEBUG
9,974✔
165
    build_mode = "Debug";
9,974✔
166
#else
167
    build_mode = "Release";
168
#endif
169
    logger.debug("Build mode: %1", build_mode);
9,974✔
170
    logger.debug("Config param: one_connection_per_session = %1",
9,974✔
171
                 config.one_connection_per_session); // Throws
9,974✔
172
    logger.debug("Config param: connect_timeout = %1 ms",
9,974✔
173
                 config.connect_timeout); // Throws
9,974✔
174
    logger.debug("Config param: connection_linger_time = %1 ms",
9,974✔
175
                 config.connection_linger_time); // Throws
9,974✔
176
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,974✔
177
                 config.ping_keepalive_period); // Throws
9,974✔
178
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,974✔
179
                 config.pong_keepalive_timeout); // Throws
9,974✔
180
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,974✔
181
                 config.fast_reconnect_limit); // Throws
9,974✔
182
    logger.debug("Config param: disable_sync_to_disk = %1",
9,974✔
183
                 config.disable_sync_to_disk); // Throws
9,974✔
184
    logger.debug(
9,974✔
185
        "Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3, jitter: 1/%4",
9,974✔
186
        m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,974✔
187
        m_reconnect_backoff_info.resumption_delay_interval.count(),
9,974✔
188
        m_reconnect_backoff_info.resumption_delay_backoff_multiplier, m_reconnect_backoff_info.delay_jitter_divisor);
9,974✔
189

190
    if (config.reconnect_mode != ReconnectMode::normal) {
9,974✔
191
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
776✔
192
                    "never do this in production!");
776✔
193
    }
776✔
194

195
    if (config.dry_run) {
9,974✔
196
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
197
                    "never do this in production!");
×
198
    }
×
199

200
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,974✔
201

202
    if (m_one_connection_per_session) {
9,974✔
203
        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
4✔
204
                    "never do this in production");
4✔
205
    }
4✔
206

207
    if (config.disable_upload_activation_delay) {
9,974✔
208
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
209
                    "never do this in production");
×
210
    }
×
211

212
    if (config.disable_sync_to_disk) {
9,974✔
213
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
214
                    "never do this in production");
×
215
    }
×
216

217
    m_actualize_and_finalize = create_trigger([this](Status status) {
14,912✔
218
        if (status == ErrorCodes::OperationAborted)
14,912✔
219
            return;
×
220
        else if (!status.is_ok())
14,912✔
221
            throw Exception(status);
×
222
        actualize_and_finalize_session_wrappers(); // Throws
14,912✔
223
    });
14,912✔
224
}
9,974✔
225

226
void ClientImpl::incr_outstanding_posts()
227
{
205,484✔
228
    util::CheckedLockGuard lock(m_drain_mutex);
205,484✔
229
    ++m_outstanding_posts;
205,484✔
230
    m_drained = false;
205,484✔
231
}
205,484✔
232

233
void ClientImpl::decr_outstanding_posts()
234
{
205,482✔
235
    util::CheckedLockGuard lock(m_drain_mutex);
205,482✔
236
    REALM_ASSERT(m_outstanding_posts);
205,482✔
237
    if (--m_outstanding_posts <= 0) {
205,482✔
238
        // Notify must happen with lock held or another thread could destroy
239
        // ClientImpl between when we release the lock and when we call notify
240
        m_drain_cv.notify_all();
18,252✔
241
    }
18,252✔
242
}
205,482✔
243

244
// Posts `handler` to the socket provider while tracking it as an outstanding
// post. The handler always receives the status it was invoked with; the
// outstanding-post count is decremented when the wrapper exits, whether the
// handler returns or throws.
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    incr_outstanding_posts();

    auto tracked = [this, handler = std::move(handler)](Status status) {
        auto decr_guard = util::make_scope_exit([this]() noexcept {
            decr_outstanding_posts();
        });
        handler(status);
    };
    m_socket_provider->post(std::move(tracked));
}
255

256
// Posts a status-less `handler` to the socket provider while tracking it as an
// outstanding post. The handler is skipped when the post was aborted; any
// other non-OK status is rethrown as an exception. The outstanding-post count
// is decremented when the wrapper exits in every case.
void ClientImpl::post(util::UniqueFunction<void()>&& handler)
{
    REALM_ASSERT(m_socket_provider);
    incr_outstanding_posts();

    auto tracked = [this, handler = std::move(handler)](Status status) {
        auto decr_guard = util::make_scope_exit([this]() noexcept {
            decr_outstanding_posts();
        });
        const bool aborted = (status == ErrorCodes::OperationAborted);
        if (aborted)
            return;
        if (!status.is_ok())
            throw Exception(status);
        handler();
    };
    m_socket_provider->post(std::move(tracked));
}
271

272

273
void ClientImpl::drain_connections()
274
{
9,974✔
275
    logger.debug("Draining connections during sync client shutdown");
9,974✔
276
    for (auto& server_slot_pair : m_server_slots) {
9,974✔
277
        auto& server_slot = server_slot_pair.second;
2,738✔
278

279
        if (server_slot.connection) {
2,738✔
280
            auto& conn = server_slot.connection;
2,512✔
281
            conn->force_close();
2,512✔
282
        }
2,512✔
283
        else {
226✔
284
            for (auto& conn_pair : server_slot.alt_connections) {
226✔
UNCOV
285
                conn_pair.second->force_close();
×
UNCOV
286
            }
×
287
        }
226✔
288
    }
2,738✔
289
}
9,974✔
290

291

292
// Creates a timer on the socket provider that invokes `handler` with the
// timer's completion status. The timer is tracked as an outstanding post from
// the moment it is created until its wrapper exits.
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
                                                       SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    incr_outstanding_posts();

    auto tracked = [this, handler = std::move(handler)](Status status) {
        auto decr_guard = util::make_scope_exit([this]() noexcept {
            decr_outstanding_posts();
        });
        handler(status);
    };
    return m_socket_provider->create_timer(delay, std::move(tracked));
}
304

305

306
// Creates a trigger bound to this client that runs `handler`. Requires that a
// socket provider is present.
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    SyncTrigger trigger = std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
    return trigger;
}
311

312
// If a websocket sentinel is still attached, flag it as destroyed and then
// release this connection's reference to it.
Connection::~Connection()
{
    if (!m_websocket_sentinel)
        return;

    m_websocket_sentinel->destroyed = true;
    m_websocket_sentinel.reset();
}
319

320
void Connection::activate()
321
{
2,848✔
322
    REALM_ASSERT(m_on_idle);
2,848✔
323
    m_activated = true;
2,848✔
324
    if (m_num_active_sessions == 0)
2,848✔
325
        m_on_idle->trigger();
×
326
    // We cannot in general connect immediately, because a prior failure to
327
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
328
    initiate_reconnect_wait(); // Throws
2,848✔
329
}
2,848✔
330

331

332
void Connection::activate_session(std::unique_ptr<Session> sess)
333
{
10,420✔
334
    REALM_ASSERT(sess);
10,420✔
335
    REALM_ASSERT(&sess->m_conn == this);
10,420✔
336
    REALM_ASSERT(!m_force_closed);
10,420✔
337
    Session& sess_2 = *sess;
10,420✔
338
    session_ident_type ident = sess->m_ident;
10,420✔
339
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
10,420✔
340
    bool was_inserted = p.second;
10,420✔
341
    REALM_ASSERT(was_inserted);
10,420✔
342
    // Save the session ident to the historical list of session idents
343
    m_session_history.insert(ident);
10,420✔
344
    sess_2.activate(); // Throws
10,420✔
345
    if (m_state == ConnectionState::connected) {
10,420✔
346
        bool fast_reconnect = false;
7,310✔
347
        sess_2.connection_established(fast_reconnect); // Throws
7,310✔
348
    }
7,310✔
349
    ++m_num_active_sessions;
10,420✔
350
}
10,420✔
351

352

353
// Starts deactivation of `sess`. If the session reaches the Deactivated state
// right away, it is removed from the connection immediately. When the last
// active session goes away on an activated, disconnected connection, the
// on-idle trigger is fired.
void Connection::initiate_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess);
    REALM_ASSERT(&sess->m_conn == this);
    REALM_ASSERT(m_num_active_sessions);

    // Since the client may be waiting for m_num_active_sessions to reach 0
    // in stop_and_wait() (on a separate thread), deactivate Session before
    // decrementing the num active sessions value.
    sess->initiate_deactivation(); // Throws
    if (sess->m_state == Session::Deactivated)
        finish_session_deactivation(sess);

    --m_num_active_sessions;
    if (REALM_UNLIKELY(m_num_active_sessions == 0) && m_activated && m_state == ConnectionState::disconnected) {
        m_on_idle->trigger();
    }
}
370

371

372
void Connection::cancel_reconnect_delay()
373
{
2,346✔
374
    REALM_ASSERT(m_activated);
2,346✔
375

376
    if (m_reconnect_delay_in_progress) {
2,346✔
377
        if (m_nonzero_reconnect_delay)
2,110✔
378
            logger.detail("Canceling reconnect delay"); // Throws
1,058✔
379

380
        // Cancel the in-progress wait operation by destroying the timer
381
        // object. Destruction is needed in this case, because a new wait
382
        // operation might have to be initiated before the previous one
383
        // completes (its completion handler starts to execute), so the new wait
384
        // operation must be done on a new timer object.
385
        m_reconnect_disconnect_timer.reset();
2,110✔
386
        m_reconnect_delay_in_progress = false;
2,110✔
387
        m_reconnect_info.reset();
2,110✔
388
        initiate_reconnect_wait(); // Throws
2,110✔
389
        return;
2,110✔
390
    }
2,110✔
391

392
    // If we are not disconnected, then we need to make sure the next time we get disconnected
393
    // that we are allowed to re-connect as quickly as possible.
394
    //
395
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
396
    // backoff/delay state before calculating the next delay, unless a PONG message is received
397
    // for the urgent PING message we send below.
398
    //
399
    // If we get a PONG message for the urgent PING message sent below, then the connection is
400
    // healthy and we can calculate the next delay normally.
401
    if (m_state != ConnectionState::disconnected) {
236✔
402
        m_reconnect_info.scheduled_reset = true;
236✔
403
        m_ping_after_scheduled_reset_of_reconnect_info = false;
236✔
404

405
        schedule_urgent_ping(); // Throws
236✔
406
        return;
236✔
407
    }
236✔
408
    // Nothing to do in this case. The next reconnect attemp will be made as
409
    // soon as there are any sessions that are both active and unsuspended.
410
}
236✔
411

412
// Removes a fully deactivated session from this connection's session map and
// from the session ident history.
void Connection::finish_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess->m_state == Session::Deactivated);

    // Copy the ident first: it is still needed after the session's entry has
    // been erased from `m_sessions`.
    const auto ident = sess->m_ident;
    m_sessions.erase(ident);
    m_session_history.erase(ident);
}
419

420
void Connection::force_close()
421
{
2,514✔
422
    if (m_force_closed) {
2,514✔
423
        return;
×
424
    }
×
425

426
    m_force_closed = true;
2,514✔
427

428
    if (m_state != ConnectionState::disconnected) {
2,514✔
429
        voluntary_disconnect();
2,482✔
430
    }
2,482✔
431

432
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,514✔
433
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,514✔
434
        m_reconnect_disconnect_timer.reset();
32✔
435
        m_reconnect_delay_in_progress = false;
32✔
436
        m_disconnect_delay_in_progress = false;
32✔
437
    }
32✔
438

439
    // We must copy any session pointers we want to close to a vector because force_closing
440
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
441
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
442
    std::vector<Session*> to_close;
2,514✔
443
    for (auto& session_pair : m_sessions) {
2,514✔
444
        if (session_pair.second->m_state == Session::State::Active) {
102✔
445
            to_close.push_back(session_pair.second.get());
102✔
446
        }
102✔
447
    }
102✔
448

449
    for (auto& sess : to_close) {
2,514✔
450
        sess->force_close();
102✔
451
    }
102✔
452

453
    logger.debug("Force closed idle connection");
2,514✔
454
}
2,514✔
455

456

457
void Connection::websocket_connected_handler(const std::string& protocol)
458
{
3,730✔
459
    if (!protocol.empty()) {
3,730✔
460
        std::string_view expected_prefix =
3,730✔
461
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,730✔
462
        // FIXME: Use std::string_view::begins_with() in C++20.
463
        auto prefix_matches = [&](std::string_view other) {
3,730✔
464
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,730✔
465
        };
3,730✔
466
        if (prefix_matches(expected_prefix)) {
3,730✔
467
            util::MemoryInputStream in;
3,730✔
468
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,730✔
469
            in.imbue(std::locale::classic());
3,730✔
470
            in.unsetf(std::ios_base::skipws);
3,730✔
471
            int value_2 = 0;
3,730✔
472
            in >> value_2;
3,730✔
473
            if (in && in.eof() && value_2 >= 0) {
3,730✔
474
                bool good_version =
3,730✔
475
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,730✔
476
                if (good_version) {
3,730✔
477
                    logger.detail("Negotiated protocol version: %1", value_2);
3,730✔
478
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
479
                    // will provide the appservices connection ID via a log message.
480
                    // TODO: Remove once the server starts sending the connection ID
481
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,730✔
482
                    m_negotiated_protocol_version = value_2;
3,730✔
483
                    handle_connection_established(); // Throws
3,730✔
484
                    return;
3,730✔
485
                }
3,730✔
486
            }
3,730✔
487
        }
3,730✔
488
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
489
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
490
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
491
    }
×
492
    else {
×
493
        close_due_to_client_side_error(
×
494
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
495
            ConnectionTerminationReason::bad_headers_in_http_response);
×
496
    }
×
497
}
3,730✔
498

499

500
// Handles a binary message arriving on the websocket.
//
// Returns false if the connection was already force closed. Otherwise the
// message is processed (or, when the simulated read failure is triggered, the
// connection is closed with a non-fatal client-side error) and the return
// value tells whether `m_websocket` is still set afterwards.
bool Connection::websocket_binary_message_received(util::Span<const char> data)
{
    if (m_force_closed) {
        logger.debug("Received binary message after connection was force closed");
        return false;
    }

    using sf = SimulatedFailure;
    if (sf::check_trigger(sf::sync_client__read_head)) {
        close_due_to_client_side_error(
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
            ConnectionTerminationReason::read_or_write_error);
    }
    else {
        handle_message_received(data);
    }
    return bool(m_websocket);
}
518

519

520
void Connection::websocket_error_handler()
521
{
774✔
522
    m_websocket_error_received = true;
774✔
523
}
774✔
524

525
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
526
{
894✔
527
    if (m_force_closed) {
894✔
528
        logger.debug("Received websocket close message after connection was force closed");
×
529
        return false;
×
530
    }
×
531
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
894✔
532

533
    switch (error_code) {
894✔
534
        case WebSocketError::websocket_ok:
70✔
535
            break;
70✔
536
        case WebSocketError::websocket_resolve_failed:
4✔
537
            [[fallthrough]];
4✔
538
        case WebSocketError::websocket_connection_failed: {
112✔
539
            SessionErrorInfo error_info(
112✔
540
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
112✔
541
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
542
            // to make sure the websocket URL is correct
543
            if (!m_server_endpoint.is_verified) {
112✔
544
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
84✔
545
            }
84✔
546
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
112✔
547
            break;
112✔
548
        }
4✔
549
        case WebSocketError::websocket_read_error:
640✔
550
            [[fallthrough]];
640✔
551
        case WebSocketError::websocket_write_error: {
640✔
552
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
640✔
553
                                         ConnectionTerminationReason::read_or_write_error);
640✔
554
            break;
640✔
555
        }
640✔
556
        case WebSocketError::websocket_going_away:
✔
557
            [[fallthrough]];
×
558
        case WebSocketError::websocket_protocol_error:
✔
559
            [[fallthrough]];
×
560
        case WebSocketError::websocket_unsupported_data:
✔
561
            [[fallthrough]];
×
562
        case WebSocketError::websocket_invalid_payload_data:
✔
563
            [[fallthrough]];
×
564
        case WebSocketError::websocket_policy_violation:
✔
565
            [[fallthrough]];
×
566
        case WebSocketError::websocket_reserved:
✔
567
            [[fallthrough]];
×
568
        case WebSocketError::websocket_no_status_received:
✔
569
            [[fallthrough]];
×
570
        case WebSocketError::websocket_invalid_extension: {
✔
571
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
572
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
573
            break;
×
574
        }
×
575
        case WebSocketError::websocket_message_too_big: {
4✔
576
            auto message = util::format(
4✔
577
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
578
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
579
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
580
            involuntary_disconnect(std::move(error_info),
4✔
581
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
582
            break;
4✔
583
        }
×
584
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
585
            close_due_to_client_side_error(
10✔
586
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
587
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
588
            break;
10✔
589
        }
×
590
        case WebSocketError::websocket_fatal_error: {
✔
591
            // Error is fatal if the sync_route has already been verified - if the sync_route has not
592
            // been verified, then use a non-fatal error and try to perform a location update.
593
            SessionErrorInfo error_info(
×
594
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)},
×
595
                IsFatal{m_server_endpoint.is_verified});
×
596
            ConnectionTerminationReason reason = ConnectionTerminationReason::http_response_says_fatal_error;
×
597
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
598
            // to make sure the websocket URL is correct
599
            if (!m_server_endpoint.is_verified) {
×
600
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
601
                reason = ConnectionTerminationReason::connect_operation_failed;
×
602
            }
×
603
            involuntary_disconnect(std::move(error_info), reason);
×
604
            break;
×
605
        }
×
606
        case WebSocketError::websocket_forbidden: {
✔
607
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
608
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
609
            involuntary_disconnect(std::move(error_info),
×
610
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
611
            break;
×
612
        }
×
613
        case WebSocketError::websocket_unauthorized: {
44✔
614
            SessionErrorInfo error_info(
44✔
615
                {ErrorCodes::AuthError,
44✔
616
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
44✔
617
                IsFatal{false});
44✔
618
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
44✔
619
            involuntary_disconnect(std::move(error_info),
44✔
620
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
44✔
621
            break;
44✔
622
        }
×
623
        case WebSocketError::websocket_moved_permanently: {
14✔
624
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
14✔
625
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
14✔
626
            involuntary_disconnect(std::move(error_info),
14✔
627
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
14✔
628
            break;
14✔
629
        }
×
630
        case WebSocketError::websocket_abnormal_closure: {
✔
631
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
632
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
633
            involuntary_disconnect(std::move(error_info),
×
634
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
635
            break;
×
636
        }
×
637
        case WebSocketError::websocket_internal_server_error:
✔
638
            [[fallthrough]];
×
639
        case WebSocketError::websocket_retry_error: {
✔
640
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
641
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
642
            break;
×
643
        }
×
644
    }
894✔
645

646
    return bool(m_websocket);
894✔
647
}
894✔
648

649
// Guarantees that handle_reconnect_wait() is never called from within the
650
// execution of initiate_reconnect_wait() (no callback reentrance).
651
void Connection::initiate_reconnect_wait()
652
{
8,892✔
653
    REALM_ASSERT(m_activated);
8,892✔
654
    REALM_ASSERT(!m_reconnect_delay_in_progress);
8,892✔
655
    REALM_ASSERT(!m_disconnect_delay_in_progress);
8,892✔
656

657
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
658
    if (m_force_closed) {
8,892✔
659
        return;
2,482✔
660
    }
2,482✔
661

662
    m_reconnect_delay_in_progress = true;
6,410✔
663
    auto delay = m_reconnect_info.delay_interval();
6,410✔
664
    if (delay == std::chrono::milliseconds::max()) {
6,410✔
665
        logger.detail("Reconnection delayed indefinitely"); // Throws
1,100✔
666
        // Not actually starting a timer corresponds to an infinite wait
667
        m_nonzero_reconnect_delay = true;
1,100✔
668
        return;
1,100✔
669
    }
1,100✔
670

671
    if (delay == std::chrono::milliseconds::zero()) {
5,310✔
672
        m_nonzero_reconnect_delay = false;
4,948✔
673
    }
4,948✔
674
    else {
362✔
675
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
362✔
676
        m_nonzero_reconnect_delay = true;
362✔
677
    }
362✔
678

679
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
680
    // we need it to be cancelable in case the connection is terminated before the timer
681
    // callback is run.
682
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
5,312✔
683
        // If the operation is aborted, the connection object may have been
684
        // destroyed.
685
        if (status != ErrorCodes::OperationAborted)
5,312✔
686
            handle_reconnect_wait(status); // Throws
3,934✔
687
    });                                    // Throws
5,312✔
688
}
5,310✔
689

690

691
void Connection::handle_reconnect_wait(Status status)
{
    // Aborted waits are filtered out by the timer callback; any other
    // failure is rethrown as an exception.
    if (!status.is_ok()) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT(m_reconnect_delay_in_progress);
    m_reconnect_delay_in_progress = false;

    // Only reconnect if some session still wants the connection.
    const bool connection_wanted = (m_num_active_unsuspended_sessions > 0);
    if (connection_wanted) {
        initiate_reconnect(); // Throws
    }
}
3,936✔
704

705
// Observer handed to the socket provider. Each callback is forwarded to the
// owning Connection unless the sentinel captured at construction time has been
// marked as destroyed, in which case the callback is dropped.
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
    Connection* conn;
    util::bind_ptr<LifecycleSentinel> sentinel;

    explicit WebSocketObserverShim(Connection* conn)
        : conn(conn)
        , sentinel(conn->m_websocket_sentinel)
    {
    }

    void websocket_connected_handler(const std::string& protocol) override
    {
        if (!sentinel->destroyed) {
            conn->websocket_connected_handler(protocol);
        }
    }

    void websocket_error_handler() override
    {
        if (!sentinel->destroyed) {
            conn->websocket_error_handler();
        }
    }

    // Reports `false` without forwarding once the sentinel is destroyed.
    bool websocket_binary_message_received(util::Span<const char> data) override
    {
        return !sentinel->destroyed && conn->websocket_binary_message_received(data);
    }

    // Reports `true` without forwarding once the sentinel is destroyed.
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
    {
        return sentinel->destroyed || conn->websocket_closed_handler(was_clean, error_code, msg);
    }
};
751

752
void Connection::initiate_reconnect()
753
{
3,936✔
754
    REALM_ASSERT(m_activated);
3,936✔
755

756
    m_state = ConnectionState::connecting;
3,936✔
757
    report_connection_state_change(ConnectionState::connecting); // Throws
3,936✔
758
    if (m_websocket_sentinel) {
3,936✔
759
        m_websocket_sentinel->destroyed = true;
×
760
    }
×
761
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,936✔
762
    m_websocket.reset();
3,936✔
763

764
    // Watchdog
765
    initiate_connect_wait(); // Throws
3,936✔
766

767
    std::vector<std::string> sec_websocket_protocol;
3,936✔
768
    {
3,936✔
769
        auto protocol_prefix =
3,936✔
770
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,936✔
771
        int min = get_oldest_supported_protocol_version();
3,936✔
772
        int max = get_current_protocol_version();
3,936✔
773
        REALM_ASSERT_3(min, <=, max);
3,936✔
774
        // List protocol version in descending order to ensure that the server
775
        // selects the highest possible version.
776
        for (int version = max; version >= min; --version) {
55,082✔
777
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
51,146✔
778
        }
51,146✔
779
    }
3,936✔
780

781
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,936✔
782
                m_server_endpoint.port, m_http_request_path_prefix);
3,936✔
783

784
    m_websocket_error_received = false;
3,936✔
785
    m_websocket =
3,936✔
786
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,936✔
787
                                            WebSocketEndpoint{
3,936✔
788
                                                m_server_endpoint.address,
3,936✔
789
                                                m_server_endpoint.port,
3,936✔
790
                                                get_http_request_path(),
3,936✔
791
                                                std::move(sec_websocket_protocol),
3,936✔
792
                                                is_ssl(m_server_endpoint.envelope),
3,936✔
793
                                                /// DEPRECATED - The following will be removed in a future release
794
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,936✔
795
                                                m_verify_servers_ssl_certificate,
3,936✔
796
                                                m_ssl_trust_certificate_path,
3,936✔
797
                                                m_ssl_verify_callback,
3,936✔
798
                                                m_proxy_config,
3,936✔
799
                                            });
3,936✔
800
}
3,936✔
801

802

803
void Connection::initiate_connect_wait()
804
{
3,934✔
805
    // Deploy a watchdog to enforce an upper bound on the time it can take to
806
    // fully establish the connection (including SSL and WebSocket
807
    // handshakes). Without such a watchdog, connect operations could take very
808
    // long, or even indefinite time.
809
    milliseconds_type time = m_client.m_connect_timeout;
3,934✔
810

811
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,936✔
812
        // If the operation is aborted, the connection object may have been
813
        // destroyed.
814
        if (status != ErrorCodes::OperationAborted)
3,936✔
815
            handle_connect_wait(status); // Throws
×
816
    });                                  // Throws
3,936✔
817
}
3,934✔
818

819

820
void Connection::handle_connect_wait(Status status)
{
    // Aborted waits are filtered out by the timer callback; any other
    // failure is rethrown as an exception.
    if (!status.is_ok()) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
    logger.info("Connect timeout"); // Throws

    SessionErrorInfo timeout_error(
        {ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"}, IsFatal{false});

    // If the server has not been successfully contacted yet, ask for a
    // location refresh so that the websocket URL gets re-checked.
    const bool endpoint_unverified = !m_server_endpoint.is_verified;
    if (endpoint_unverified) {
        timeout_error.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
    }

    involuntary_disconnect(std::move(timeout_error), ConnectionTerminationReason::sync_connect_timeout); // Throws
}
×
838

839

840
void Connection::handle_connection_established()
841
{
3,730✔
842
    // Cancel connect timeout watchdog
843
    m_connect_timer.reset();
3,730✔
844

845
    m_state = ConnectionState::connected;
3,730✔
846
    m_server_endpoint.is_verified = true; // sync route is valid since connection is successful
3,730✔
847

848
    milliseconds_type now = monotonic_clock_now();
3,730✔
849
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,730✔
850
    initiate_ping_delay(now);     // Throws
3,730✔
851

852
    bool fast_reconnect = false;
3,730✔
853
    if (m_disconnect_has_occurred) {
3,730✔
854
        milliseconds_type time = now - m_disconnect_time;
1,140✔
855
        if (time <= m_client.m_fast_reconnect_limit)
1,140✔
856
            fast_reconnect = true;
1,140✔
857
    }
1,140✔
858

859
    for (auto& p : m_sessions) {
4,954✔
860
        Session& sess = *p.second;
4,954✔
861
        sess.connection_established(fast_reconnect); // Throws
4,954✔
862
    }
4,954✔
863

864
    report_connection_state_change(ConnectionState::connected); // Throws
3,730✔
865
}
3,730✔
866

867

868
void Connection::schedule_urgent_ping()
869
{
236✔
870
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
236✔
871
    if (m_ping_delay_in_progress) {
236✔
872
        m_heartbeat_timer.reset();
128✔
873
        m_ping_delay_in_progress = false;
128✔
874
        m_minimize_next_ping_delay = true;
128✔
875
        milliseconds_type now = monotonic_clock_now();
128✔
876
        initiate_ping_delay(now); // Throws
128✔
877
        return;
128✔
878
    }
128✔
879
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
108✔
880
    if (!m_send_ping)
108✔
881
        m_minimize_next_ping_delay = true;
108✔
882
}
108✔
883

884

885
void Connection::initiate_ping_delay(milliseconds_type now)
{
    REALM_ASSERT(!m_ping_delay_in_progress);
    REALM_ASSERT(!m_waiting_for_pong);
    REALM_ASSERT(!m_send_ping);

    milliseconds_type delay = 0;
    if (m_minimize_next_ping_delay) {
        // A minimized delay was requested: keep the zero delay and consume the request.
        m_minimize_next_ping_delay = false;
    }
    else {
        delay = m_client.m_ping_keepalive_period;
        // Deduct a random amount of up to 10% of the period, or up to 100%
        // when no PING has been sent yet on this connection. The
        // randomization lowers the risk of many connections sending PING
        // messages to the server at the same time.
        const milliseconds_type max_deduction = m_ping_sent ? delay / 10 : delay;
        std::uniform_int_distribution<milliseconds_type> distr(0, max_deduction);
        delay -= distr(m_client.get_random());

        // Also deduct the time already spent waiting for PONG, clamping at zero.
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
        const milliseconds_type spent_time = now - m_pong_wait_started_at;
        delay = (spent_time < delay) ? delay - spent_time : 0;
    }

    m_ping_delay_in_progress = true;

    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
        if (status == ErrorCodes::OperationAborted)
            return;
        if (!status.is_ok())
            throw Exception(status);

        handle_ping_delay();                                    // Throws
    });                                                         // Throws
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
}
4,032✔
930

931

932
void Connection::handle_ping_delay()
933
{
198✔
934
    REALM_ASSERT(m_ping_delay_in_progress);
198✔
935
    m_ping_delay_in_progress = false;
198✔
936
    m_send_ping = true;
198✔
937

938
    initiate_pong_timeout(); // Throws
198✔
939

940
    if (m_state == ConnectionState::connected && !m_sending)
198✔
941
        send_next_message(); // Throws
142✔
942
}
198✔
943

944

945
void Connection::initiate_pong_timeout()
946
{
198✔
947
    REALM_ASSERT(!m_ping_delay_in_progress);
198✔
948
    REALM_ASSERT(!m_waiting_for_pong);
198✔
949
    REALM_ASSERT(m_send_ping);
198✔
950

951
    m_waiting_for_pong = true;
198✔
952
    m_pong_wait_started_at = monotonic_clock_now();
198✔
953

954
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
198✔
955
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
198✔
956
        if (status == ErrorCodes::OperationAborted)
198✔
957
            return;
186✔
958
        else if (!status.is_ok())
12✔
959
            throw Exception(status);
×
960

961
        handle_pong_timeout(); // Throws
12✔
962
    });                        // Throws
12✔
963
}
198✔
964

965

966
void Connection::handle_pong_timeout()
967
{
12✔
968
    REALM_ASSERT(m_waiting_for_pong);
12✔
969
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
970
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
971
                                 ConnectionTerminationReason::pong_timeout);
12✔
972
}
12✔
973

974

975
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
976
{
99,638✔
977
    // Stop sending messages if an websocket error was received.
978
    if (m_websocket_error_received)
99,638✔
979
        return;
×
980

981
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
99,638✔
982
        if (sentinel->destroyed) {
99,562✔
983
            return;
1,482✔
984
        }
1,482✔
985
        if (!status.is_ok()) {
98,080✔
986
            if (status != ErrorCodes::Error::OperationAborted) {
×
987
                // Write errors will be handled by the websocket_write_error_handler() callback
988
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
989
            }
×
990
            return;
×
991
        }
×
992
        handle_write_message(); // Throws
98,080✔
993
    });                         // Throws
98,080✔
994
    m_sending_session = sess;
99,638✔
995
    m_sending = true;
99,638✔
996
}
99,638✔
997

998

999
void Connection::handle_write_message()
1000
{
98,080✔
1001
    m_sending_session->message_sent(); // Throws
98,080✔
1002
    if (m_sending_session->m_state == Session::Deactivated) {
98,080✔
1003
        finish_session_deactivation(m_sending_session);
120✔
1004
    }
120✔
1005
    m_sending_session = nullptr;
98,080✔
1006
    m_sending = false;
98,080✔
1007
    send_next_message(); // Throws
98,080✔
1008
}
98,080✔
1009

1010

1011
void Connection::send_next_message()
1012
{
175,440✔
1013
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
175,440✔
1014
    REALM_ASSERT(!m_sending_session);
175,440✔
1015
    REALM_ASSERT(!m_sending);
175,440✔
1016
    if (m_send_ping) {
175,440✔
1017
        send_ping(); // Throws
186✔
1018
        return;
186✔
1019
    }
186✔
1020
    while (!m_sessions_enlisted_to_send.empty()) {
253,112✔
1021
        // The state of being connected is not supposed to be able to change
1022
        // across this loop thanks to the "no callback reentrance" guarantee
1023
        // provided by Websocket::async_write_text(), and friends.
1024
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
177,872✔
1025

1026
        Session& sess = *m_sessions_enlisted_to_send.front();
177,872✔
1027
        m_sessions_enlisted_to_send.pop_front();
177,872✔
1028
        sess.send_message(); // Throws
177,872✔
1029

1030
        if (sess.m_state == Session::Deactivated) {
177,872✔
1031
            finish_session_deactivation(&sess);
2,972✔
1032
        }
2,972✔
1033

1034
        // An enlisted session may choose to not send a message. In that case,
1035
        // we should pass the opportunity to the next enlisted session.
1036
        if (m_sending)
177,872✔
1037
            break;
100,014✔
1038
    }
177,872✔
1039
}
175,254✔
1040

1041

1042
void Connection::send_ping()
1043
{
186✔
1044
    REALM_ASSERT(!m_ping_delay_in_progress);
186✔
1045
    REALM_ASSERT(m_waiting_for_pong);
186✔
1046
    REALM_ASSERT(m_send_ping);
186✔
1047

1048
    m_send_ping = false;
186✔
1049
    if (m_reconnect_info.scheduled_reset)
186✔
1050
        m_ping_after_scheduled_reset_of_reconnect_info = true;
136✔
1051

1052
    m_last_ping_sent_at = monotonic_clock_now();
186✔
1053
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
186✔
1054
                 m_previous_ping_rtt); // Throws
186✔
1055

1056
    ClientProtocol& protocol = get_client_protocol();
186✔
1057
    OutputBuffer& out = get_output_buffer();
186✔
1058
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
186✔
1059
    initiate_write_ping(out);                                          // Throws
186✔
1060
    m_ping_sent = true;
186✔
1061
}
186✔
1062

1063

1064
void Connection::initiate_write_ping(const OutputBuffer& out)
1065
{
186✔
1066
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
186✔
1067
        if (sentinel->destroyed) {
186✔
1068
            return;
2✔
1069
        }
2✔
1070
        if (!status.is_ok()) {
184✔
1071
            if (status != ErrorCodes::Error::OperationAborted) {
×
1072
                // Write errors will be handled by the websocket_write_error_handler() callback
1073
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1074
            }
×
1075
            return;
×
1076
        }
×
1077
        handle_write_ping(); // Throws
184✔
1078
    });                      // Throws
184✔
1079
    m_sending = true;
186✔
1080
}
186✔
1081

1082

1083
void Connection::handle_write_ping()
1084
{
184✔
1085
    REALM_ASSERT(m_sending);
184✔
1086
    REALM_ASSERT(!m_sending_session);
184✔
1087
    m_sending = false;
184✔
1088
    send_next_message(); // Throws
184✔
1089
}
184✔
1090

1091

1092
void Connection::handle_message_received(util::Span<const char> data)
{
    // The protocol object parses the message and invokes the matching
    // handler on this Connection.
    const std::string_view payload(data.data(), data.size());
    get_client_protocol().parse_message_received<Connection>(*this, payload);
}
81,248✔
1098

1099

1100
void Connection::initiate_disconnect_wait()
1101
{
4,808✔
1102
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,808✔
1103

1104
    if (m_disconnect_delay_in_progress) {
4,808✔
1105
        m_reconnect_disconnect_timer.reset();
2,228✔
1106
        m_disconnect_delay_in_progress = false;
2,228✔
1107
    }
2,228✔
1108

1109
    milliseconds_type time = m_client.m_connection_linger_time;
4,808✔
1110

1111
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,808✔
1112
        // If the operation is aborted, the connection object may have been
1113
        // destroyed.
1114
        if (status != ErrorCodes::OperationAborted)
4,802✔
1115
            handle_disconnect_wait(status); // Throws
12✔
1116
    });                                     // Throws
4,802✔
1117
    m_disconnect_delay_in_progress = true;
4,808✔
1118
}
4,808✔
1119

1120

1121
void Connection::handle_disconnect_wait(Status status)
{
    // Aborted waits are filtered out by the timer callback; any other
    // failure is rethrown as an exception.
    if (!status.is_ok()) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    m_disconnect_delay_in_progress = false;

    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);

    // Keep the connection if a session started using it while it lingered.
    if (m_num_active_unsuspended_sessions != 0)
        return;

    if (m_client.m_connection_linger_time > 0) {
        logger.detail("Linger time expired"); // Throws
    }
    voluntary_disconnect();      // Throws
    logger.info("Disconnected"); // Throws
}
12✔
1138

1139

1140
void Connection::close_due_to_protocol_error(Status status)
{
    // Protocol violations are reported as fatal errors.
    SessionErrorInfo violation(std::move(status), IsFatal{true});
    violation.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;

    involuntary_disconnect(std::move(violation), ConnectionTerminationReason::sync_protocol_violation); // Throws
}
16✔
1147

1148

1149
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
{
    // Log before `status` is moved into the error info below.
    logger.info("Connection closed due to error: %1", status); // Throws

    SessionErrorInfo client_error{std::move(status), is_fatal};
    involuntary_disconnect(std::move(client_error), reason); // Throws
}
496✔
1155

1156

1157
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
{
    // Log before `status` is moved into the error info below.
    logger.info("Connection closed due to transient error: %1", status); // Throws

    // Transient errors are non-fatal and carry the `Transient` action.
    SessionErrorInfo transient_error{std::move(status), IsFatal{false}};
    transient_error.server_requests_action = ProtocolErrorInfo::Action::Transient;

    involuntary_disconnect(std::move(transient_error), reason); // Throws
}
652✔
1165

1166

1167
// Close connection due to error discovered on the server-side, and then
1168
// reported to the client by way of a connection-level ERROR message.
1169
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
{
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
                int(error_code)); // Throws

    // A fatal error means the server does not want a reconnect; otherwise
    // the client should try again later.
    ConnectionTerminationReason reason = ConnectionTerminationReason::server_said_try_again_later;
    if (info.is_fatal) {
        reason = ConnectionTerminationReason::server_said_do_not_reconnect;
    }

    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
                           reason); // Throws
}
68✔
1179

1180

1181
void Connection::disconnect(const SessionErrorInfo& info)
1182
{
3,934✔
1183
    // Cancel connect timeout watchdog
1184
    m_connect_timer.reset();
3,934✔
1185

1186
    if (m_state == ConnectionState::connected) {
3,934✔
1187
        m_disconnect_time = monotonic_clock_now();
3,728✔
1188
        m_disconnect_has_occurred = true;
3,728✔
1189

1190
        // Sessions that are in the Deactivating state at this time can be
1191
        // immediately discarded, in part because they are no longer enlisted to
1192
        // send. Such sessions will be taken to the Deactivated state by
1193
        // Session::connection_lost(), and then they will be removed from
1194
        // `m_sessions`.
1195
        auto i = m_sessions.begin(), end = m_sessions.end();
3,728✔
1196
        while (i != end) {
8,470✔
1197
            // Prevent invalidation of the main iterator when erasing elements
1198
            auto j = i++;
4,742✔
1199
            Session& sess = *j->second;
4,742✔
1200
            sess.connection_lost(); // Throws
4,742✔
1201
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
4,742✔
1202
                m_sessions.erase(j);
2,374✔
1203
        }
4,742✔
1204
    }
3,728✔
1205

1206
    change_state_to_disconnected();
3,934✔
1207

1208
    m_ping_delay_in_progress = false;
3,934✔
1209
    m_waiting_for_pong = false;
3,934✔
1210
    m_send_ping = false;
3,934✔
1211
    m_minimize_next_ping_delay = false;
3,934✔
1212
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,934✔
1213
    m_ping_sent = false;
3,934✔
1214
    m_heartbeat_timer.reset();
3,934✔
1215
    m_previous_ping_rtt = 0;
3,934✔
1216

1217
    m_websocket_sentinel->destroyed = true;
3,934✔
1218
    m_websocket_sentinel.reset();
3,934✔
1219
    m_websocket.reset();
3,934✔
1220
    m_input_body_buffer.reset();
3,934✔
1221
    m_sending_session = nullptr;
3,934✔
1222
    m_sessions_enlisted_to_send.clear();
3,934✔
1223
    m_sending = false;
3,934✔
1224

1225
    if (!m_appservices_coid.empty()) {
3,934✔
1226
        m_appservices_coid.clear();
3,682✔
1227
        logger.base_logger = make_logger(m_ident, std::nullopt, get_client().logger.base_logger);
3,682✔
1228
        for (auto& [ident, sess] : m_sessions) {
3,682✔
1229
            sess->logger.base_logger = Session::make_logger(ident, logger.base_logger);
2,306✔
1230
        }
2,306✔
1231
    }
3,682✔
1232

1233
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,934✔
1234
    initiate_reconnect_wait();                                           // Throws
3,934✔
1235
}
3,934✔
1236

1237
bool Connection::is_flx_sync_connection() const noexcept
1238
{
115,984✔
1239
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
115,984✔
1240
}
115,984✔
1241

1242
void Connection::receive_pong(milliseconds_type timestamp)
{
    logger.debug("Received: PONG(timestamp=%1)", timestamp);

    // A PONG is only legal while one is being awaited and the matching PING
    // is no longer pending to be sent.
    if (REALM_UNLIKELY(!m_waiting_for_pong || m_send_ping)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
        return;
    }

    // The server must echo the timestamp of the last PING sent.
    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
                          m_last_ping_sent_at, timestamp)}); // Throws
        return;
    }

    const milliseconds_type now = monotonic_clock_now();
    const milliseconds_type rtt = now - timestamp;
    logger.debug("Round trip time was %1 milliseconds", rtt);
    m_previous_ping_rtt = rtt;

    // If this PONG answers a PING that was sent after the last invocation of
    // cancel_reconnect_delay(), the connection is still good and the next
    // reconnect delay does not have to be skipped.
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
        m_ping_after_scheduled_reset_of_reconnect_info = false;
        m_reconnect_info.scheduled_reset = false;
    }

    // Stop the PONG timeout and start the delay before the next PING.
    m_heartbeat_timer.reset();
    m_waiting_for_pong = false;

    initiate_ping_delay(now); // Throws

    if (m_client.m_roundtrip_time_handler) {
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
    }
}
174✔
1283

1284
// Looks up the session addressed by an incoming protocol message.
//
// Returns the session when `session_ident` is non-zero and currently known to
// this connection. Otherwise returns nullptr: silently for ident 0, after
// logging an error for an ident found in `m_session_history`, and after
// closing the connection with a protocol error for an ident found nowhere.
// `message` is the name of the protocol message and is only used in
// diagnostics.
//
// NOTE(review): this function is declared `noexcept` but calls operations that
// are marked `// Throws` elsewhere in this file (logging,
// close_due_to_protocol_error()) - confirm that terminating on such an
// exception is intended.
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
{
    if (session_ident == 0) {
        return nullptr;
    }

    auto* sess = get_session(session_ident);
    if (REALM_LIKELY(sess)) {
        return sess;
    }
    // Check the history to see if the message received was for a previous session
    if (auto it = m_session_history.find(session_ident); it == m_session_history.end()) {
        logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received message %1 for session ident %2 when that session never existed", message,
                          session_ident)});
    }
    else {
        // The session existed at some point, so this is logged but does not
        // close the connection.
        logger.error("Received %1 message for closed session, session_ident = %2", message,
                     session_ident); // Throws
    }
    return nullptr;
}
74,892✔
1308

1309
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1310
{
968✔
1311
    Session* sess = nullptr;
968✔
1312
    if (session_ident != 0) {
968✔
1313
        sess = find_and_validate_session(session_ident, "ERROR");
896✔
1314
        if (REALM_UNLIKELY(!sess)) {
896✔
1315
            return;
×
1316
        }
×
1317
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
896✔
1318
            close_due_to_protocol_error(std::move(status)); // Throws
×
1319
            return;
×
1320
        }
×
1321

1322
        if (sess->m_state == Session::Deactivated) {
896✔
1323
            finish_session_deactivation(sess);
10✔
1324
        }
10✔
1325
        return;
896✔
1326
    }
896✔
1327

1328
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
72✔
1329
                info.message, info.raw_error_code, info.is_fatal, session_ident,
72✔
1330
                info.server_requests_action); // Throws
72✔
1331

1332
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
72✔
1333
    if (REALM_LIKELY(known_error_code)) {
72✔
1334
        ProtocolError error_code = ProtocolError(info.raw_error_code);
68✔
1335
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
68✔
1336
            close_due_to_server_side_error(error_code, info); // Throws
68✔
1337
            return;
68✔
1338
        }
68✔
1339
        close_due_to_protocol_error(
×
1340
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1341
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1342
                          info.raw_error_code)});
×
1343
    }
×
1344
    else {
4✔
1345
        close_due_to_protocol_error(
4✔
1346
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1347
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1348
    }
4✔
1349
}
72✔
1350

1351

1352
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1353
                                             session_ident_type session_ident)
1354
{
20✔
1355
    if (session_ident == 0) {
20✔
1356
        return close_due_to_protocol_error(
×
1357
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1358
    }
×
1359

1360
    if (!is_flx_sync_connection()) {
20✔
1361
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1362
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1363
    }
×
1364

1365
    if (Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR")) {
20✔
1366
        sess->receive_query_error_message(raw_error_code, message, query_version);
20✔
1367
    }
20✔
1368
}
20✔
1369

1370

1371
// Dispatches an IDENT message to the session identified by `session_ident`.
//
// Nothing happens when find_and_validate_session() returns null. A non-OK
// status returned by the session closes the connection with that status.
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
{
    Session* target = find_and_validate_session(session_ident, "IDENT");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto outcome = target->receive_ident_message(client_file_ident);
    if (outcome.is_ok())
        return;

    close_due_to_protocol_error(std::move(outcome)); // Throws
}
3,632✔
1381

1382
// Dispatches a DOWNLOAD message to the session identified by `session_ident`.
//
// Nothing happens when find_and_validate_session() returns null. A non-OK
// status returned by the session closes the connection with that status.
void Connection::receive_download_message(session_ident_type session_ident, const DownloadMessage& message)
{
    Session* target = find_and_validate_session(session_ident, "DOWNLOAD");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto outcome = target->receive_download_message(message);
    if (outcome.is_ok())
        return;

    close_due_to_protocol_error(std::move(outcome));
}
49,234✔
1393

1394
// Dispatches a MARK message to the session identified by `session_ident`.
//
// Nothing happens when find_and_validate_session() returns null. A non-OK
// status returned by the session closes the connection with that status.
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    Session* target = find_and_validate_session(session_ident, "MARK");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto outcome = target->receive_mark_message(request_ident);
    if (outcome.is_ok())
        return;

    close_due_to_protocol_error(std::move(outcome)); // Throws
}
17,044✔
1404

1405

1406
// Dispatches an UNBOUND message to the session identified by `session_ident`.
//
// Nothing happens when find_and_validate_session() returns null. A non-OK
// status returned by the session closes the connection with that status.
// When the session is in the Deactivated state after handling the message,
// it is handed to finish_session_deactivation().
void Connection::receive_unbound_message(session_ident_type session_ident)
{
    Session* target = find_and_validate_session(session_ident, "UNBOUND");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto outcome = target->receive_unbound_message();
    if (!outcome.is_ok()) {
        close_due_to_protocol_error(std::move(outcome)); // Throws
        return;
    }

    if (target->m_state != Session::Deactivated)
        return;

    finish_session_deactivation(target);
}
4,002✔
1422

1423

1424
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1425
                                               std::string_view body)
1426
{
64✔
1427
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
64✔
1428
    if (REALM_UNLIKELY(!sess)) {
64✔
1429
        return;
×
1430
    }
×
1431

1432
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
64✔
1433
        close_due_to_protocol_error(std::move(status));
×
1434
    }
×
1435
}
64✔
1436

1437

1438
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1439
                                            std::string_view message)
1440
{
6,108✔
1441
    if (session_ident != 0) {
6,108✔
1442
        if (auto sess = get_session(session_ident)) {
4,010✔
1443
            sess->logger.log(LogCategory::session, level, "Server log: %1", message);
3,998✔
1444
            return;
3,998✔
1445
        }
3,998✔
1446

1447
        logger.log(util::LogCategory::session, level, "Server log for unknown session %1: %2", session_ident,
12✔
1448
                   message);
12✔
1449
        return;
12✔
1450
    }
4,010✔
1451

1452
    logger.log(level, "Server log: %1", message);
2,098✔
1453
}
2,098✔
1454

1455

1456
void Connection::receive_appservices_request_id(std::string_view coid)
1457
{
5,828✔
1458
    if (coid.empty() || !m_appservices_coid.empty()) {
5,828✔
1459
        return;
2,140✔
1460
    }
2,140✔
1461
    m_appservices_coid = coid;
3,688✔
1462
    logger.log(util::LogCategory::session, util::LogCategory::Level::info,
3,688✔
1463
               "Connected to app services with request id: \"%1\". Further log entries for this connection will be "
3,688✔
1464
               "prefixed with \"Connection[%2:%1]\" instead of \"Connection[%2]\"",
3,688✔
1465
               m_appservices_coid, m_ident);
3,688✔
1466
    logger.base_logger = make_logger(m_ident, m_appservices_coid, get_client().logger.base_logger);
3,688✔
1467

1468
    for (auto& [ident, sess] : m_sessions) {
4,890✔
1469
        sess->logger.base_logger = Session::make_logger(ident, logger.base_logger);
4,890✔
1470
    }
4,890✔
1471
}
3,688✔
1472

1473

1474
// Forwards `status` to close_due_to_protocol_error(), taking ownership of it.
void Connection::handle_protocol_error(Status status)
{
    close_due_to_protocol_error(std::move(status));
}
×
1478

1479

1480
// Sessions are guaranteed to be granted the opportunity to send a message in
1481
// the order that they enlist. Note that this is important to ensure
1482
// nonoverlapping communication with the server for consecutive sessions
1483
// associated with the same Realm file.
1484
//
1485
// CAUTION: The specified session may get destroyed before this function
1486
// returns, but only if its Session::send_message() puts it into the Deactivated
1487
// state.
1488
void Connection::enlist_to_send(Session* sess)
1489
{
179,540✔
1490
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
179,540✔
1491
    m_sessions_enlisted_to_send.push_back(sess); // Throws
179,540✔
1492
    if (!m_sending)
179,540✔
1493
        send_next_message(); // Throws
77,036✔
1494
}
179,540✔
1495

1496

1497
std::string Connection::get_active_appservices_connection_id()
1498
{
76✔
1499
    return m_appservices_coid;
76✔
1500
}
76✔
1501

1502
void Session::cancel_resumption_delay()
1503
{
4,594✔
1504
    REALM_ASSERT_EX(m_state == Active, m_state);
4,594✔
1505

1506
    if (!m_suspended)
4,594✔
1507
        return;
4,418✔
1508

1509
    m_suspended = false;
176✔
1510

1511
    logger.debug("Resumed"); // Throws
176✔
1512

1513
    if (unbind_process_complete())
176✔
1514
        initiate_rebind(); // Throws
140✔
1515

1516
    try {
176✔
1517
        process_pending_flx_bootstrap(); // throws
176✔
1518
    }
176✔
1519
    catch (const IntegrationException& error) {
176✔
1520
        on_integration_failure(error);
×
1521
    }
×
1522
    catch (...) {
176✔
1523
        on_integration_failure(IntegrationException(exception_to_status()));
×
1524
    }
×
1525

1526
    m_conn.one_more_active_unsuspended_session(); // Throws
176✔
1527
    if (m_try_again_activation_timer) {
176✔
1528
        m_try_again_activation_timer.reset();
8✔
1529
    }
8✔
1530

1531
    on_resumed(); // Throws
176✔
1532
}
176✔
1533

1534

1535
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1536
                                                 std::vector<ProtocolErrorInfo>* out)
1537
{
22,708✔
1538
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
22,708✔
1539
        return;
22,652✔
1540
    }
22,652✔
1541

1542
#ifdef REALM_DEBUG
56✔
1543
    REALM_ASSERT_DEBUG(
56✔
1544
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
56✔
1545
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
56✔
1546
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
56✔
1547
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
56✔
1548
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
56✔
1549
                       }));
56✔
1550
#endif
56✔
1551

1552
    while (!m_pending_compensating_write_errors.empty() &&
112✔
1553
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
112✔
1554
               changesets.back().version) {
56✔
1555
        auto& cur_error = m_pending_compensating_write_errors.front();
56✔
1556
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
56✔
1557
        out->push_back(std::move(cur_error));
56✔
1558
        m_pending_compensating_write_errors.pop_front();
56✔
1559
    }
56✔
1560
}
56✔
1561

1562

1563
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1564
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1565
                                   DownloadBatchState download_batch_state)
1566
{
44,660✔
1567
    auto& history = get_history();
44,660✔
1568
    if (received_changesets.empty()) {
44,660✔
1569
        if (download_batch_state == DownloadBatchState::MoreToCome) {
21,928✔
1570
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1571
                                       "received empty download message that was not the last in batch",
×
1572
                                       ProtocolError::bad_progress);
×
1573
        }
×
1574
        history.set_sync_progress(progress, downloadable_bytes, version_info); // Throws
21,928✔
1575
        return;
21,928✔
1576
    }
21,928✔
1577

1578
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
22,732✔
1579
    auto transact = get_db()->start_read();
22,732✔
1580
    history.integrate_server_changesets(
22,732✔
1581
        progress, downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
22,732✔
1582
        [&](const Transaction&, util::Span<Changeset> changesets) {
22,732✔
1583
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
22,708✔
1584
        }); // Throws
22,708✔
1585
    if (received_changesets.size() == 1) {
22,732✔
1586
        logger.debug("1 remote changeset integrated, producing client version %1",
15,400✔
1587
                     version_info.sync_version.version); // Throws
15,400✔
1588
    }
15,400✔
1589
    else {
7,332✔
1590
        logger.debug("%2 remote changesets integrated, producing client version %1",
7,332✔
1591
                     version_info.sync_version.version, received_changesets.size()); // Throws
7,332✔
1592
    }
7,332✔
1593

1594
    for (const auto& pending_error : pending_compensating_write_errors) {
22,732✔
1595
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
56✔
1596
                    pending_error.compensating_write_rejected_client_version,
56✔
1597
                    *pending_error.compensating_write_server_version, pending_error.message);
56✔
1598
        try {
56✔
1599
            on_connection_state_changed(
56✔
1600
                m_conn.get_state(),
56✔
1601
                SessionErrorInfo{pending_error,
56✔
1602
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
56✔
1603
                                                          pending_error.message)});
56✔
1604
        }
56✔
1605
        catch (...) {
56✔
1606
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1607
        }
×
1608
    }
56✔
1609
}
22,732✔
1610

1611

1612
void Session::on_integration_failure(const IntegrationException& error)
1613
{
40✔
1614
    REALM_ASSERT_EX(m_state == Active, m_state);
40✔
1615
    REALM_ASSERT(!m_client_error && !m_error_to_send);
40✔
1616
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
40✔
1617

1618
    m_client_error = util::make_optional<IntegrationException>(error);
40✔
1619
    m_error_to_send = true;
40✔
1620
    SessionErrorInfo error_info{error.to_status(), IsFatal{false}};
40✔
1621
    error_info.server_requests_action = ProtocolErrorInfo::Action::Warning;
40✔
1622
    // Surface the error to the user otherwise is lost.
1623
    on_connection_state_changed(m_conn.get_state(), std::move(error_info));
40✔
1624

1625
    // Since the deactivation process has not been initiated, the UNBIND
1626
    // message cannot have been sent unless an ERROR message was received.
1627
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
40✔
1628
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
40✔
1629
        ensure_enlisted_to_send(); // Throws
36✔
1630
    }
36✔
1631
}
40✔
1632

1633
// Updates the session's progress bookkeeping after downloaded changesets have
// been integrated.
//
// Requires the session to be Active and the new download server version to
// be no older than the one already recorded. The download progress and the
// overall progress are replaced; the upload progress is replaced only when
// the reported client version is newer. Afterwards the new sync version is
// recognized, download completion is checked, migration subscriptions are
// created for FLX sessions that have a migration store, and the session is
// enlisted to send when it is in a state that allows it.
//
// `client_version` is the version passed to do_recognize_sync_version();
// `progress` is the sync progress to record.
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
{
    REALM_ASSERT_EX(m_state == Active, m_state);
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);

    m_download_progress = progress.download;
    m_progress = progress;

    if (progress.upload.client_version > m_upload_progress.client_version)
        m_upload_progress = progress.upload;

    do_recognize_sync_version(client_version); // Allows upload process to resume

    check_for_download_completion(); // Throws

    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
        auto& flx_subscription_store = *get_flx_subscription_store();
        // Use the store that was just null-checked rather than fetching it a
        // second time and dereferencing the unchecked result.
        migration_store->create_subscriptions(flx_subscription_store);
    }

    // Since the deactivation process has not been initiated, the UNBIND
    // message cannot have been sent unless an ERROR message was received.
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
        ensure_enlisted_to_send(); // Throws
    }
}
47,028✔
1661

1662

1663
// Destructor. It performs no explicit work; the life-cycle state assertion
// below is kept in disabled form.
Session::~Session()
{
    // Disabled: REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
}
10,422✔
1667

1668

1669
// Creates a session-category PrefixLogger that prepends "Session[<ident>]: "
// and wraps `base_logger`.
std::shared_ptr<util::Logger> Session::make_logger(session_ident_type ident,
                                                   std::shared_ptr<util::Logger> base_logger)
{
    return std::make_shared<util::PrefixLogger>(util::LogCategory::session, util::format("Session[%1]: ", ident),
                                                std::move(base_logger));
}
17,614✔
1676

1677
void Session::activate()
1678
{
10,422✔
1679
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
10,422✔
1680

1681
    logger.debug("Activating"); // Throws
10,422✔
1682

1683
    if (REALM_LIKELY(!get_client().is_dry_run())) {
10,422✔
1684
        bool file_exists = util::File::exists(get_realm_path());
10,422✔
1685

1686
        logger.info("client_reset_config = %1, Realm exists = %2, upload messages allowed = %3",
10,422✔
1687
                    get_client_reset_config().has_value(), file_exists, upload_messages_allowed() ? "yes" : "no");
10,422✔
1688
        get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
10,422✔
1689
    }
10,422✔
1690
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
10,422✔
1691
                 m_client_file_ident.salt); // Throws
10,422✔
1692
    m_upload_progress = m_progress.upload;
10,422✔
1693
    m_download_progress = m_progress.download;
10,422✔
1694
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
10,422✔
1695
    init_progress_handler();
10,422✔
1696

1697
    logger.debug("last_version_available = %1", m_last_version_available);                     // Throws
10,422✔
1698
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
10,422✔
1699
    logger.debug("progress_download_client_version = %1",
10,422✔
1700
                 m_progress.download.last_integrated_client_version);                                      // Throws
10,422✔
1701
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
10,422✔
1702
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
10,422✔
1703

1704
    reset_protocol_state();
10,422✔
1705
    m_state = Active;
10,422✔
1706

1707
    call_debug_hook(SyncClientHookEvent::SessionActivating);
10,422✔
1708

1709
    REALM_ASSERT(!m_suspended);
10,422✔
1710
    m_conn.one_more_active_unsuspended_session(); // Throws
10,422✔
1711

1712
    try {
10,422✔
1713
        process_pending_flx_bootstrap(); // throws
10,422✔
1714
    }
10,422✔
1715
    catch (const IntegrationException& error) {
10,422✔
1716
        on_integration_failure(error);
×
1717
    }
×
1718
    catch (...) {
10,422✔
1719
        on_integration_failure(IntegrationException(exception_to_status()));
4✔
1720
    }
4✔
1721

1722
    // Checks if there is a pending client reset
1723
    handle_pending_client_reset_acknowledgement();
10,422✔
1724
}
10,420✔
1725

1726

1727
// The caller (Connection) must discard the session if the session has become
1728
// deactivated upon return.
1729
void Session::initiate_deactivation()
1730
{
10,422✔
1731
    REALM_ASSERT_EX(m_state == Active, m_state);
10,422✔
1732

1733
    logger.debug("Initiating deactivation"); // Throws
10,422✔
1734

1735
    m_state = Deactivating;
10,422✔
1736

1737
    if (!m_suspended)
10,422✔
1738
        m_conn.one_less_active_unsuspended_session(); // Throws
9,762✔
1739

1740
    if (m_enlisted_to_send) {
10,422✔
1741
        REALM_ASSERT(!unbind_process_complete());
5,422✔
1742
        return;
5,422✔
1743
    }
5,422✔
1744

1745
    // Deactivate immediately if the BIND message has not yet been sent and the
1746
    // session is not enlisted to send, or if the unbinding process has already
1747
    // completed.
1748
    if (!m_bind_message_sent || unbind_process_complete()) {
5,000✔
1749
        complete_deactivation(); // Throws
944✔
1750
        // Life cycle state is now Deactivated
1751
        return;
944✔
1752
    }
944✔
1753

1754
    // Ready to send the UNBIND message, if it has not already been sent
1755
    if (!m_unbind_message_sent) {
4,056✔
1756
        enlist_to_send(); // Throws
3,850✔
1757
        return;
3,850✔
1758
    }
3,850✔
1759
}
4,056✔
1760

1761

1762
void Session::complete_deactivation()
1763
{
10,422✔
1764
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
10,422✔
1765
    m_state = Deactivated;
10,422✔
1766

1767
    logger.debug("Deactivation completed"); // Throws
10,422✔
1768
}
10,422✔
1769

1770

1771
// Called by the associated Connection object when this session is granted an
1772
// opportunity to send a message.
1773
//
1774
// The caller (Connection) must discard the session if the session has become
1775
// deactivated upon return.
1776
void Session::send_message()
1777
{
177,874✔
1778
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
177,874✔
1779
    REALM_ASSERT(m_enlisted_to_send);
177,874✔
1780
    m_enlisted_to_send = false;
177,874✔
1781
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
177,874✔
1782
        // Deactivation has been initiated. If the UNBIND message has not been
1783
        // sent yet, there is no point in sending it. Instead, we can let the
1784
        // deactivation process complete.
1785
        if (!m_bind_message_sent) {
9,712✔
1786
            return complete_deactivation(); // Throws
2,972✔
1787
            // Life cycle state is now Deactivated
1788
        }
2,972✔
1789

1790
        // Session life cycle state is Deactivating or the unbinding process has
1791
        // been initiated by a session specific ERROR message
1792
        if (!m_unbind_message_sent)
6,740✔
1793
            send_unbind_message(); // Throws
6,740✔
1794
        return;
6,740✔
1795
    }
9,712✔
1796

1797
    // Session life cycle state is Active and the unbinding process has
1798
    // not been initiated
1799
    REALM_ASSERT(!m_unbind_message_sent);
168,162✔
1800

1801
    if (!m_bind_message_sent)
168,162✔
1802
        return send_bind_message(); // Throws
9,366✔
1803

1804
    // Pending test commands can be sent any time after the BIND message is sent
1805
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
158,796✔
1806
                                                      [](const PendingTestCommand& command) {
158,796✔
1807
                                                          return command.pending;
154✔
1808
                                                      });
154✔
1809
    if (has_pending_test_command) {
158,796✔
1810
        return send_test_command_message();
64✔
1811
    }
64✔
1812

1813
    if (!m_ident_message_sent) {
158,732✔
1814
        if (have_client_file_ident())
7,988✔
1815
            send_ident_message(); // Throws
7,988✔
1816
        return;
7,988✔
1817
    }
7,988✔
1818

1819
    if (m_error_to_send)
150,744✔
1820
        return send_json_error_message(); // Throws
32✔
1821

1822
    // Stop sending upload, mark and query messages when the client detects an error.
1823
    if (m_client_error) {
150,712✔
1824
        return;
12✔
1825
    }
12✔
1826

1827
    if (m_target_download_mark > m_last_download_mark_sent)
150,700✔
1828
        return send_mark_message(); // Throws
17,906✔
1829

1830
    auto is_upload_allowed = [&]() -> bool {
132,802✔
1831
        if (!m_is_flx_sync_session) {
132,802✔
1832
            return true;
111,710✔
1833
        }
111,710✔
1834

1835
        auto migration_store = get_migration_store();
21,092✔
1836
        if (!migration_store) {
21,092✔
1837
            return true;
×
1838
        }
×
1839

1840
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
21,092✔
1841
        if (!sentinel_query_version) {
21,092✔
1842
            return true;
21,060✔
1843
        }
21,060✔
1844

1845
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
1846
        return m_last_sent_flx_query_version != *sentinel_query_version;
32✔
1847
    };
21,092✔
1848

1849
    if (!is_upload_allowed()) {
132,794✔
1850
        return;
16✔
1851
    }
16✔
1852

1853
    auto check_pending_flx_version = [&]() -> bool {
132,786✔
1854
        if (!m_is_flx_sync_session) {
132,786✔
1855
            return false;
111,710✔
1856
        }
111,710✔
1857

1858
        if (m_delay_uploads) {
21,076✔
1859
            return false;
2,936✔
1860
        }
2,936✔
1861

1862
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
18,140✔
1863

1864
        if (!m_pending_flx_sub_set) {
18,140✔
1865
            return false;
15,666✔
1866
        }
15,666✔
1867

1868
        // Send QUERY messages when the upload progress client version reaches the snapshot version
1869
        // of a pending subscription
1870
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
2,474✔
1871
    };
18,140✔
1872

1873
    if (check_pending_flx_version()) {
132,778✔
1874
        return send_query_change_message(); // throws
1,380✔
1875
    }
1,380✔
1876

1877
    if (!m_delay_uploads && (m_last_version_available > m_upload_progress.client_version)) {
131,398✔
1878
        return send_upload_message(); // Throws
62,374✔
1879
    }
62,374✔
1880
}
131,398✔
1881

1882

1883
void Session::send_bind_message()
1884
{
9,366✔
1885
    REALM_ASSERT_EX(m_state == Active, m_state);
9,366✔
1886

1887
    session_ident_type session_ident = m_ident;
9,366✔
1888
    // Request an ident if we don't already have one and there isn't a pending client reset diff
1889
    // The file ident can be 0 when a client reset is being performed if a brand new local realm
1890
    // has been opened (or using Async open) and a FLX/PBS migration occurs when first connecting
1891
    // to the server.
1892
    bool need_client_file_ident = !have_client_file_ident() && !get_client_reset_config();
9,366✔
1893
    const bool is_subserver = false;
9,366✔
1894

1895
    ClientProtocol& protocol = m_conn.get_client_protocol();
9,366✔
1896
    int protocol_version = m_conn.get_negotiated_protocol_version();
9,366✔
1897
    OutputBuffer& out = m_conn.get_output_buffer();
9,366✔
1898
    // Discard the token since it's ignored by the server.
1899
    std::string empty_access_token;
9,366✔
1900
    if (m_is_flx_sync_session) {
9,366✔
1901
        nlohmann::json bind_json_data;
1,900✔
1902
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,900✔
1903
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1904
        }
60✔
1905
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,900✔
1906
        auto schema_version = get_schema_version();
1,900✔
1907
        // Send 0 if schema is not versioned.
1908
        bind_json_data["schemaVersion"] = schema_version != uint64_t(-1) ? schema_version : 0;
1,900✔
1909
        if (logger.would_log(util::Logger::Level::debug)) {
1,900✔
1910
            std::string json_data_dump;
1,900✔
1911
            if (!bind_json_data.empty()) {
1,900✔
1912
                json_data_dump = bind_json_data.dump();
1,900✔
1913
            }
1,900✔
1914
            logger.debug(
1,900✔
1915
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,900✔
1916
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,900✔
1917
        }
1,900✔
1918
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,900✔
1919
                                       need_client_file_ident, is_subserver); // Throws
1,900✔
1920
    }
1,900✔
1921
    else {
7,466✔
1922
        std::string server_path = get_virt_path();
7,466✔
1923
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
7,466✔
1924
                     session_ident, need_client_file_ident, is_subserver, server_path);
7,466✔
1925
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
7,466✔
1926
                                       need_client_file_ident, is_subserver); // Throws
7,466✔
1927
    }
7,466✔
1928
    m_conn.initiate_write_message(out, this); // Throws
9,366✔
1929

1930
    m_bind_message_sent = true;
9,366✔
1931
    call_debug_hook(SyncClientHookEvent::BindMessageSent);
9,366✔
1932

1933
    // If there is a pending client reset diff, process that when the BIND message has
1934
    // been sent successfully and wait before sending the IDENT message. Otherwise,
1935
    // ready to send the IDENT message if the file identifier pair is already available.
1936
    if (!need_client_file_ident)
9,366✔
1937
        enlist_to_send(); // Throws
5,538✔
1938
}
9,366✔
1939

1940

1941
void Session::send_ident_message()
1942
{
7,988✔
1943
    REALM_ASSERT_EX(m_state == Active, m_state);
7,988✔
1944
    REALM_ASSERT(m_bind_message_sent);
7,988✔
1945
    REALM_ASSERT(!m_unbind_message_sent);
7,988✔
1946
    REALM_ASSERT(have_client_file_ident());
7,988✔
1947

1948
    ClientProtocol& protocol = m_conn.get_client_protocol();
7,988✔
1949
    OutputBuffer& out = m_conn.get_output_buffer();
7,988✔
1950
    session_ident_type session_ident = m_ident;
7,988✔
1951

1952
    if (m_is_flx_sync_session) {
7,988✔
1953
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,804✔
1954
        const auto active_query_body = active_query_set.to_ext_json();
1,804✔
1955
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,804✔
1956
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,804✔
1957
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,804✔
1958
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,804✔
1959
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,804✔
1960
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,804✔
1961
                     active_query_body); // Throws
1,804✔
1962
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,804✔
1963
                                        active_query_set.version(), active_query_body); // Throws
1,804✔
1964
        m_last_sent_flx_query_version = active_query_set.version();
1,804✔
1965
    }
1,804✔
1966
    else {
6,184✔
1967
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
6,184✔
1968
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
6,184✔
1969
                     "latest_server_version_salt=%6)",
6,184✔
1970
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
6,184✔
1971
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
6,184✔
1972
                     m_progress.latest_server_version.salt);                                  // Throws
6,184✔
1973
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
6,184✔
1974
    }
6,184✔
1975
    m_conn.initiate_write_message(out, this); // Throws
7,988✔
1976

1977
    m_ident_message_sent = true;
7,988✔
1978
    call_debug_hook(SyncClientHookEvent::IdentMessageSent);
7,988✔
1979

1980
    // Other messages may be waiting to be sent
1981
    enlist_to_send(); // Throws
7,988✔
1982
}
7,988✔
1983

1984
void Session::send_query_change_message()
1985
{
1,380✔
1986
    REALM_ASSERT_EX(m_state == Active, m_state);
1,380✔
1987
    REALM_ASSERT(m_ident_message_sent);
1,380✔
1988
    REALM_ASSERT(!m_unbind_message_sent);
1,380✔
1989
    REALM_ASSERT(m_pending_flx_sub_set);
1,380✔
1990
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
1,380✔
1991

1992
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
1,380✔
1993
        return;
×
1994
    }
×
1995

1996
    auto sub_store = get_flx_subscription_store();
1,380✔
1997
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
1,380✔
1998
    auto latest_queries = latest_sub_set.to_ext_json();
1,380✔
1999
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
1,380✔
2000
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
1,380✔
2001

2002
    OutputBuffer& out = m_conn.get_output_buffer();
1,380✔
2003
    session_ident_type session_ident = get_ident();
1,380✔
2004
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,380✔
2005
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
1,380✔
2006
    m_conn.initiate_write_message(out, this);
1,380✔
2007

2008
    m_last_sent_flx_query_version = latest_sub_set.version();
1,380✔
2009

2010
    request_download_completion_notification();
1,380✔
2011
}
1,380✔
2012

2013
void Session::send_upload_message()
2014
{
62,374✔
2015
    REALM_ASSERT_EX(m_state == Active, m_state);
62,374✔
2016
    REALM_ASSERT(m_ident_message_sent);
62,374✔
2017
    REALM_ASSERT(!m_unbind_message_sent);
62,374✔
2018

2019
    if (REALM_UNLIKELY(get_client().is_dry_run()))
62,374✔
2020
        return;
×
2021

2022
    version_type target_upload_version = m_last_version_available;
62,374✔
2023
    if (m_pending_flx_sub_set) {
62,374✔
2024
        REALM_ASSERT(m_is_flx_sync_session);
1,094✔
2025
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
1,094✔
2026
    }
1,094✔
2027

2028
    bool server_version_to_ack =
62,374✔
2029
        m_upload_progress.last_integrated_server_version < m_download_progress.server_version;
62,374✔
2030

2031
    std::vector<UploadChangeset> uploadable_changesets;
62,374✔
2032
    version_type locked_server_version = 0;
62,374✔
2033
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
62,374✔
2034
                                             locked_server_version); // Throws
62,374✔
2035

2036
    if (uploadable_changesets.empty()) {
62,374✔
2037
        // Nothing more to upload right now if:
2038
        //  1. We need to limit upload up to some version other than the last client version
2039
        //     available and there are no changes to upload
2040
        //  2. There are no changes to upload and no server version(s) to acknowledge
2041
        if (m_pending_flx_sub_set || !server_version_to_ack) {
32,040✔
2042
            logger.trace("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
5,608✔
2043
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
5,608✔
2044
            // Other messages may be waiting to be sent
2045
            return enlist_to_send(); // Throws
5,608✔
2046
        }
5,608✔
2047
    }
32,040✔
2048

2049
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
56,766✔
2050
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
748✔
2051
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
748✔
2052
    }
748✔
2053

2054
    version_type progress_client_version = m_upload_progress.client_version;
56,766✔
2055
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
56,766✔
2056

2057
    if (!upload_messages_allowed()) {
56,766✔
2058
        logger.trace("UPLOAD not allowed (progress_client_version=%1, progress_server_version=%2, "
604✔
2059
                     "locked_server_version=%3, num_changesets=%4)",
604✔
2060
                     progress_client_version, progress_server_version, locked_server_version,
604✔
2061
                     uploadable_changesets.size()); // Throws
604✔
2062
        // Other messages may be waiting to be sent
2063
        return enlist_to_send(); // Throws
604✔
2064
    }
604✔
2065

2066
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
56,162✔
2067
                 "locked_server_version=%3, num_changesets=%4)",
56,162✔
2068
                 progress_client_version, progress_server_version, locked_server_version,
56,162✔
2069
                 uploadable_changesets.size()); // Throws
56,162✔
2070

2071
    ClientProtocol& protocol = m_conn.get_client_protocol();
56,162✔
2072
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
56,162✔
2073

2074
    for (const UploadChangeset& uc : uploadable_changesets) {
56,162✔
2075
        logger.debug(util::LogCategory::changeset,
43,602✔
2076
                     "Fetching changeset for upload (client_version=%1, server_version=%2, "
43,602✔
2077
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
43,602✔
2078
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
43,602✔
2079
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
43,602✔
2080
        if (logger.would_log(util::Logger::Level::trace)) {
43,602✔
2081
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2082
            if (changeset_data.size() < 1024) {
×
2083
                logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
2084
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2085
            }
×
2086
            else {
×
2087
                logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
2088
                             protocol.compressed_hex_dump(changeset_data));
×
2089
            }
×
2090

2091
#if REALM_DEBUG
×
2092
            ChunkedBinaryInputStream in{changeset_data};
×
2093
            Changeset log;
×
2094
            try {
×
2095
                parse_changeset(in, log);
×
2096
                std::stringstream ss;
×
2097
                log.print(ss);
×
2098
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
2099
            }
×
2100
            catch (const BadChangesetError& err) {
×
2101
                logger.error(util::LogCategory::changeset, "Unable to parse changeset: %1", err.what());
×
2102
            }
×
2103
#endif
×
2104
        }
×
2105

2106
        {
43,602✔
2107
            upload_message_builder.add_changeset(uc.progress.client_version,
43,602✔
2108
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
43,602✔
2109
                                                 uc.origin_file_ident,
43,602✔
2110
                                                 uc.changeset); // Throws
43,602✔
2111
        }
43,602✔
2112
    }
43,602✔
2113

2114
    int protocol_version = m_conn.get_negotiated_protocol_version();
56,162✔
2115
    OutputBuffer& out = m_conn.get_output_buffer();
56,162✔
2116
    session_ident_type session_ident = get_ident();
56,162✔
2117
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
56,162✔
2118
                                               progress_server_version,
56,162✔
2119
                                               locked_server_version); // Throws
56,162✔
2120
    m_conn.initiate_write_message(out, this);                          // Throws
56,162✔
2121

2122
    call_debug_hook(SyncClientHookEvent::UploadMessageSent);
56,162✔
2123

2124
    // Other messages may be waiting to be sent
2125
    enlist_to_send(); // Throws
56,162✔
2126
}
56,162✔
2127

2128

2129
void Session::send_mark_message()
2130
{
17,906✔
2131
    REALM_ASSERT_EX(m_state == Active, m_state);
17,906✔
2132
    REALM_ASSERT(m_ident_message_sent);
17,906✔
2133
    REALM_ASSERT(!m_unbind_message_sent);
17,906✔
2134
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
17,906✔
2135

2136
    request_ident_type request_ident = m_target_download_mark;
17,906✔
2137
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
17,906✔
2138

2139
    ClientProtocol& protocol = m_conn.get_client_protocol();
17,906✔
2140
    OutputBuffer& out = m_conn.get_output_buffer();
17,906✔
2141
    session_ident_type session_ident = get_ident();
17,906✔
2142
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
17,906✔
2143
    m_conn.initiate_write_message(out, this);                      // Throws
17,906✔
2144

2145
    m_last_download_mark_sent = request_ident;
17,906✔
2146

2147
    // Other messages may be waiting to be sent
2148
    enlist_to_send(); // Throws
17,906✔
2149
}
17,906✔
2150

2151

2152
void Session::send_unbind_message()
2153
{
6,740✔
2154
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,740✔
2155
    REALM_ASSERT(m_bind_message_sent);
6,740✔
2156
    REALM_ASSERT(!m_unbind_message_sent);
6,740✔
2157

2158
    logger.debug("Sending: UNBIND"); // Throws
6,740✔
2159

2160
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,740✔
2161
    OutputBuffer& out = m_conn.get_output_buffer();
6,740✔
2162
    session_ident_type session_ident = get_ident();
6,740✔
2163
    protocol.make_unbind_message(out, session_ident); // Throws
6,740✔
2164
    m_conn.initiate_write_message(out, this);         // Throws
6,740✔
2165

2166
    m_unbind_message_sent = true;
6,740✔
2167
}
6,740✔
2168

2169

2170
void Session::send_json_error_message()
2171
{
32✔
2172
    REALM_ASSERT_EX(m_state == Active, m_state);
32✔
2173
    REALM_ASSERT(m_ident_message_sent);
32✔
2174
    REALM_ASSERT(!m_unbind_message_sent);
32✔
2175
    REALM_ASSERT(m_error_to_send);
32✔
2176
    REALM_ASSERT(m_client_error);
32✔
2177

2178
    ClientProtocol& protocol = m_conn.get_client_protocol();
32✔
2179
    OutputBuffer& out = m_conn.get_output_buffer();
32✔
2180
    session_ident_type session_ident = get_ident();
32✔
2181
    auto protocol_error = m_client_error->error_for_server;
32✔
2182

2183
    auto message = util::format("%1", m_client_error->to_status());
32✔
2184
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
32✔
2185
                session_ident); // Throws
32✔
2186

2187
    nlohmann::json error_body_json;
32✔
2188
    error_body_json["message"] = std::move(message);
32✔
2189
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
32✔
2190
                                     error_body_json.dump()); // Throws
32✔
2191
    m_conn.initiate_write_message(out, this);                 // Throws
32✔
2192

2193
    m_error_to_send = false;
32✔
2194
    enlist_to_send(); // Throws
32✔
2195
}
32✔
2196

2197

2198
void Session::send_test_command_message()
2199
{
64✔
2200
    REALM_ASSERT_EX(m_state == Active, m_state);
64✔
2201

2202
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
64✔
2203
                           [](const PendingTestCommand& command) {
68✔
2204
                               return command.pending;
68✔
2205
                           });
68✔
2206
    REALM_ASSERT(it != m_pending_test_commands.end());
64✔
2207

2208
    ClientProtocol& protocol = m_conn.get_client_protocol();
64✔
2209
    OutputBuffer& out = m_conn.get_output_buffer();
64✔
2210
    auto session_ident = get_ident();
64✔
2211

2212
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
64✔
2213
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
64✔
2214

2215
    m_conn.initiate_write_message(out, this); // Throws;
64✔
2216
    it->pending = false;
64✔
2217

2218
    enlist_to_send();
64✔
2219
}
64✔
2220

2221
bool Session::client_reset_if_needed()
2222
{
424✔
2223
    // Even if we end up not actually performing a client reset, consume the
2224
    // config to ensure that the resources it holds are released
2225
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
424✔
2226
    if (!client_reset_config) {
424✔
2227
        return false;
×
2228
    }
×
2229

2230
    // Save a copy of the status and action in case an error/exception occurs
2231
    Status cr_status = client_reset_config->error;
424✔
2232
    ProtocolErrorInfo::Action cr_action = client_reset_config->action;
424✔
2233

2234
    try {
424✔
2235
        // The file ident from the fresh realm will be copied over to the local realm
2236
        bool did_reset = client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config),
424✔
2237
                                                            get_flx_subscription_store());
424✔
2238

2239
        call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
424✔
2240
        if (!did_reset) {
424✔
2241
            return false;
×
2242
        }
×
2243
    }
424✔
2244
    catch (const std::exception& e) {
424✔
2245
        auto err_msg = util::format("A fatal error occurred during '%1' client reset diff for %2: '%3'", cr_action,
80✔
2246
                                    cr_status, e.what());
80✔
2247
        logger.error(err_msg.c_str());
80✔
2248
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
80✔
2249
        suspend(err_info);
80✔
2250
        return false;
80✔
2251
    }
80✔
2252

2253
    // The fresh Realm has been used to reset the state
2254
    logger.debug("Client reset is completed, path = %1", get_realm_path()); // Throws
344✔
2255

2256
    // Update the version, file ident and progress info after the client reset diff is done
2257
    get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
344✔
2258
    // Print the version/progress information before performing the asserts
2259
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
344✔
2260
                 m_client_file_ident.salt);                                // Throws
344✔
2261
    logger.debug("last_version_available = %1", m_last_version_available); // Throws
344✔
2262
    logger.debug("upload_progress_client_version = %1, upload_progress_server_version = %2",
344✔
2263
                 m_progress.upload.client_version,
344✔
2264
                 m_progress.upload.last_integrated_server_version); // Throws
344✔
2265
    logger.debug("download_progress_client_version = %1, download_progress_server_version = %2",
344✔
2266
                 m_progress.download.last_integrated_client_version,
344✔
2267
                 m_progress.download.server_version); // Throws
344✔
2268

2269
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
344✔
2270
                    m_progress.download.last_integrated_client_version);
344✔
2271
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
344✔
2272

2273
    m_upload_progress = m_progress.upload;
344✔
2274
    m_download_progress = m_progress.download;
344✔
2275
    init_progress_handler();
344✔
2276
    // In recovery mode, there may be new changesets to upload and nothing left to download.
2277
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
2278
    // For both, we want to allow uploads again without needing external changes to download first.
2279
    m_delay_uploads = false;
344✔
2280

2281
    // Checks if there is a pending client reset
2282
    handle_pending_client_reset_acknowledgement();
344✔
2283

2284
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
2285
    if (auto migration_store = get_migration_store()) {
344✔
2286
        migration_store->complete_migration_or_rollback();
316✔
2287
    }
316✔
2288

2289
    return true;
344✔
2290
}
424✔
2291

2292
// Handles an IDENT message from the server.
//
// Returns OK without doing anything when the session is not Active. Returns
// SyncProtocolInvariantFailed when the message arrives at an illegal time or
// carries an invalid identifier or salt. Otherwise stores the identifier,
// persists it to the history (skipped in dry-run mode), and makes sure the
// session is enlisted to send.
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
{
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
                 client_file_ident.salt); // Throws

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active) {
        return Status::OK(); // Success
    }

    // IDENT is only legal after BIND was sent, while no file ident is known
    // yet, and before any ERROR or UNBOUND message has been received.
    const bool illegal_at_this_time = (!m_bind_message_sent || have_client_file_ident() ||
                                       m_error_message_received || m_unbound_message_received);
    if (REALM_UNLIKELY(illegal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
    }
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
    }
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
    }

    m_client_file_ident = client_file_ident;

    // In dry-run mode the history and the progress cursors are left untouched.
    const bool dry_run = get_client().is_dry_run();
    if (!REALM_UNLIKELY(dry_run)) {
        get_history().set_client_file_ident(client_file_ident,
                                            m_fix_up_object_ids); // Throws
        m_progress.download.last_integrated_client_version = 0;
        m_progress.upload.client_version = 0;
    }

    // Ready to send the IDENT message
    ensure_enlisted_to_send(); // Throws
    return Status::OK();       // Success
}
3,558✔
2332

2333
Status Session::receive_download_message(const DownloadMessage& message)
2334
{
49,236✔
2335
    // Ignore the message if the deactivation process has been initiated,
2336
    // because in that case, the associated Realm and SessionWrapper must
2337
    // not be accessed any longer.
2338
    if (m_state != Active)
49,236✔
2339
        return Status::OK();
592✔
2340

2341
    bool is_flx = m_conn.is_flx_sync_connection();
48,644✔
2342
    int64_t query_version = is_flx ? *message.query_version : 0;
48,644✔
2343

2344
    if (!is_flx || query_version > 0)
48,644✔
2345
        enable_progress_notifications();
46,680✔
2346

2347
    auto&& progress = message.progress;
48,644✔
2348
    if (is_flx) {
48,644✔
2349
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
5,530✔
2350
                     "latest_server_version=%3, latest_server_version_salt=%4, "
5,530✔
2351
                     "upload_client_version=%5, upload_server_version=%6, progress_estimate=%7, "
5,530✔
2352
                     "batch_state=%8, query_version=%9, num_changesets=%10, ...)",
5,530✔
2353
                     progress.download.server_version, progress.download.last_integrated_client_version,
5,530✔
2354
                     progress.latest_server_version.version, progress.latest_server_version.salt,
5,530✔
2355
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
5,530✔
2356
                     message.downloadable.as_estimate(), message.batch_state, query_version,
5,530✔
2357
                     message.changesets.size()); // Throws
5,530✔
2358
    }
5,530✔
2359
    else {
43,114✔
2360
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
43,114✔
2361
                     "latest_server_version=%3, latest_server_version_salt=%4, "
43,114✔
2362
                     "upload_client_version=%5, upload_server_version=%6, "
43,114✔
2363
                     "downloadable_bytes=%7, num_changesets=%8, ...)",
43,114✔
2364
                     progress.download.server_version, progress.download.last_integrated_client_version,
43,114✔
2365
                     progress.latest_server_version.version, progress.latest_server_version.salt,
43,114✔
2366
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
43,114✔
2367
                     message.downloadable.as_bytes(), message.changesets.size()); // Throws
43,114✔
2368
    }
43,114✔
2369

2370
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
2371
    // changeset over and over again.
2372
    if (m_client_error) {
48,644✔
2373
        logger.debug("Ignoring download message because the client detected an integration error");
×
2374
        return Status::OK();
×
2375
    }
×
2376

2377
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
48,644✔
2378
    if (REALM_UNLIKELY(!legal_at_this_time)) {
48,644✔
2379
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
4✔
2380
    }
4✔
2381
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
48,640✔
2382
        logger.error("Bad sync progress received (%1)", status);
×
2383
        return status;
×
2384
    }
×
2385

2386
    version_type server_version = m_progress.download.server_version;
48,640✔
2387
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
48,640✔
2388
    for (const RemoteChangeset& changeset : message.changesets) {
50,286✔
2389
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
2390
        // version must be increasing, but can stay the same during bootstraps.
2391
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
47,526✔
2392
                                                         : (changeset.remote_version > server_version);
47,526✔
2393
        // Each server version cannot be greater than the one in the header of the download message.
2394
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
47,526✔
2395
        if (!good_server_version) {
47,526✔
2396
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2397
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2398
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2399
        }
×
2400
        server_version = changeset.remote_version;
47,526✔
2401

2402
        // Check that per-changeset last integrated client version is "weakly"
2403
        // increasing.
2404
        bool good_client_version =
47,526✔
2405
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
47,526✔
2406
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
47,526✔
2407
        if (!good_client_version) {
47,526✔
2408
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2409
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2410
                                 "(%1, %2, %3)",
×
2411
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2412
                                 progress.download.last_integrated_client_version)};
×
2413
        }
×
2414
        last_integrated_client_version = changeset.last_integrated_local_version;
47,526✔
2415
        // Server shouldn't send our own changes, and zero is not a valid client
2416
        // file identifier.
2417
        bool good_file_ident =
47,526✔
2418
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
47,526✔
2419
        if (!good_file_ident) {
47,526✔
2420
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2421
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2422
                                 changeset.origin_file_ident)};
×
2423
        }
×
2424
    }
47,526✔
2425

2426
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
48,640✔
2427
                                       message.batch_state, message.changesets.size());
48,640✔
2428
    if (hook_action == SyncClientHookAction::EarlyReturn) {
48,640✔
2429
        return Status::OK();
26✔
2430
    }
26✔
2431
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
48,614✔
2432

2433
    if (process_flx_bootstrap_message(message)) {
48,614✔
2434
        clear_resumption_delay_state();
3,954✔
2435
        return Status::OK();
3,954✔
2436
    }
3,954✔
2437

2438
    initiate_integrate_changesets(message.downloadable.as_bytes(), message.batch_state, progress,
44,660✔
2439
                                  message.changesets); // Throws
44,660✔
2440

2441
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
44,660✔
2442
                                  message.batch_state, message.changesets.size());
44,660✔
2443
    if (hook_action == SyncClientHookAction::EarlyReturn) {
44,660✔
2444
        return Status::OK();
×
2445
    }
×
2446
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
44,660✔
2447

2448
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
2449
    // after a retryable session error.
2450
    clear_resumption_delay_state();
44,660✔
2451
    return Status::OK();
44,660✔
2452
}
44,660✔
2453

2454
// Handles a MARK message from the server.
//
// Returns OK without processing when the session is not Active. Returns
// SyncProtocolInvariantFailed when the message arrives at an illegal time or
// when `request_ident` is not in the range
// (m_last_download_mark_received, m_last_download_mark_sent]. Otherwise
// records the mark and the current download server version, then checks for
// download completion.
Status Session::receive_mark_message(request_ident_type request_ident)
{
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active)
        return Status::OK(); // Success

    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
    if (REALM_UNLIKELY(!legal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
    }
    bool good_request_ident =
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
    if (REALM_UNLIKELY(!good_request_ident)) {
        // Include the rejected identifier itself; without it the message
        // cannot show which bound was violated.
        return {ErrorCodes::SyncProtocolInvariantFailed,
                util::format("Received MARK message with invalid request identifier %1 (last mark sent: %2 "
                             "last mark received: %3)",
                             request_ident, m_last_download_mark_sent, m_last_download_mark_received)};
    }

    m_server_version_at_last_download_mark = m_progress.download.server_version;
    m_last_download_mark_received = request_ident;
    check_for_download_completion(); // Throws

    return Status::OK(); // Success
}
16,962✔
2484

2485

2486
// The caller (Connection) must discard the session if the session has become
2487
// deactivated upon return.
2488
Status Session::receive_unbound_message()
2489
{
4,002✔
2490
    logger.debug("Received: UNBOUND");
4,002✔
2491

2492
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
4,002✔
2493
    if (REALM_UNLIKELY(!legal_at_this_time)) {
4,002✔
2494
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2495
    }
×
2496

2497
    // The fact that the UNBIND message has been sent, but an ERROR message has
2498
    // not been received, implies that the deactivation process must have been
2499
    // initiated, so this session must be in the Deactivating state or the session
2500
    // has been suspended because of a client side error.
2501
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
4,002!
2502

2503
    m_unbound_message_received = true;
4,002✔
2504

2505
    // Detect completion of the unbinding process
2506
    if (m_unbind_message_send_complete && m_state == Deactivating) {
4,002✔
2507
        // The deactivation process completes when the unbinding process
2508
        // completes.
2509
        complete_deactivation(); // Throws
4,002✔
2510
        // Life cycle state is now Deactivated
2511
    }
4,002✔
2512

2513
    return Status::OK(); // Success
4,002✔
2514
}
4,002✔
2515

2516

2517
void Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
2518
{
20✔
2519
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);
20✔
2520
    on_flx_sync_error(query_version, message); // throws
20✔
2521
}
20✔
2522

2523
// The caller (Connection) must discard the session if the session has become
2524
// deactivated upon return.
2525
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2526
{
908✔
2527
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
908✔
2528
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
908✔
2529

2530
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
908✔
2531
    if (REALM_UNLIKELY(!legal_at_this_time)) {
908✔
2532
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2533
    }
×
2534

2535
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
908✔
2536
    auto status = protocol_error_to_status(protocol_error, info.message);
908✔
2537
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
908✔
2538
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2539
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2540
                             info.raw_error_code)};
×
2541
    }
×
2542

2543
    // Can't process debug hook actions once the Session is undergoing deactivation, since
2544
    // the SessionWrapper may not be available
2545
    if (m_state == Active) {
908✔
2546
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, &info);
898✔
2547
        if (debug_action == SyncClientHookAction::EarlyReturn) {
898✔
2548
            return Status::OK();
12✔
2549
        }
12✔
2550
    }
898✔
2551

2552
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
2553
    // containing the compensating write has appeared in a download message.
2554
    if (status == ErrorCodes::SyncCompensatingWrite) {
896✔
2555
        // If the client is not active, the compensating writes will not be processed now, but will be
2556
        // sent again the next time the client connects
2557
        if (m_state == Active) {
60✔
2558
            REALM_ASSERT(info.compensating_write_server_version.has_value());
60✔
2559
            m_pending_compensating_write_errors.push_back(info);
60✔
2560
        }
60✔
2561
        return Status::OK();
60✔
2562
    }
60✔
2563

2564
    if (protocol_error == ProtocolError::schema_version_changed) {
836✔
2565
        // Enable upload immediately if the session is still active.
2566
        if (m_state == Active) {
70✔
2567
            auto wt = get_db()->start_write();
70✔
2568
            _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
70✔
2569
            wt->commit();
70✔
2570
            // Notify SyncSession a schema migration is required.
2571
            on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
70✔
2572
        }
70✔
2573
        // Keep the session active to upload any unsynced changes.
2574
        return Status::OK();
70✔
2575
    }
70✔
2576

2577
    m_error_message_received = true;
766✔
2578
    suspend(SessionErrorInfo{info, std::move(status)});
766✔
2579
    return Status::OK();
766✔
2580
}
836✔
2581

2582
void Session::suspend(const SessionErrorInfo& info)
2583
{
846✔
2584
    REALM_ASSERT(!m_suspended);
846✔
2585
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
846✔
2586
    logger.debug("Suspended"); // Throws
846✔
2587

2588
    m_suspended = true;
846✔
2589

2590
    // Detect completion of the unbinding process
2591
    if (m_unbind_message_send_complete && m_error_message_received) {
846✔
2592
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2593
        // we received an ERROR message implies that the deactivation process must
2594
        // have been initiated, so this session must be in the Deactivating state.
2595
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
10✔
2596

2597
        // The deactivation process completes when the unbinding process
2598
        // completes.
2599
        complete_deactivation(); // Throws
10✔
2600
        // Life cycle state is now Deactivated
2601
    }
10✔
2602

2603
    // Notify the application of the suspension of the session if the session is
2604
    // still in the Active state
2605
    if (m_state == Active) {
846✔
2606
        call_debug_hook(SyncClientHookEvent::SessionSuspended, &info);
836✔
2607
        m_conn.one_less_active_unsuspended_session(); // Throws
836✔
2608
        on_suspended(info);                           // Throws
836✔
2609
    }
836✔
2610

2611
    if (!info.is_fatal) {
846✔
2612
        begin_resumption_delay(info);
182✔
2613
    }
182✔
2614

2615
    // Ready to send the UNBIND message, if it has not been sent already
2616
    if (!m_unbind_message_sent)
846✔
2617
        ensure_enlisted_to_send(); // Throws
836✔
2618
}
846✔
2619

2620
/// Handle the response to a previously issued test command.
///
/// Looks up the pending test command whose id equals `ident`, fulfills its promise with a copy
/// of `body`, and removes it from the list of pending commands.
///
/// Returns Status::OK() on success, or a SyncProtocolInvariantFailed status if no pending
/// test command has the given `ident`.
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
{
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);

    // Scan the pending commands for the one this response belongs to.
    const auto pending_end = m_pending_test_commands.end();
    auto match = m_pending_test_commands.begin();
    while (match != pending_end && !(match->id == ident)) {
        ++match;
    }

    if (match != pending_end) {
        // Deliver the response body to whoever is waiting, then forget the command.
        match->promise.emplace_value(std::string{body});
        m_pending_test_commands.erase(match);
        return Status::OK();
    }

    return {ErrorCodes::SyncProtocolInvariantFailed,
            util::format("Received test command response for a non-existent ident %1", ident)};
}
64✔
2637

2638
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2639
{
182✔
2640
    REALM_ASSERT(!m_try_again_activation_timer);
182✔
2641

2642
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
182✔
2643
                                  error_info.resumption_delay_interval);
182✔
2644
    auto try_again_interval = m_try_again_delay_info.delay_interval();
182✔
2645
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
182✔
2646
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
2647
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
2648
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
2649
        try_again_interval = std::chrono::milliseconds{1000};
144✔
2650
    }
144✔
2651
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
182✔
2652
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
182✔
2653
        if (status == ErrorCodes::OperationAborted)
182✔
2654
            return;
26✔
2655
        else if (!status.is_ok())
156✔
2656
            throw Exception(status);
×
2657

2658
        m_try_again_activation_timer.reset();
156✔
2659
        cancel_resumption_delay();
156✔
2660
    });
156✔
2661
}
182✔
2662

2663
void Session::clear_resumption_delay_state()
2664
{
48,614✔
2665
    if (m_try_again_activation_timer) {
48,614✔
2666
        logger.debug("Clearing resumption delay state after successful download");
×
2667
        m_try_again_delay_info.reset();
×
2668
    }
×
2669
}
48,614✔
2670

2671
/// Validate the sync progress received in a download message against the progress currently
/// recorded for this session (`m_progress`) and against `m_last_version_available`.
///
/// Every invariant is evaluated; when more than one is violated, the message of the last
/// violated invariant (in the order below) is the one reported.
///
/// Returns Status::OK() if all invariants hold, otherwise a SyncProtocolInvariantFailed status
/// carrying a description of the violation.
///
/// NOTE(review): this function is declared noexcept but builds a std::string message, which
/// can allocate - confirm that terminating on allocation failure is acceptable here.
Status Session::check_received_sync_progress(const SyncProgress& progress) noexcept
{
    const SyncProgress& current = m_progress;
    const SyncProgress& received = progress;

    std::string message;
    // Records a violation, replacing any previously recorded one.
    const auto violation = [&message](const char* fmt, const auto& first, const auto& second) {
        message = util::format(fmt, first, second);
    };

    if (received.latest_server_version.version < current.latest_server_version.version) {
        violation("Latest server version in download messages must be weakly increasing throughout a "
                  "session (current: %1, received: %2)",
                  current.latest_server_version.version, received.latest_server_version.version);
    }
    if (received.upload.client_version < current.upload.client_version) {
        violation("Last integrated client version in download messages must be weakly increasing "
                  "throughout a session (current: %1, received: %2)",
                  current.upload.client_version, received.upload.client_version);
    }
    if (received.upload.client_version > m_last_version_available) {
        violation("Last integrated client version on server cannot be greater than the latest client "
                  "version in existence (current: %1, received: %2)",
                  m_last_version_available, received.upload.client_version);
    }
    if (received.download.server_version < current.download.server_version) {
        violation("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
                  current.download.server_version, received.download.server_version);
    }
    if (received.download.server_version > received.latest_server_version.version) {
        violation(
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
            received.download.server_version, received.latest_server_version.version);
    }
    if (received.download.last_integrated_client_version < current.download.last_integrated_client_version) {
        violation(
            "Last integrated client version on the server at the position in the server's history of the download "
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
            current.download.last_integrated_client_version, received.download.last_integrated_client_version);
    }
    if (received.download.last_integrated_client_version > received.upload.client_version) {
        violation("Last integrated client version on the server in the position at the server's history "
                  "of the download cursor cannot be greater than the latest client version integrated "
                  "on the server (download: %1, upload: %2)",
                  received.download.last_integrated_client_version, received.upload.client_version);
    }
    if (received.download.server_version < received.upload.last_integrated_server_version) {
        violation(
            "The server version of the download cursor cannot be less than the server version integrated in the "
            "latest client version acknowledged by the server (download: %1, upload: %2)",
            received.download.server_version, received.upload.last_integrated_server_version);
    }

    if (!message.empty()) {
        return {ErrorCodes::SyncProtocolInvariantFailed, std::move(message)};
    }
    return Status::OK();
}
48,640✔
2725

2726

2727
void Session::check_for_download_completion()
2728
{
63,986✔
2729
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
63,986✔
2730
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
63,986✔
2731
    if (m_last_download_mark_received == m_last_triggering_download_mark)
63,986✔
2732
        return;
46,790✔
2733
    if (m_last_download_mark_received < m_target_download_mark)
17,196✔
2734
        return;
360✔
2735
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
16,836✔
2736
        return;
×
2737
    m_last_triggering_download_mark = m_target_download_mark;
16,836✔
2738
    if (REALM_UNLIKELY(m_delay_uploads)) {
16,836✔
2739
        // Activate the upload process now, and enable immediate reactivation
2740
        // after a subsequent fast reconnect.
2741
        m_delay_uploads = false;
4,762✔
2742
        ensure_enlisted_to_send(); // Throws
4,762✔
2743
    }
4,762✔
2744
    on_download_completion(); // Throws
16,836✔
2745
}
16,836✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc