• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_478

02 Aug 2024 05:19PM UTC coverage: 91.089% (-0.01%) from 91.1%
thomas.goyne_478

Pull #7944

Evergreen

tgoyne
Only track pending client resets done by the same core version

If the previous attempt at performing a client reset was done with a different
core version, then we should retry the client reset, as the new version may have
fixed a bug that made the previous attempt fail (or may be a downgrade to a
version from before the bug was introduced). This also simplifies the tracking,
as it means that we don't need to be able to read trackers created by different
versions.

This also means that we can freely change the schema of the table, which this
takes advantage of to drop the unused primary key and make the error required,
as we never actually stored null and the code reading it would have crashed if
it encountered a null error.
Pull Request #7944: Only track pending client resets done by the same core version

102704 of 181534 branches covered (56.58%)

138 of 153 new or added lines in 10 files covered. (90.2%)

85 existing lines in 16 files now uncovered.

216717 of 237917 relevant lines covered (91.09%)

5947762.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.34
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/client_reset_operation.hpp>
10
#include <realm/sync/noinst/sync_schema_migration.hpp>
11
#include <realm/sync/protocol.hpp>
12
#include <realm/util/assert.hpp>
13
#include <realm/util/basic_system_errors.hpp>
14
#include <realm/util/memory_stream.hpp>
15
#include <realm/util/platform_info.hpp>
16
#include <realm/util/random.hpp>
17
#include <realm/util/safe_int_ops.hpp>
18
#include <realm/util/scope_exit.hpp>
19
#include <realm/util/to_string.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/version.hpp>
22

23
#include <system_error>
24
#include <sstream>
25

26
// NOTE: The protocol specification is in `/doc/protocol.md`
27

28
using namespace realm;
29
using namespace _impl;
30
using namespace realm::util;
31
using namespace realm::sync;
32
using namespace realm::sync::websocket;
33

34
// clang-format off
35
using Connection      = ClientImpl::Connection;
36
using Session         = ClientImpl::Session;
37
using UploadChangeset = ClientHistory::UploadChangeset;
38

39
// These are a work-around for a bug in MSVC. It cannot find in-class types
40
// mentioned in signature of out-of-line member function definitions.
41
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
42
using OutputBuffer                = ClientImpl::OutputBuffer;
43
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
44
// clang-format on
45

46
void ClientImpl::ReconnectInfo::reset() noexcept
47
{
1,910✔
48
    m_backoff_state.reset();
1,910✔
49
    scheduled_reset = false;
1,910✔
50
}
1,910✔
51

52

53
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
54
                                       std::optional<ResumptionDelayInfo> new_delay_info)
55
{
3,818✔
56
    m_backoff_state.update(new_reason, new_delay_info);
3,818✔
57
}
3,818✔
58

59

60
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
61
{
6,084✔
62
    if (scheduled_reset) {
6,084✔
63
        reset();
8✔
64
    }
8✔
65

66
    if (!m_backoff_state.triggering_error) {
6,084✔
67
        return std::chrono::milliseconds::zero();
4,652✔
68
    }
4,652✔
69

70
    switch (*m_backoff_state.triggering_error) {
1,432✔
71
        case ConnectionTerminationReason::closed_voluntarily:
80✔
72
            return std::chrono::milliseconds::zero();
80✔
73
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
74
            return std::chrono::milliseconds::max();
18✔
75
        default:
1,334✔
76
            if (m_reconnect_mode == ReconnectMode::testing) {
1,334✔
77
                return std::chrono::milliseconds::max();
976✔
78
            }
976✔
79

80
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
358✔
81
            return m_backoff_state.delay_interval();
358✔
82
    }
1,432✔
83
}
1,432✔
84

85

86
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
87
                                      port_type& port, std::string& path) const
88
{
4,382✔
89
    util::Uri uri(url); // Throws
4,382✔
90
    uri.canonicalize(); // Throws
4,382✔
91
    std::string userinfo, address_2, port_2;
4,382✔
92
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
4,382✔
93
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
4,382✔
94
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
4,382✔
95
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
4,382✔
96
    if (REALM_UNLIKELY(!good))
4,382✔
97
        return false;
×
98
    ProtocolEnvelope protocol_2;
4,382✔
99
    port_type port_3;
4,382✔
100
    if (realm_scheme) {
4,382✔
101
        if (uri.get_scheme() == "realm:") {
×
102
            protocol_2 = ProtocolEnvelope::realm;
×
103
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
104
        }
×
105
        else {
×
106
            protocol_2 = ProtocolEnvelope::realms;
×
107
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
108
        }
×
109
    }
×
110
    else {
4,382✔
111
        REALM_ASSERT(ws_scheme);
4,382✔
112
        if (uri.get_scheme() == "ws:") {
4,382✔
113
            protocol_2 = ProtocolEnvelope::ws;
4,302✔
114
            port_3 = 80;
4,302✔
115
        }
4,302✔
116
        else {
80✔
117
            protocol_2 = ProtocolEnvelope::wss;
80✔
118
            port_3 = 443;
80✔
119
        }
80✔
120
    }
4,382✔
121
    if (!port_2.empty()) {
4,382✔
122
        std::istringstream in(port_2);    // Throws
4,278✔
123
        in.imbue(std::locale::classic()); // Throws
4,278✔
124
        in >> port_3;
4,278✔
125
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
4,278✔
126
            return false;
×
127
    }
4,278✔
128
    std::string path_2 = uri.get_path(); // Throws (copy)
4,382✔
129

130
    protocol = protocol_2;
4,382✔
131
    address = std::move(address_2);
4,382✔
132
    port = port_3;
4,382✔
133
    path = std::move(path_2);
4,382✔
134
    return true;
4,382✔
135
}
4,382✔
136

137
ClientImpl::ClientImpl(ClientConfig config)
138
    : logger(std::make_shared<util::CategoryLogger>(util::LogCategory::session, std::move(config.logger)))
4,914✔
139
    , m_reconnect_mode{config.reconnect_mode}
4,914✔
140
    , m_connect_timeout{config.connect_timeout}
4,914✔
141
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
4,914✔
142
    , m_ping_keepalive_period{config.ping_keepalive_period}
4,914✔
143
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
4,914✔
144
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
4,914✔
145
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
4,914✔
146
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
4,914✔
147
    , m_dry_run{config.dry_run}
4,914✔
148
    , m_enable_default_port_hack{config.enable_default_port_hack}
4,914✔
149
    , m_fix_up_object_ids{config.fix_up_object_ids}
4,914✔
150
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
4,914✔
151
    , m_socket_provider{std::move(config.socket_provider)}
4,914✔
152
    , m_client_protocol{} // Throws
4,914✔
153
    , m_one_connection_per_session{config.one_connection_per_session}
4,914✔
154
    , m_random{}
4,914✔
155
{
9,966✔
156
    // FIXME: Would be better if seeding was up to the application.
157
    util::seed_prng_nondeterministically(m_random); // Throws
9,966✔
158

159
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,966✔
160
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,966✔
161
                 get_current_protocol_version()); // Throws
9,966✔
162
    logger.info("Platform: %1", util::get_platform_info());
9,966✔
163
    const char* build_mode;
9,966✔
164
#if REALM_DEBUG
9,966✔
165
    build_mode = "Debug";
9,966✔
166
#else
167
    build_mode = "Release";
168
#endif
169
    logger.debug("Build mode: %1", build_mode);
9,966✔
170
    logger.debug("Config param: one_connection_per_session = %1",
9,966✔
171
                 config.one_connection_per_session); // Throws
9,966✔
172
    logger.debug("Config param: connect_timeout = %1 ms",
9,966✔
173
                 config.connect_timeout); // Throws
9,966✔
174
    logger.debug("Config param: connection_linger_time = %1 ms",
9,966✔
175
                 config.connection_linger_time); // Throws
9,966✔
176
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,966✔
177
                 config.ping_keepalive_period); // Throws
9,966✔
178
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,966✔
179
                 config.pong_keepalive_timeout); // Throws
9,966✔
180
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,966✔
181
                 config.fast_reconnect_limit); // Throws
9,966✔
182
    logger.debug("Config param: disable_sync_to_disk = %1",
9,966✔
183
                 config.disable_sync_to_disk); // Throws
9,966✔
184
    logger.debug(
9,966✔
185
        "Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3, jitter: 1/%4",
9,966✔
186
        m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,966✔
187
        m_reconnect_backoff_info.resumption_delay_interval.count(),
9,966✔
188
        m_reconnect_backoff_info.resumption_delay_backoff_multiplier, m_reconnect_backoff_info.delay_jitter_divisor);
9,966✔
189

190
    if (config.reconnect_mode != ReconnectMode::normal) {
9,966✔
191
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
776✔
192
                    "never do this in production!");
776✔
193
    }
776✔
194

195
    if (config.dry_run) {
9,966✔
196
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
197
                    "never do this in production!");
×
198
    }
×
199

200
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,966✔
201

202
    if (m_one_connection_per_session) {
9,966✔
203
        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
4✔
204
                    "never do this in production");
4✔
205
    }
4✔
206

207
    if (config.disable_upload_activation_delay) {
9,966✔
208
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
209
                    "never do this in production");
×
210
    }
×
211

212
    if (config.disable_sync_to_disk) {
9,966✔
213
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
214
                    "never do this in production");
×
215
    }
×
216

217
    m_actualize_and_finalize = create_trigger([this](Status status) {
15,606✔
218
        if (status == ErrorCodes::OperationAborted)
15,606✔
219
            return;
×
220
        else if (!status.is_ok())
15,606✔
221
            throw Exception(status);
×
222
        actualize_and_finalize_session_wrappers(); // Throws
15,606✔
223
    });
15,606✔
224
}
9,966✔
225

226
void ClientImpl::incr_outstanding_posts()
227
{
205,608✔
228
    util::CheckedLockGuard lock(m_drain_mutex);
205,608✔
229
    ++m_outstanding_posts;
205,608✔
230
    m_drained = false;
205,608✔
231
}
205,608✔
232

233
void ClientImpl::decr_outstanding_posts()
234
{
205,618✔
235
    util::CheckedLockGuard lock(m_drain_mutex);
205,618✔
236
    REALM_ASSERT(m_outstanding_posts);
205,618✔
237
    if (--m_outstanding_posts <= 0) {
205,618✔
238
        // Notify must happen with lock held or another thread could destroy
239
        // ClientImpl between when we release the lock and when we call notify
240
        m_drain_cv.notify_all();
18,242✔
241
    }
18,242✔
242
}
205,618✔
243

244
/// Posts a status-aware handler to the socket provider.
///
/// The outstanding-post count is incremented before posting and decremented
/// when the posted callback exits, including when `handler` throws.
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    incr_outstanding_posts();
    m_socket_provider->post([this, handler = std::move(handler)](Status status) {
        auto release_post = util::make_scope_exit([this]() noexcept {
            decr_outstanding_posts();
        });
        handler(status);
    });
}
57,338✔
255

256
/// Posts a handler that takes no status to the socket provider.
///
/// The handler is skipped when the post was aborted, and any other non-OK
/// status is thrown as an `Exception`. The outstanding-post count is
/// incremented before posting and decremented when the posted callback exits.
void ClientImpl::post(util::UniqueFunction<void()>&& handler)
{
    REALM_ASSERT(m_socket_provider);
    incr_outstanding_posts();
    m_socket_provider->post([this, handler = std::move(handler)](Status status) {
        auto release_post = util::make_scope_exit([this]() noexcept {
            decr_outstanding_posts();
        });
        const bool aborted = (status == ErrorCodes::OperationAborted);
        if (!aborted) {
            if (!status.is_ok())
                throw Exception(status);
            handler();
        }
    });
}
130,300✔
271

272

273
void ClientImpl::drain_connections()
274
{
9,966✔
275
    logger.debug("Draining connections during sync client shutdown");
9,966✔
276
    for (auto& server_slot_pair : m_server_slots) {
9,966✔
277
        auto& server_slot = server_slot_pair.second;
2,730✔
278

279
        if (server_slot.connection) {
2,730✔
280
            auto& conn = server_slot.connection;
2,502✔
281
            conn->force_close();
2,502✔
282
        }
2,502✔
283
        else {
228✔
284
            for (auto& conn_pair : server_slot.alt_connections) {
228✔
285
                conn_pair.second->force_close();
6✔
286
            }
6✔
287
        }
228✔
288
    }
2,730✔
289
}
9,966✔
290

291

292
/// Creates a timer on the socket provider that invokes `handler` after `delay`.
///
/// The outstanding-post count is incremented up front and decremented when
/// the timer callback exits, including when `handler` throws.
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
                                                       SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    incr_outstanding_posts();
    auto on_timer = [this, handler = std::move(handler)](Status status) {
        auto release_post = util::make_scope_exit([this]() noexcept {
            decr_outstanding_posts();
        });
        handler(status);
    };
    return m_socket_provider->create_timer(delay, std::move(on_timer));
}
17,980✔
304

305

306
/// Creates a trigger bound to this client that runs `handler`.
/// Requires a socket provider to be set.
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    using ClientTrigger = Trigger<ClientImpl>;
    return std::make_unique<ClientTrigger>(this, std::move(handler));
}
12,802✔
311

312
/// If a websocket sentinel is still attached, marks it as destroyed and
/// releases it.
Connection::~Connection()
{
    if (!m_websocket_sentinel)
        return;

    m_websocket_sentinel->destroyed = true;
    m_websocket_sentinel.reset();
}
2,832✔
319

320
void Connection::activate()
321
{
2,836✔
322
    REALM_ASSERT(m_on_idle);
2,836✔
323
    m_activated = true;
2,836✔
324
    if (m_num_active_sessions == 0)
2,836✔
325
        m_on_idle->trigger();
×
326
    // We cannot in general connect immediately, because a prior failure to
327
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
328
    initiate_reconnect_wait(); // Throws
2,836✔
329
}
2,836✔
330

331

332
void Connection::activate_session(std::unique_ptr<Session> sess)
333
{
10,414✔
334
    REALM_ASSERT(sess);
10,414✔
335
    REALM_ASSERT(&sess->m_conn == this);
10,414✔
336
    REALM_ASSERT(!m_force_closed);
10,414✔
337
    Session& sess_2 = *sess;
10,414✔
338
    session_ident_type ident = sess->m_ident;
10,414✔
339
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
10,414✔
340
    bool was_inserted = p.second;
10,414✔
341
    REALM_ASSERT(was_inserted);
10,414✔
342
    // Save the session ident to the historical list of session idents
343
    m_session_history.insert(ident);
10,414✔
344
    sess_2.activate(); // Throws
10,414✔
345
    if (m_state == ConnectionState::connected) {
10,414✔
346
        bool fast_reconnect = false;
7,258✔
347
        sess_2.connection_established(fast_reconnect); // Throws
7,258✔
348
    }
7,258✔
349
    ++m_num_active_sessions;
10,414✔
350
}
10,414✔
351

352

353
/// Starts deactivating `sess` and decrements the active-session count.
///
/// If the session reaches the Deactivated state immediately, it is removed
/// from this connection right away. When the last active session goes away
/// while the connection is activated and disconnected, the on-idle trigger
/// is fired.
void Connection::initiate_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess);
    REALM_ASSERT(&sess->m_conn == this);
    REALM_ASSERT(m_num_active_sessions);

    // The client may be waiting in stop_and_wait() (on another thread) for
    // m_num_active_sessions to drop to 0, so the Session is deactivated
    // before the counter is decremented.
    sess->initiate_deactivation(); // Throws
    if (sess->m_state == Session::Deactivated)
        finish_session_deactivation(sess);

    const bool last_session_gone = (--m_num_active_sessions == 0);
    if (REALM_UNLIKELY(last_session_gone) && m_activated && m_state == ConnectionState::disconnected)
        m_on_idle->trigger();
}
10,410✔
370

371

372
void Connection::cancel_reconnect_delay()
373
{
2,134✔
374
    REALM_ASSERT(m_activated);
2,134✔
375

376
    if (m_reconnect_delay_in_progress) {
2,134✔
377
        if (m_nonzero_reconnect_delay)
1,898✔
378
            logger.detail("Canceling reconnect delay"); // Throws
952✔
379

380
        // Cancel the in-progress wait operation by destroying the timer
381
        // object. Destruction is needed in this case, because a new wait
382
        // operation might have to be initiated before the previous one
383
        // completes (its completion handler starts to execute), so the new wait
384
        // operation must be done on a new timer object.
385
        m_reconnect_disconnect_timer.reset();
1,898✔
386
        m_reconnect_delay_in_progress = false;
1,898✔
387
        m_reconnect_info.reset();
1,898✔
388
        initiate_reconnect_wait(); // Throws
1,898✔
389
        return;
1,898✔
390
    }
1,898✔
391

392
    // If we are not disconnected, then we need to make sure the next time we get disconnected
393
    // that we are allowed to re-connect as quickly as possible.
394
    //
395
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
396
    // backoff/delay state before calculating the next delay, unless a PONG message is received
397
    // for the urgent PING message we send below.
398
    //
399
    // If we get a PONG message for the urgent PING message sent below, then the connection is
400
    // healthy and we can calculate the next delay normally.
401
    if (m_state != ConnectionState::disconnected) {
236✔
402
        m_reconnect_info.scheduled_reset = true;
236✔
403
        m_ping_after_scheduled_reset_of_reconnect_info = false;
236✔
404

405
        schedule_urgent_ping(); // Throws
236✔
406
        return;
236✔
407
    }
236✔
408
    // Nothing to do in this case. The next reconnect attemp will be made as
409
    // soon as there are any sessions that are both active and unsuspended.
410
}
236✔
411

412
/// Removes a session that has reached the Deactivated state from both the
/// session map and the session ident history.
void Connection::finish_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess->m_state == Session::Deactivated);

    // Read the ident into a local before erasing the session's map entry.
    const auto session_ident = sess->m_ident;
    m_sessions.erase(session_ident);
    m_session_history.erase(session_ident);
}
8,486✔
419

420
void Connection::force_close()
421
{
2,508✔
422
    if (m_force_closed) {
2,508✔
423
        return;
×
424
    }
×
425

426
    m_force_closed = true;
2,508✔
427

428
    if (m_state != ConnectionState::disconnected) {
2,508✔
429
        voluntary_disconnect();
2,470✔
430
    }
2,470✔
431

432
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,508✔
433
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,508✔
434
        m_reconnect_disconnect_timer.reset();
38✔
435
        m_reconnect_delay_in_progress = false;
38✔
436
        m_disconnect_delay_in_progress = false;
38✔
437
    }
38✔
438

439
    // We must copy any session pointers we want to close to a vector because force_closing
440
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
441
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
442
    std::vector<Session*> to_close;
2,508✔
443
    for (auto& session_pair : m_sessions) {
2,508✔
444
        if (session_pair.second->m_state == Session::State::Active) {
102✔
445
            to_close.push_back(session_pair.second.get());
102✔
446
        }
102✔
447
    }
102✔
448

449
    for (auto& sess : to_close) {
2,508✔
450
        sess->force_close();
102✔
451
    }
102✔
452

453
    logger.debug("Force closed idle connection");
2,508✔
454
}
2,508✔
455

456

457
void Connection::websocket_connected_handler(const std::string& protocol)
458
{
3,618✔
459
    if (!protocol.empty()) {
3,618✔
460
        std::string_view expected_prefix =
3,618✔
461
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,618✔
462
        // FIXME: Use std::string_view::begins_with() in C++20.
463
        auto prefix_matches = [&](std::string_view other) {
3,618✔
464
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,618✔
465
        };
3,618✔
466
        if (prefix_matches(expected_prefix)) {
3,618✔
467
            util::MemoryInputStream in;
3,618✔
468
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,618✔
469
            in.imbue(std::locale::classic());
3,618✔
470
            in.unsetf(std::ios_base::skipws);
3,618✔
471
            int value_2 = 0;
3,618✔
472
            in >> value_2;
3,618✔
473
            if (in && in.eof() && value_2 >= 0) {
3,618✔
474
                bool good_version =
3,618✔
475
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,618✔
476
                if (good_version) {
3,618✔
477
                    logger.detail("Negotiated protocol version: %1", value_2);
3,616✔
478
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
479
                    // will provide the appservices connection ID via a log message.
480
                    // TODO: Remove once the server starts sending the connection ID
481
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,616✔
482
                    m_negotiated_protocol_version = value_2;
3,616✔
483
                    handle_connection_established(); // Throws
3,616✔
484
                    return;
3,616✔
485
                }
3,616✔
486
            }
3,618✔
487
        }
3,618✔
488
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
2✔
489
                                        util::format("Bad protocol info from server: '%1'", protocol)},
2✔
490
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
2✔
491
    }
2✔
492
    else {
×
493
        close_due_to_client_side_error(
×
494
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
495
            ConnectionTerminationReason::bad_headers_in_http_response);
×
496
    }
×
497
}
3,618✔
498

499

500
/// Handles a binary websocket message.
///
/// Messages arriving after a force close are logged and ignored. When the
/// simulated read failure is triggered, the connection is closed with a
/// non-fatal client-side error instead of processing the message.
///
/// @return false if the connection was force closed; otherwise whether
///         `m_websocket` is still set after handling.
bool Connection::websocket_binary_message_received(util::Span<const char> data)
{
    if (m_force_closed) {
        logger.debug("Received binary message after connection was force closed");
        return false;
    }

    using sf = SimulatedFailure;
    const bool simulate_read_failure = sf::check_trigger(sf::sync_client__read_head);
    if (simulate_read_failure) {
        close_due_to_client_side_error(
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
            ConnectionTerminationReason::read_or_write_error);
    }
    else {
        handle_message_received(data);
    }
    return bool(m_websocket);
}
81,658✔
518

519

520
void Connection::websocket_error_handler()
521
{
750✔
522
    m_websocket_error_received = true;
750✔
523
}
750✔
524

525
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
526
{
868✔
527
    if (m_force_closed) {
868✔
528
        logger.debug("Received websocket close message after connection was force closed");
×
529
        return false;
×
530
    }
×
531
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
868✔
532

533
    switch (error_code) {
868✔
534
        case WebSocketError::websocket_ok:
68✔
535
            break;
68✔
536
        case WebSocketError::websocket_resolve_failed:
4✔
537
            [[fallthrough]];
4✔
538
        case WebSocketError::websocket_connection_failed: {
112✔
539
            SessionErrorInfo error_info(
112✔
540
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
112✔
541
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
542
            // to make sure the websocket URL is correct
543
            if (!m_server_endpoint.is_verified) {
112✔
544
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
84✔
545
            }
84✔
546
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
112✔
547
            break;
112✔
548
        }
4✔
549
        case WebSocketError::websocket_read_error:
616✔
550
            [[fallthrough]];
616✔
551
        case WebSocketError::websocket_write_error: {
616✔
552
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
616✔
553
                                         ConnectionTerminationReason::read_or_write_error);
616✔
554
            break;
616✔
555
        }
616✔
556
        case WebSocketError::websocket_going_away:
✔
557
            [[fallthrough]];
×
558
        case WebSocketError::websocket_protocol_error:
✔
559
            [[fallthrough]];
×
560
        case WebSocketError::websocket_unsupported_data:
✔
561
            [[fallthrough]];
×
562
        case WebSocketError::websocket_invalid_payload_data:
✔
563
            [[fallthrough]];
×
564
        case WebSocketError::websocket_policy_violation:
✔
565
            [[fallthrough]];
×
566
        case WebSocketError::websocket_reserved:
✔
567
            [[fallthrough]];
×
568
        case WebSocketError::websocket_no_status_received:
✔
569
            [[fallthrough]];
×
570
        case WebSocketError::websocket_invalid_extension: {
✔
571
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
572
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
573
            break;
×
574
        }
×
575
        case WebSocketError::websocket_message_too_big: {
4✔
576
            auto message = util::format(
4✔
577
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
578
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
579
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
580
            involuntary_disconnect(std::move(error_info),
4✔
581
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
582
            break;
4✔
583
        }
×
584
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
585
            close_due_to_client_side_error(
10✔
586
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
587
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
588
            break;
10✔
589
        }
×
590
        case WebSocketError::websocket_fatal_error: {
✔
591
            // Error is fatal if the sync_route has already been verified - if the sync_route has not
592
            // been verified, then use a non-fatal error and try to perform a location update.
593
            SessionErrorInfo error_info(
×
594
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)},
×
595
                IsFatal{m_server_endpoint.is_verified});
×
596
            ConnectionTerminationReason reason = ConnectionTerminationReason::http_response_says_fatal_error;
×
597
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
598
            // to make sure the websocket URL is correct
599
            if (!m_server_endpoint.is_verified) {
×
600
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
601
                reason = ConnectionTerminationReason::connect_operation_failed;
×
602
            }
×
603
            involuntary_disconnect(std::move(error_info), reason);
×
604
            break;
×
605
        }
×
606
        case WebSocketError::websocket_forbidden: {
✔
607
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
608
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
609
            involuntary_disconnect(std::move(error_info),
×
610
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
611
            break;
×
612
        }
×
613
        case WebSocketError::websocket_unauthorized: {
44✔
614
            SessionErrorInfo error_info(
44✔
615
                {ErrorCodes::AuthError,
44✔
616
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
44✔
617
                IsFatal{false});
44✔
618
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
44✔
619
            involuntary_disconnect(std::move(error_info),
44✔
620
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
44✔
621
            break;
44✔
622
        }
×
623
        case WebSocketError::websocket_moved_permanently: {
14✔
624
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
14✔
625
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
14✔
626
            involuntary_disconnect(std::move(error_info),
14✔
627
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
14✔
628
            break;
14✔
629
        }
×
630
        case WebSocketError::websocket_abnormal_closure: {
✔
631
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
632
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
633
            involuntary_disconnect(std::move(error_info),
×
634
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
635
            break;
×
636
        }
×
637
        case WebSocketError::websocket_internal_server_error:
✔
638
            [[fallthrough]];
×
639
        case WebSocketError::websocket_retry_error: {
✔
640
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
641
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
642
            break;
×
643
        }
×
644
    }
868✔
645

646
    return bool(m_websocket);
868✔
647
}
868✔
648

649
// Guarantees that handle_reconnect_wait() is never called from within the
650
// execution of initiate_reconnect_wait() (no callback reentrance).
651
void Connection::initiate_reconnect_wait()
652
{
8,550✔
653
    REALM_ASSERT(m_activated);
8,550✔
654
    REALM_ASSERT(!m_reconnect_delay_in_progress);
8,550✔
655
    REALM_ASSERT(!m_disconnect_delay_in_progress);
8,550✔
656

657
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
658
    if (m_force_closed) {
8,550✔
659
        return;
2,466✔
660
    }
2,466✔
661

662
    m_reconnect_delay_in_progress = true;
6,084✔
663
    auto delay = m_reconnect_info.delay_interval();
6,084✔
664
    if (delay == std::chrono::milliseconds::max()) {
6,084✔
665
        logger.detail("Reconnection delayed indefinitely"); // Throws
994✔
666
        // Not actually starting a timer corresponds to an infinite wait
667
        m_nonzero_reconnect_delay = true;
994✔
668
        return;
994✔
669
    }
994✔
670

671
    if (delay == std::chrono::milliseconds::zero()) {
5,090✔
672
        m_nonzero_reconnect_delay = false;
4,728✔
673
    }
4,728✔
674
    else {
362✔
675
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
362✔
676
        m_nonzero_reconnect_delay = true;
362✔
677
    }
362✔
678

679
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
680
    // we need it to be cancelable in case the connection is terminated before the timer
681
    // callback is run.
682
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
5,090✔
683
        // If the operation is aborted, the connection object may have been
684
        // destroyed.
685
        if (status != ErrorCodes::OperationAborted)
5,088✔
686
            handle_reconnect_wait(status); // Throws
3,818✔
687
    });                                    // Throws
5,088✔
688
}
5,090✔
689

690

691
// Completion handler for the reconnect wait timer.
//
// A failed (non-aborted) timer status is rethrown as an Exception. Otherwise
// the wait is marked as finished and a reconnect is initiated when at least one
// active, unsuspended session remains.
void Connection::handle_reconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        // Aborted timers are filtered out by the timer callback
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT(m_reconnect_delay_in_progress);
    m_reconnect_delay_in_progress = false;

    const bool anyone_needs_connection = (m_num_active_unsuspended_sessions > 0);
    if (anyone_needs_connection) {
        initiate_reconnect(); // Throws
    }
}
3,818✔
704

705
// Observer handed to the socket provider for a single websocket. Every
// callback is forwarded to the owning Connection unless the lifecycle sentinel
// captured at construction time has been marked as destroyed, in which case
// the Connection is not touched.
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
    explicit WebSocketObserverShim(Connection* conn)
        : conn(conn)
        , sentinel(conn->m_websocket_sentinel)
    {
    }

    Connection* conn;
    // Shared with the Connection; `destroyed` is checked before each forward
    util::bind_ptr<LifecycleSentinel> sentinel;

    void websocket_connected_handler(const std::string& protocol) override
    {
        if (!sentinel->destroyed) {
            conn->websocket_connected_handler(protocol);
        }
    }

    void websocket_error_handler() override
    {
        if (!sentinel->destroyed) {
            conn->websocket_error_handler();
        }
    }

    // Returns false without forwarding once the sentinel is destroyed.
    bool websocket_binary_message_received(util::Span<const char> data) override
    {
        const bool alive = !sentinel->destroyed;
        return alive ? conn->websocket_binary_message_received(data) : false;
    }

    // Returns true without forwarding once the sentinel is destroyed.
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
    {
        const bool alive = !sentinel->destroyed;
        return alive ? conn->websocket_closed_handler(was_clean, error_code, msg) : true;
    }
};
751

752
void Connection::initiate_reconnect()
753
{
3,818✔
754
    REALM_ASSERT(m_activated);
3,818✔
755

756
    m_state = ConnectionState::connecting;
3,818✔
757
    report_connection_state_change(ConnectionState::connecting); // Throws
3,818✔
758
    if (m_websocket_sentinel) {
3,818✔
759
        m_websocket_sentinel->destroyed = true;
×
760
    }
×
761
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,818✔
762
    m_websocket.reset();
3,818✔
763

764
    // Watchdog
765
    initiate_connect_wait(); // Throws
3,818✔
766

767
    std::vector<std::string> sec_websocket_protocol;
3,818✔
768
    {
3,818✔
769
        auto protocol_prefix =
3,818✔
770
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,818✔
771
        int min = get_oldest_supported_protocol_version();
3,818✔
772
        int max = get_current_protocol_version();
3,818✔
773
        REALM_ASSERT_3(min, <=, max);
3,818✔
774
        // List protocol version in descending order to ensure that the server
775
        // selects the highest possible version.
776
        for (int version = max; version >= min; --version) {
53,460✔
777
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
49,642✔
778
        }
49,642✔
779
    }
3,818✔
780

781
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,818✔
782
                m_server_endpoint.port, m_http_request_path_prefix);
3,818✔
783

784
    m_websocket_error_received = false;
3,818✔
785
    m_websocket =
3,818✔
786
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,818✔
787
                                            WebSocketEndpoint{
3,818✔
788
                                                m_server_endpoint.address,
3,818✔
789
                                                m_server_endpoint.port,
3,818✔
790
                                                get_http_request_path(),
3,818✔
791
                                                std::move(sec_websocket_protocol),
3,818✔
792
                                                is_ssl(m_server_endpoint.envelope),
3,818✔
793
                                                /// DEPRECATED - The following will be removed in a future release
794
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,818✔
795
                                                m_verify_servers_ssl_certificate,
3,818✔
796
                                                m_ssl_trust_certificate_path,
3,818✔
797
                                                m_ssl_verify_callback,
3,818✔
798
                                                m_proxy_config,
3,818✔
799
                                            });
3,818✔
800
}
3,818✔
801

802

803
void Connection::initiate_connect_wait()
804
{
3,820✔
805
    // Deploy a watchdog to enforce an upper bound on the time it can take to
806
    // fully establish the connection (including SSL and WebSocket
807
    // handshakes). Without such a watchdog, connect operations could take very
808
    // long, or even indefinite time.
809
    milliseconds_type time = m_client.m_connect_timeout;
3,820✔
810

811
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,820✔
812
        // If the operation is aborted, the connection object may have been
813
        // destroyed.
814
        if (status != ErrorCodes::OperationAborted)
3,820✔
815
            handle_connect_wait(status); // Throws
×
816
    });                                  // Throws
3,820✔
817
}
3,820✔
818

819

820
// Completion handler for the connect watchdog: the connection was not fully
// established in time, so it is torn down with a non-fatal
// SyncConnectTimeout error.
void Connection::handle_connect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        // Aborted timers are filtered out by the timer callback
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
    logger.info("Connect timeout"); // Throws

    SessionErrorInfo timeout_error({ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
                                   IsFatal{false});
    // If the connection fails/times out and the server has not been contacted
    // yet, refresh the location to make sure the websocket URL is correct
    const bool endpoint_unverified = !m_server_endpoint.is_verified;
    if (endpoint_unverified) {
        timeout_error.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
    }
    involuntary_disconnect(std::move(timeout_error), ConnectionTerminationReason::sync_connect_timeout); // Throws
}
×
838

839

840
void Connection::handle_connection_established()
841
{
3,616✔
842
    // Cancel connect timeout watchdog
843
    m_connect_timer.reset();
3,616✔
844

845
    m_state = ConnectionState::connected;
3,616✔
846
    m_server_endpoint.is_verified = true; // sync route is valid since connection is successful
3,616✔
847

848
    milliseconds_type now = monotonic_clock_now();
3,616✔
849
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,616✔
850
    initiate_ping_delay(now);     // Throws
3,616✔
851

852
    bool fast_reconnect = false;
3,616✔
853
    if (m_disconnect_has_occurred) {
3,616✔
854
        milliseconds_type time = now - m_disconnect_time;
1,032✔
855
        if (time <= m_client.m_fast_reconnect_limit)
1,032✔
856
            fast_reconnect = true;
1,032✔
857
    }
1,032✔
858

859
    for (auto& p : m_sessions) {
4,772✔
860
        Session& sess = *p.second;
4,772✔
861
        sess.connection_established(fast_reconnect); // Throws
4,772✔
862
    }
4,772✔
863

864
    report_connection_state_change(ConnectionState::connected); // Throws
3,616✔
865
}
3,616✔
866

867

868
void Connection::schedule_urgent_ping()
869
{
236✔
870
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
236✔
871
    if (m_ping_delay_in_progress) {
236✔
872
        m_heartbeat_timer.reset();
136✔
873
        m_ping_delay_in_progress = false;
136✔
874
        m_minimize_next_ping_delay = true;
136✔
875
        milliseconds_type now = monotonic_clock_now();
136✔
876
        initiate_ping_delay(now); // Throws
136✔
877
        return;
136✔
878
    }
136✔
879
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
100✔
880
    if (!m_send_ping)
100✔
881
        m_minimize_next_ping_delay = true;
100✔
882
}
100✔
883

884

885
// Starts the delay that precedes the next PING message.
//
// `now` is the current monotonic clock value; it is used to deduct the time
// already spent waiting for the previous PONG.
void Connection::initiate_ping_delay(milliseconds_type now)
{
    REALM_ASSERT(!m_ping_delay_in_progress);
    REALM_ASSERT(!m_waiting_for_pong);
    REALM_ASSERT(!m_send_ping);

    milliseconds_type delay = 0;
    if (m_minimize_next_ping_delay) {
        // An urgent ping was requested: keep the zero delay and consume the
        // request.
        m_minimize_next_ping_delay = false;
    }
    else {
        delay = m_client.m_ping_keepalive_period;

        // Make a randomized deduction of up to 10%, or up to 100% if this is
        // the first PING message to be sent since the connection was
        // established. The purpose of this randomized deduction is to reduce
        // the risk of many connections sending PING messages simultaneously to
        // the server.
        const milliseconds_type max_deduction = m_ping_sent ? delay / 10 : delay;
        std::uniform_int_distribution<milliseconds_type> deduction_range(0, max_deduction);
        delay -= deduction_range(m_client.get_random());

        // Deduct the time spent waiting for PONG, never going below zero
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
        const milliseconds_type waited = now - m_pong_wait_started_at;
        delay = (waited < delay) ? delay - waited : 0;
    }

    m_ping_delay_in_progress = true;

    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
        if (status == ErrorCodes::OperationAborted)
            return;
        if (!status.is_ok())
            throw Exception(status);

        handle_ping_delay();                                    // Throws
    });                                                         // Throws
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
}
3,918✔
930

931

932
void Connection::handle_ping_delay()
933
{
180✔
934
    REALM_ASSERT(m_ping_delay_in_progress);
180✔
935
    m_ping_delay_in_progress = false;
180✔
936
    m_send_ping = true;
180✔
937

938
    initiate_pong_timeout(); // Throws
180✔
939

940
    if (m_state == ConnectionState::connected && !m_sending)
180✔
941
        send_next_message(); // Throws
126✔
942
}
180✔
943

944

945
void Connection::initiate_pong_timeout()
946
{
180✔
947
    REALM_ASSERT(!m_ping_delay_in_progress);
180✔
948
    REALM_ASSERT(!m_waiting_for_pong);
180✔
949
    REALM_ASSERT(m_send_ping);
180✔
950

951
    m_waiting_for_pong = true;
180✔
952
    m_pong_wait_started_at = monotonic_clock_now();
180✔
953

954
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
180✔
955
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
180✔
956
        if (status == ErrorCodes::OperationAborted)
180✔
957
            return;
168✔
958
        else if (!status.is_ok())
12✔
959
            throw Exception(status);
×
960

961
        handle_pong_timeout(); // Throws
12✔
962
    });                        // Throws
12✔
963
}
180✔
964

965

966
void Connection::handle_pong_timeout()
967
{
12✔
968
    REALM_ASSERT(m_waiting_for_pong);
12✔
969
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
970
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
971
                                 ConnectionTerminationReason::pong_timeout);
12✔
972
}
12✔
973

974

975
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
976
{
97,896✔
977
    // Stop sending messages if an websocket error was received.
978
    if (m_websocket_error_received)
97,896✔
979
        return;
×
980

981
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
97,896✔
982
        if (sentinel->destroyed) {
97,818✔
983
            return;
1,402✔
984
        }
1,402✔
985
        if (!status.is_ok()) {
96,416✔
986
            if (status != ErrorCodes::Error::OperationAborted) {
×
987
                // Write errors will be handled by the websocket_write_error_handler() callback
988
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
989
            }
×
990
            return;
×
991
        }
×
992
        handle_write_message(); // Throws
96,416✔
993
    });                         // Throws
96,416✔
994
    m_sending_session = sess;
97,896✔
995
    m_sending = true;
97,896✔
996
}
97,896✔
997

998

999
void Connection::handle_write_message()
1000
{
96,416✔
1001
    m_sending_session->message_sent(); // Throws
96,416✔
1002
    if (m_sending_session->m_state == Session::Deactivated) {
96,416✔
1003
        finish_session_deactivation(m_sending_session);
124✔
1004
    }
124✔
1005
    m_sending_session = nullptr;
96,416✔
1006
    m_sending = false;
96,416✔
1007
    send_next_message(); // Throws
96,416✔
1008
}
96,416✔
1009

1010

1011
void Connection::send_next_message()
1012
{
172,320✔
1013
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
172,320✔
1014
    REALM_ASSERT(!m_sending_session);
172,320✔
1015
    REALM_ASSERT(!m_sending);
172,320✔
1016
    if (m_send_ping) {
172,320✔
1017
        send_ping(); // Throws
168✔
1018
        return;
168✔
1019
    }
168✔
1020
    while (!m_sessions_enlisted_to_send.empty()) {
249,478✔
1021
        // The state of being connected is not supposed to be able to change
1022
        // across this loop thanks to the "no callback reentrance" guarantee
1023
        // provided by Websocket::async_write_text(), and friends.
1024
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
175,596✔
1025

1026
        Session& sess = *m_sessions_enlisted_to_send.front();
175,596✔
1027
        m_sessions_enlisted_to_send.pop_front();
175,596✔
1028
        sess.send_message(); // Throws
175,596✔
1029

1030
        if (sess.m_state == Session::Deactivated) {
175,596✔
1031
            finish_session_deactivation(&sess);
3,418✔
1032
        }
3,418✔
1033

1034
        // An enlisted session may choose to not send a message. In that case,
1035
        // we should pass the opportunity to the next enlisted session.
1036
        if (m_sending)
175,596✔
1037
            break;
98,270✔
1038
    }
175,596✔
1039
}
172,152✔
1040

1041

1042
void Connection::send_ping()
1043
{
168✔
1044
    REALM_ASSERT(!m_ping_delay_in_progress);
168✔
1045
    REALM_ASSERT(m_waiting_for_pong);
168✔
1046
    REALM_ASSERT(m_send_ping);
168✔
1047

1048
    m_send_ping = false;
168✔
1049
    if (m_reconnect_info.scheduled_reset)
168✔
1050
        m_ping_after_scheduled_reset_of_reconnect_info = true;
138✔
1051

1052
    m_last_ping_sent_at = monotonic_clock_now();
168✔
1053
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
168✔
1054
                 m_previous_ping_rtt); // Throws
168✔
1055

1056
    ClientProtocol& protocol = get_client_protocol();
168✔
1057
    OutputBuffer& out = get_output_buffer();
168✔
1058
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
168✔
1059
    initiate_write_ping(out);                                          // Throws
168✔
1060
    m_ping_sent = true;
168✔
1061
}
168✔
1062

1063

1064
void Connection::initiate_write_ping(const OutputBuffer& out)
1065
{
168✔
1066
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
168✔
1067
        if (sentinel->destroyed) {
168✔
1068
            return;
2✔
1069
        }
2✔
1070
        if (!status.is_ok()) {
166✔
1071
            if (status != ErrorCodes::Error::OperationAborted) {
×
1072
                // Write errors will be handled by the websocket_write_error_handler() callback
1073
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1074
            }
×
1075
            return;
×
1076
        }
×
1077
        handle_write_ping(); // Throws
166✔
1078
    });                      // Throws
166✔
1079
    m_sending = true;
168✔
1080
}
168✔
1081

1082

1083
void Connection::handle_write_ping()
1084
{
166✔
1085
    REALM_ASSERT(m_sending);
166✔
1086
    REALM_ASSERT(!m_sending_session);
166✔
1087
    m_sending = false;
166✔
1088
    send_next_message(); // Throws
166✔
1089
}
166✔
1090

1091

1092
// Hands a received binary message to the client protocol, which parses it
// and calls the proper handler on this Connection object.
void Connection::handle_message_received(util::Span<const char> data)
{
    const std::string_view payload(data.data(), data.size());
    get_client_protocol().parse_message_received<Connection>(*this, payload);
}
81,256✔
1098

1099

1100
void Connection::initiate_disconnect_wait()
1101
{
4,792✔
1102
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,792✔
1103

1104
    if (m_disconnect_delay_in_progress) {
4,792✔
1105
        m_reconnect_disconnect_timer.reset();
2,222✔
1106
        m_disconnect_delay_in_progress = false;
2,222✔
1107
    }
2,222✔
1108

1109
    milliseconds_type time = m_client.m_connection_linger_time;
4,792✔
1110

1111
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,792✔
1112
        // If the operation is aborted, the connection object may have been
1113
        // destroyed.
1114
        if (status != ErrorCodes::OperationAborted)
4,792✔
1115
            handle_disconnect_wait(status); // Throws
12✔
1116
    });                                     // Throws
4,792✔
1117
    m_disconnect_delay_in_progress = true;
4,792✔
1118
}
4,792✔
1119

1120

1121
// Completion handler for the linger wait: disconnects voluntarily unless
// there is still an active, unsuspended session.
void Connection::handle_disconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        // Aborted timers are filtered out by the timer callback
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    m_disconnect_delay_in_progress = false;

    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
    if (m_num_active_unsuspended_sessions != 0)
        return;

    // Only mention the linger time when one was actually configured
    if (m_client.m_connection_linger_time > 0) {
        logger.detail("Linger time expired"); // Throws
    }
    voluntary_disconnect();      // Throws
    logger.info("Disconnected"); // Throws
}
12✔
1138

1139

1140
// Closes the connection because the sync protocol was violated. The error is
// reported as fatal with the ProtocolViolation action.
void Connection::close_due_to_protocol_error(Status status)
{
    SessionErrorInfo violation(std::move(status), IsFatal{true});
    violation.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;

    constexpr auto reason = ConnectionTerminationReason::sync_protocol_violation;
    involuntary_disconnect(std::move(violation), reason); // Throws
}
18✔
1147

1148

1149
// Closes the connection because of an error detected on the client side.
// `is_fatal` and `reason` are passed through unchanged to the disconnect.
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to error: %1", status); // Throws

    SessionErrorInfo error_info{std::move(status), is_fatal};
    involuntary_disconnect(std::move(error_info), reason); // Throws
}
412✔
1155

1156

1157
// Closes the connection because of a transient error. The error is reported
// as non-fatal with the Transient action.
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to transient error: %1", status); // Throws

    SessionErrorInfo transient{std::move(status), IsFatal{false}};
    transient.server_requests_action = ProtocolErrorInfo::Action::Transient;
    involuntary_disconnect(std::move(transient), reason); // Throws
}
628✔
1165

1166

1167
// Close connection due to error discovered on the server-side, and then
1168
// reported to the client by way of a connection-level ERROR message.
1169
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1170
{
68✔
1171
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
68✔
1172
                int(error_code)); // Throws
68✔
1173

1174
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
68✔
1175
                                      : ConnectionTerminationReason::server_said_try_again_later;
68✔
1176
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
68✔
1177
                           reason); // Throws
68✔
1178
}
68✔
1179

1180

1181
void Connection::disconnect(const SessionErrorInfo& info)
1182
{
3,818✔
1183
    // Cancel connect timeout watchdog
1184
    m_connect_timer.reset();
3,818✔
1185

1186
    if (m_state == ConnectionState::connected) {
3,818✔
1187
        m_disconnect_time = monotonic_clock_now();
3,616✔
1188
        m_disconnect_has_occurred = true;
3,616✔
1189

1190
        // Sessions that are in the Deactivating state at this time can be
1191
        // immediately discarded, in part because they are no longer enlisted to
1192
        // send. Such sessions will be taken to the Deactivated state by
1193
        // Session::connection_lost(), and then they will be removed from
1194
        // `m_sessions`.
1195
        auto i = m_sessions.begin(), end = m_sessions.end();
3,616✔
1196
        while (i != end) {
7,698✔
1197
            // Prevent invalidation of the main iterator when erasing elements
1198
            auto j = i++;
4,082✔
1199
            Session& sess = *j->second;
4,082✔
1200
            sess.connection_lost(); // Throws
4,082✔
1201
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
4,082✔
1202
                m_sessions.erase(j);
1,928✔
1203
        }
4,082✔
1204
    }
3,616✔
1205

1206
    change_state_to_disconnected();
3,818✔
1207

1208
    m_ping_delay_in_progress = false;
3,818✔
1209
    m_waiting_for_pong = false;
3,818✔
1210
    m_send_ping = false;
3,818✔
1211
    m_minimize_next_ping_delay = false;
3,818✔
1212
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,818✔
1213
    m_ping_sent = false;
3,818✔
1214
    m_heartbeat_timer.reset();
3,818✔
1215
    m_previous_ping_rtt = 0;
3,818✔
1216

1217
    m_websocket_sentinel->destroyed = true;
3,818✔
1218
    m_websocket_sentinel.reset();
3,818✔
1219
    m_websocket.reset();
3,818✔
1220
    m_input_body_buffer.reset();
3,818✔
1221
    m_sending_session = nullptr;
3,818✔
1222
    m_sessions_enlisted_to_send.clear();
3,818✔
1223
    m_sending = false;
3,818✔
1224

1225
    if (!m_appservices_coid.empty()) {
3,818✔
1226
        m_appservices_coid.clear();
3,572✔
1227
        logger.base_logger = make_logger(m_ident, std::nullopt, get_client().logger.base_logger);
3,572✔
1228
        for (auto& [ident, sess] : m_sessions) {
3,572✔
1229
            sess->logger.base_logger = Session::make_logger(ident, logger.base_logger);
2,092✔
1230
        }
2,092✔
1231
    }
3,572✔
1232

1233
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,818✔
1234
    initiate_reconnect_wait();                                           // Throws
3,818✔
1235
}
3,818✔
1236

1237
bool Connection::is_flx_sync_connection() const noexcept
1238
{
116,294✔
1239
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
116,294✔
1240
}
116,294✔
1241

1242
// Handles a PONG message from the server.
//
// `timestamp` must echo the timestamp of the last PING that was sent. A PONG
// that arrives when none is expected, or that carries a different timestamp,
// closes the connection with a protocol error. Otherwise the round trip time
// is recorded and the next ping delay is started.
void Connection::receive_pong(milliseconds_type timestamp)
{
    logger.debug("Received: PONG(timestamp=%1)", timestamp);

    // A PONG is only legal while waiting for one, and only after the
    // corresponding PING has actually been handed off for sending.
    const bool unexpected = !(m_waiting_for_pong && !m_send_ping);
    if (REALM_UNLIKELY(unexpected)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
        return;
    }

    const bool timestamp_mismatch = (timestamp != m_last_ping_sent_at);
    if (REALM_UNLIKELY(timestamp_mismatch)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
                          m_last_ping_sent_at, timestamp)}); // Throws
        return;
    }

    const milliseconds_type now = monotonic_clock_now();
    const milliseconds_type rtt = now - timestamp;
    logger.debug("Round trip time was %1 milliseconds", rtt);
    m_previous_ping_rtt = rtt;

    // If this PONG message is a response to a PING message that was sent after
    // the last invocation of cancel_reconnect_delay(), then the connection is
    // still good, and we do not have to skip the next reconnect delay.
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
        m_ping_after_scheduled_reset_of_reconnect_info = false;
        m_reconnect_info.scheduled_reset = false;
    }

    // Stop the PONG timeout and schedule the next PING
    m_heartbeat_timer.reset();
    m_waiting_for_pong = false;

    initiate_ping_delay(now); // Throws

    if (m_client.m_roundtrip_time_handler) {
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
    }
}
164✔
1283

1284
// Looks up the session that a received message of kind `message` is addressed
// to.
//
// Returns the session, or nullptr when `session_ident` is zero or no such
// session currently exists. For a missing session the session history decides
// what happens: an identifier that was never used closes the connection with
// a protocol error, while a message for a previously closed session is only
// logged.
//
// NOTE(review): this function is declared noexcept, yet it calls logging and
// close routines that are annotated as throwing elsewhere in this file - an
// exception escaping from them would terminate the process. Confirm that this
// is intended.
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
{
    if (session_ident == 0)
        return nullptr;

    if (auto* sess = get_session(session_ident); REALM_LIKELY(sess))
        return sess;

    // Check the history to see if the message received was for a previous session
    const bool was_closed_earlier = (m_session_history.find(session_ident) != m_session_history.end());
    if (was_closed_earlier) {
        logger.error("Received %1 message for closed session, session_ident = %2", message,
                     session_ident); // Throws
    }
    else {
        logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received message %1 for session iden %2 when that session never existed", message,
                          session_ident)});
    }
    return nullptr;
}
75,098✔
1308

1309
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1310
{
970✔
1311
    Session* sess = nullptr;
970✔
1312
    if (session_ident != 0) {
970✔
1313
        sess = find_and_validate_session(session_ident, "ERROR");
898✔
1314
        if (REALM_UNLIKELY(!sess)) {
898✔
1315
            return;
×
1316
        }
×
1317
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
898✔
1318
            close_due_to_protocol_error(std::move(status)); // Throws
×
1319
            return;
×
1320
        }
×
1321

1322
        if (sess->m_state == Session::Deactivated) {
898✔
1323
            finish_session_deactivation(sess);
14✔
1324
        }
14✔
1325
        return;
898✔
1326
    }
898✔
1327

1328
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
72✔
1329
                info.message, info.raw_error_code, info.is_fatal, session_ident,
72✔
1330
                info.server_requests_action); // Throws
72✔
1331

1332
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
72✔
1333
    if (REALM_LIKELY(known_error_code)) {
72✔
1334
        ProtocolError error_code = ProtocolError(info.raw_error_code);
68✔
1335
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
68✔
1336
            close_due_to_server_side_error(error_code, info); // Throws
68✔
1337
            return;
68✔
1338
        }
68✔
1339
        close_due_to_protocol_error(
×
1340
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1341
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1342
                          info.raw_error_code)});
×
1343
    }
×
1344
    else {
4✔
1345
        close_due_to_protocol_error(
4✔
1346
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1347
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1348
    }
4✔
1349
}
72✔
1350

1351

1352
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1353
                                             session_ident_type session_ident)
1354
{
20✔
1355
    if (session_ident == 0) {
20✔
1356
        return close_due_to_protocol_error(
×
1357
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1358
    }
×
1359

1360
    if (!is_flx_sync_connection()) {
20✔
1361
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1362
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1363
    }
×
1364

1365
    if (Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR")) {
20✔
1366
        sess->receive_query_error_message(raw_error_code, message, query_version);
20✔
1367
    }
20✔
1368
}
20✔
1369

1370

1371
// Forwards an IDENT message to the addressed session. If the session reports
// a non-OK status, the connection is closed with that status.
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
{
    Session* target = find_and_validate_session(session_ident, "IDENT");
    if (REALM_LIKELY(target)) {
        if (auto result = target->receive_ident_message(client_file_ident); !result.is_ok()) {
            close_due_to_protocol_error(std::move(result)); // Throws
        }
    }
}
3,622✔
1381

1382
// Forwards a DOWNLOAD message to the addressed session. If the session reports
// a non-OK status, the connection is closed with that status.
void Connection::receive_download_message(session_ident_type session_ident, const DownloadMessage& message)
{
    Session* target = find_and_validate_session(session_ident, "DOWNLOAD");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto result = target->receive_download_message(message);
    if (result.is_ok())
        return;

    close_due_to_protocol_error(std::move(result));
}
49,490✔
1393

1394
// Forwards a MARK message to the addressed session. If the session reports a
// non-OK status, the connection is closed with that status.
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    if (Session* target = find_and_validate_session(session_ident, "MARK"); REALM_LIKELY(target)) {
        auto result = target->receive_mark_message(request_ident);
        if (!result.is_ok()) {
            close_due_to_protocol_error(std::move(result)); // Throws
        }
    }
}
17,026✔
1404

1405

1406
// Forwards an UNBOUND message to the addressed session. A non-OK status from
// the session closes the connection; otherwise, if the session ended up in the
// Deactivated state, its deactivation is finished here.
void Connection::receive_unbound_message(session_ident_type session_ident)
{
    Session* target = find_and_validate_session(session_ident, "UNBOUND");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    {
        auto result = target->receive_unbound_message();
        if (!result.is_ok()) {
            close_due_to_protocol_error(std::move(result)); // Throws
            return;
        }
    }

    const bool is_deactivated = (target->m_state == Session::Deactivated);
    if (is_deactivated)
        finish_session_deactivation(target);
}
3,976✔
1422

1423

1424
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1425
                                               std::string_view body)
1426
{
64✔
1427
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
64✔
1428
    if (REALM_UNLIKELY(!sess)) {
64✔
1429
        return;
×
1430
    }
×
1431

1432
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
64✔
1433
        close_due_to_protocol_error(std::move(status));
×
1434
    }
×
1435
}
64✔
1436

1437

1438
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1439
                                            std::string_view message)
1440
{
5,918✔
1441
    if (session_ident != 0) {
5,918✔
1442
        if (auto sess = get_session(session_ident)) {
3,934✔
1443
            sess->logger.log(LogCategory::session, level, "Server log: %1", message);
3,920✔
1444
            return;
3,920✔
1445
        }
3,920✔
1446

1447
        logger.log(util::LogCategory::session, level, "Server log for unknown session %1: %2", session_ident,
14✔
1448
                   message);
14✔
1449
        return;
14✔
1450
    }
3,934✔
1451

1452
    logger.log(level, "Server log: %1", message);
1,984✔
1453
}
1,984✔
1454

1455

1456
void Connection::receive_appservices_request_id(std::string_view coid)
1457
{
5,602✔
1458
    if (coid.empty() || !m_appservices_coid.empty()) {
5,602✔
1459
        return;
2,028✔
1460
    }
2,028✔
1461
    m_appservices_coid = coid;
3,574✔
1462
    logger.log(util::LogCategory::session, util::LogCategory::Level::info,
3,574✔
1463
               "Connected to app services with request id: \"%1\". Further log entries for this connection will be "
3,574✔
1464
               "prefixed with \"Connection[%2:%1]\" instead of \"Connection[%2]\"",
3,574✔
1465
               m_appservices_coid, m_ident);
3,574✔
1466
    logger.base_logger = make_logger(m_ident, m_appservices_coid, get_client().logger.base_logger);
3,574✔
1467

1468
    for (auto& [ident, sess] : m_sessions) {
4,706✔
1469
        sess->logger.base_logger = Session::make_logger(ident, logger.base_logger);
4,706✔
1470
    }
4,706✔
1471
}
3,574✔
1472

1473

1474
// Closes the connection with the given status by handing it to
// close_due_to_protocol_error().
void Connection::handle_protocol_error(Status status)
{
    return close_due_to_protocol_error(std::move(status));
}
×
1478

1479

1480
// Sessions are guaranteed to be granted the opportunity to send a message in
1481
// the order that they enlist. Note that this is important to ensure
1482
// nonoverlapping communication with the server for consecutive sessions
1483
// associated with the same Realm file.
1484
//
1485
// CAUTION: The specified session may get destroyed before this function
1486
// returns, but only if its Session::send_message() puts it into the Deactivated
1487
// state.
1488
void Connection::enlist_to_send(Session* sess)
1489
{
177,114✔
1490
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
177,114✔
1491
    m_sessions_enlisted_to_send.push_back(sess); // Throws
177,114✔
1492
    if (!m_sending)
177,114✔
1493
        send_next_message(); // Throws
75,610✔
1494
}
177,114✔
1495

1496

1497
std::string Connection::get_active_appservices_connection_id()
1498
{
76✔
1499
    return m_appservices_coid;
76✔
1500
}
76✔
1501

1502
void Session::cancel_resumption_delay()
1503
{
4,166✔
1504
    REALM_ASSERT_EX(m_state == Active, m_state);
4,166✔
1505

1506
    if (!m_suspended)
4,166✔
1507
        return;
3,994✔
1508

1509
    m_suspended = false;
172✔
1510

1511
    logger.debug("Resumed"); // Throws
172✔
1512

1513
    if (unbind_process_complete())
172✔
1514
        initiate_rebind(); // Throws
134✔
1515

1516
    try {
172✔
1517
        process_pending_flx_bootstrap(); // throws
172✔
1518
    }
172✔
1519
    catch (const IntegrationException& error) {
172✔
1520
        on_integration_failure(error);
×
1521
    }
×
1522
    catch (...) {
172✔
1523
        on_integration_failure(IntegrationException(exception_to_status()));
×
1524
    }
×
1525

1526
    m_conn.one_more_active_unsuspended_session(); // Throws
172✔
1527
    if (m_try_again_activation_timer) {
172✔
1528
        m_try_again_activation_timer.reset();
8✔
1529
    }
8✔
1530

1531
    on_resumed(); // Throws
172✔
1532
}
172✔
1533

1534

1535
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1536
                                                 std::vector<ProtocolErrorInfo>* out)
1537
{
23,396✔
1538
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
23,396✔
1539
        return;
23,340✔
1540
    }
23,340✔
1541

1542
#ifdef REALM_DEBUG
56✔
1543
    REALM_ASSERT_DEBUG(
56✔
1544
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
56✔
1545
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
56✔
1546
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
56✔
1547
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
56✔
1548
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
56✔
1549
                       }));
56✔
1550
#endif
56✔
1551

1552
    while (!m_pending_compensating_write_errors.empty() &&
112✔
1553
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
112✔
1554
               changesets.back().version) {
56✔
1555
        auto& cur_error = m_pending_compensating_write_errors.front();
56✔
1556
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
56✔
1557
        out->push_back(std::move(cur_error));
56✔
1558
        m_pending_compensating_write_errors.pop_front();
56✔
1559
    }
56✔
1560
}
56✔
1561

1562

1563
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1564
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1565
                                   DownloadBatchState download_batch_state)
1566
{
44,942✔
1567
    auto& history = get_history();
44,942✔
1568
    if (received_changesets.empty()) {
44,942✔
1569
        if (download_batch_state == DownloadBatchState::MoreToCome) {
21,520✔
1570
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1571
                                       "received empty download message that was not the last in batch",
×
1572
                                       ProtocolError::bad_progress);
×
1573
        }
×
1574
        history.set_sync_progress(progress, downloadable_bytes, version_info); // Throws
21,520✔
1575
        return;
21,520✔
1576
    }
21,520✔
1577

1578
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
23,422✔
1579
    auto transact = get_db()->start_read();
23,422✔
1580
    history.integrate_server_changesets(
23,422✔
1581
        progress, downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
23,422✔
1582
        [&](const Transaction&, util::Span<Changeset> changesets) {
23,422✔
1583
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
23,396✔
1584
        }); // Throws
23,396✔
1585
    if (received_changesets.size() == 1) {
23,422✔
1586
        logger.debug("1 remote changeset integrated, producing client version %1",
15,414✔
1587
                     version_info.sync_version.version); // Throws
15,414✔
1588
    }
15,414✔
1589
    else {
8,008✔
1590
        logger.debug("%2 remote changesets integrated, producing client version %1",
8,008✔
1591
                     version_info.sync_version.version, received_changesets.size()); // Throws
8,008✔
1592
    }
8,008✔
1593

1594
    for (const auto& pending_error : pending_compensating_write_errors) {
23,422✔
1595
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
56✔
1596
                    pending_error.compensating_write_rejected_client_version,
56✔
1597
                    *pending_error.compensating_write_server_version, pending_error.message);
56✔
1598
        try {
56✔
1599
            on_connection_state_changed(
56✔
1600
                m_conn.get_state(),
56✔
1601
                SessionErrorInfo{pending_error,
56✔
1602
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
56✔
1603
                                                          pending_error.message)});
56✔
1604
        }
56✔
1605
        catch (...) {
56✔
1606
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1607
        }
×
1608
    }
56✔
1609
}
23,422✔
1610

1611

1612
void Session::on_integration_failure(const IntegrationException& error)
1613
{
40✔
1614
    REALM_ASSERT_EX(m_state == Active, m_state);
40✔
1615
    REALM_ASSERT(!m_client_error && !m_error_to_send);
40✔
1616
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
40✔
1617

1618
    m_client_error = util::make_optional<IntegrationException>(error);
40✔
1619
    m_error_to_send = true;
40✔
1620
    SessionErrorInfo error_info{error.to_status(), IsFatal{false}};
40✔
1621
    error_info.server_requests_action = ProtocolErrorInfo::Action::Warning;
40✔
1622
    // Surface the error to the user otherwise is lost.
1623
    on_connection_state_changed(m_conn.get_state(), std::move(error_info));
40✔
1624

1625
    // Since the deactivation process has not been initiated, the UNBIND
1626
    // message cannot have been sent unless an ERROR message was received.
1627
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
40✔
1628
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
40✔
1629
        ensure_enlisted_to_send(); // Throws
36✔
1630
    }
36✔
1631
}
40✔
1632

1633
// Updates the session's progress bookkeeping after changesets have been
// integrated.
//
// `client_version` is passed to do_recognize_sync_version(). `progress`
// replaces the stored download progress and overall progress; the stored
// upload progress is only replaced when `progress.upload` has a greater client
// version. The session is enlisted to send afterwards if the IDENT message has
// been sent, no ERROR message has been received and it is not suspended.
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
{
    REALM_ASSERT_EX(m_state == Active, m_state);
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);

    m_download_progress = progress.download;
    m_progress = progress;

    // Upload progress never moves backwards.
    if (progress.upload.client_version > m_upload_progress.client_version)
        m_upload_progress = progress.upload;

    do_recognize_sync_version(client_version); // Allows upload process to resume

    check_for_download_completion(); // Throws

    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
        auto& flx_subscription_store = *get_flx_subscription_store();
        // Use the store fetched above instead of fetching it a second time.
        migration_store->create_subscriptions(flx_subscription_store);
    }

    // Since the deactivation process has not been initiated, the UNBIND
    // message cannot have been sent unless an ERROR message was received.
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
        ensure_enlisted_to_send(); // Throws
    }
}
47,310✔
1661

1662

1663
// The destructor performs no work of its own. A life cycle state check was
// once considered here and is kept for reference:
//    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
Session::~Session() = default;
10,412✔
1667

1668

1669
// Creates a session-category logger that wraps `base_logger` and prefixes each
// message with "Session[<ident>]: ".
std::shared_ptr<util::Logger> Session::make_logger(session_ident_type ident,
                                                   std::shared_ptr<util::Logger> base_logger)
{
    return std::make_shared<util::PrefixLogger>(util::LogCategory::session, util::format("Session[%1]: ", ident),
                                                std::move(base_logger));
}
17,210✔
1676

1677
void Session::activate()
1678
{
10,414✔
1679
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
10,414✔
1680

1681
    logger.debug("Activating"); // Throws
10,414✔
1682

1683
    if (REALM_LIKELY(!get_client().is_dry_run())) {
10,414✔
1684
        bool file_exists = util::File::exists(get_realm_path());
10,414✔
1685

1686
        logger.info("client_reset_config = %1, Realm exists = %2, upload messages allowed = %3",
10,414✔
1687
                    get_client_reset_config().has_value(), file_exists, upload_messages_allowed() ? "yes" : "no");
10,414✔
1688
        get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
10,414✔
1689
    }
10,414✔
1690
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
10,414✔
1691
                 m_client_file_ident.salt); // Throws
10,414✔
1692
    m_upload_progress = m_progress.upload;
10,414✔
1693
    m_download_progress = m_progress.download;
10,414✔
1694
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
10,414✔
1695
    init_progress_handler();
10,414✔
1696

1697
    logger.debug("last_version_available = %1", m_last_version_available);                     // Throws
10,414✔
1698
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
10,414✔
1699
    logger.debug("progress_download_client_version = %1",
10,414✔
1700
                 m_progress.download.last_integrated_client_version);                                      // Throws
10,414✔
1701
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
10,414✔
1702
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
10,414✔
1703

1704
    reset_protocol_state();
10,414✔
1705
    m_state = Active;
10,414✔
1706

1707
    call_debug_hook(SyncClientHookEvent::SessionActivating);
10,414✔
1708

1709
    REALM_ASSERT(!m_suspended);
10,414✔
1710
    m_conn.one_more_active_unsuspended_session(); // Throws
10,414✔
1711

1712
    try {
10,414✔
1713
        process_pending_flx_bootstrap(); // throws
10,414✔
1714
    }
10,414✔
1715
    catch (const IntegrationException& error) {
10,414✔
1716
        on_integration_failure(error);
×
1717
    }
×
1718
    catch (...) {
10,414✔
1719
        on_integration_failure(IntegrationException(exception_to_status()));
4✔
1720
    }
4✔
1721

1722
    // Checks if there is a pending client reset
1723
    handle_pending_client_reset_acknowledgement();
10,414✔
1724
}
10,412✔
1725

1726

1727
// The caller (Connection) must discard the session if the session has become
1728
// deactivated upon return.
1729
void Session::initiate_deactivation()
1730
{
10,414✔
1731
    REALM_ASSERT_EX(m_state == Active, m_state);
10,414✔
1732

1733
    logger.debug("Initiating deactivation"); // Throws
10,414✔
1734

1735
    m_state = Deactivating;
10,414✔
1736

1737
    if (!m_suspended)
10,414✔
1738
        m_conn.one_less_active_unsuspended_session(); // Throws
9,752✔
1739

1740
    if (m_enlisted_to_send) {
10,414✔
1741
        REALM_ASSERT(!unbind_process_complete());
5,412✔
1742
        return;
5,412✔
1743
    }
5,412✔
1744

1745
    // Deactivate immediately if the BIND message has not yet been sent and the
1746
    // session is not enlisted to send, or if the unbinding process has already
1747
    // completed.
1748
    if (!m_bind_message_sent || unbind_process_complete()) {
5,002✔
1749
        complete_deactivation(); // Throws
954✔
1750
        // Life cycle state is now Deactivated
1751
        return;
954✔
1752
    }
954✔
1753

1754
    // Ready to send the UNBIND message, if it has not already been sent
1755
    if (!m_unbind_message_sent) {
4,048✔
1756
        enlist_to_send(); // Throws
3,838✔
1757
        return;
3,838✔
1758
    }
3,838✔
1759
}
4,048✔
1760

1761

1762
void Session::complete_deactivation()
1763
{
10,414✔
1764
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
10,414✔
1765
    m_state = Deactivated;
10,414✔
1766

1767
    logger.debug("Deactivation completed"); // Throws
10,414✔
1768
}
10,414✔
1769

1770

1771
// Called by the associated Connection object when this session is granted an
1772
// opportunity to send a message.
1773
//
1774
// The caller (Connection) must discard the session if the session has become
1775
// deactivated upon return.
1776
void Session::send_message()
1777
{
175,594✔
1778
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
175,594✔
1779
    REALM_ASSERT(m_enlisted_to_send);
175,594✔
1780
    m_enlisted_to_send = false;
175,594✔
1781
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
175,594✔
1782
        // Deactivation has been initiated. If the UNBIND message has not been
1783
        // sent yet, there is no point in sending it. Instead, we can let the
1784
        // deactivation process complete.
1785
        if (!m_bind_message_sent) {
9,692✔
1786
            return complete_deactivation(); // Throws
3,418✔
1787
            // Life cycle state is now Deactivated
1788
        }
3,418✔
1789

1790
        // Session life cycle state is Deactivating or the unbinding process has
1791
        // been initiated by a session specific ERROR message
1792
        if (!m_unbind_message_sent)
6,274✔
1793
            send_unbind_message(); // Throws
6,274✔
1794
        return;
6,274✔
1795
    }
9,692✔
1796

1797
    // Session life cycle state is Active and the unbinding process has
1798
    // not been initiated
1799
    REALM_ASSERT(!m_unbind_message_sent);
165,902✔
1800

1801
    if (!m_bind_message_sent)
165,902✔
1802
        return send_bind_message(); // Throws
8,708✔
1803

1804
    // Pending test commands can be sent any time after the BIND message is sent
1805
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
157,194✔
1806
                                                      [](const PendingTestCommand& command) {
157,194✔
1807
                                                          return command.pending;
156✔
1808
                                                      });
156✔
1809
    if (has_pending_test_command) {
157,194✔
1810
        return send_test_command_message();
64✔
1811
    }
64✔
1812

1813
    if (!m_ident_message_sent) {
157,130✔
1814
        if (have_client_file_ident())
7,794✔
1815
            send_ident_message(); // Throws
7,794✔
1816
        return;
7,794✔
1817
    }
7,794✔
1818

1819
    if (m_error_to_send)
149,336✔
1820
        return send_json_error_message(); // Throws
30✔
1821

1822
    // Stop sending upload, mark and query messages when the client detects an error.
1823
    if (m_client_error) {
149,306✔
1824
        return;
12✔
1825
    }
12✔
1826

1827
    if (m_target_download_mark > m_last_download_mark_sent)
149,294✔
1828
        return send_mark_message(); // Throws
17,896✔
1829

1830
    auto is_upload_allowed = [&]() -> bool {
131,402✔
1831
        if (!m_is_flx_sync_session) {
131,398✔
1832
            return true;
110,388✔
1833
        }
110,388✔
1834

1835
        auto migration_store = get_migration_store();
21,010✔
1836
        if (!migration_store) {
21,010✔
1837
            return true;
×
1838
        }
×
1839

1840
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
21,010✔
1841
        if (!sentinel_query_version) {
21,010✔
1842
            return true;
20,980✔
1843
        }
20,980✔
1844

1845
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
1846
        return m_last_sent_flx_query_version != *sentinel_query_version;
30✔
1847
    };
21,010✔
1848

1849
    if (!is_upload_allowed()) {
131,398✔
1850
        return;
16✔
1851
    }
16✔
1852

1853
    auto check_pending_flx_version = [&]() -> bool {
131,386✔
1854
        if (!m_is_flx_sync_session) {
131,382✔
1855
            return false;
110,388✔
1856
        }
110,388✔
1857

1858
        if (m_delay_uploads) {
20,994✔
1859
            return false;
2,966✔
1860
        }
2,966✔
1861

1862
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
18,028✔
1863

1864
        if (!m_pending_flx_sub_set) {
18,028✔
1865
            return false;
15,560✔
1866
        }
15,560✔
1867

1868
        // Send QUERY messages when the upload progress client version reaches the snapshot version
1869
        // of a pending subscription
1870
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
2,468✔
1871
    };
18,028✔
1872

1873
    if (check_pending_flx_version()) {
131,382✔
1874
        return send_query_change_message(); // throws
1,374✔
1875
    }
1,374✔
1876

1877
    if (!m_delay_uploads && (m_last_version_available > m_upload_progress.client_version)) {
130,008✔
1878
        return send_upload_message(); // Throws
61,750✔
1879
    }
61,750✔
1880
}
130,008✔
1881

1882

1883
void Session::send_bind_message()
1884
{
8,708✔
1885
    REALM_ASSERT_EX(m_state == Active, m_state);
8,708✔
1886

1887
    session_ident_type session_ident = m_ident;
8,708✔
1888
    // Request an ident if we don't already have one and there isn't a pending client reset diff
1889
    // The file ident can be 0 when a client reset is being performed if a brand new local realm
1890
    // has been opened (or using Async open) and a FLX/PBS migration occurs when first connecting
1891
    // to the server.
1892
    bool need_client_file_ident = !have_client_file_ident() && !get_client_reset_config();
8,708✔
1893
    const bool is_subserver = false;
8,708✔
1894

1895
    ClientProtocol& protocol = m_conn.get_client_protocol();
8,708✔
1896
    int protocol_version = m_conn.get_negotiated_protocol_version();
8,708✔
1897
    OutputBuffer& out = m_conn.get_output_buffer();
8,708✔
1898
    // Discard the token since it's ignored by the server.
1899
    std::string empty_access_token;
8,708✔
1900
    if (m_is_flx_sync_session) {
8,708✔
1901
        nlohmann::json bind_json_data;
1,890✔
1902
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,890✔
1903
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1904
        }
60✔
1905
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,890✔
1906
        auto schema_version = get_schema_version();
1,890✔
1907
        // Send 0 if schema is not versioned.
1908
        bind_json_data["schemaVersion"] = schema_version != uint64_t(-1) ? schema_version : 0;
1,890✔
1909
        if (logger.would_log(util::Logger::Level::debug)) {
1,890✔
1910
            std::string json_data_dump;
1,890✔
1911
            if (!bind_json_data.empty()) {
1,890✔
1912
                json_data_dump = bind_json_data.dump();
1,890✔
1913
            }
1,890✔
1914
            logger.debug(
1,890✔
1915
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,890✔
1916
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,890✔
1917
        }
1,890✔
1918
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,890✔
1919
                                       need_client_file_ident, is_subserver); // Throws
1,890✔
1920
    }
1,890✔
1921
    else {
6,818✔
1922
        std::string server_path = get_virt_path();
6,818✔
1923
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
6,818✔
1924
                     session_ident, need_client_file_ident, is_subserver, server_path);
6,818✔
1925
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
6,818✔
1926
                                       need_client_file_ident, is_subserver); // Throws
6,818✔
1927
    }
6,818✔
1928
    m_conn.initiate_write_message(out, this); // Throws
8,708✔
1929

1930
    m_bind_message_sent = true;
8,708✔
1931
    call_debug_hook(SyncClientHookEvent::BindMessageSent);
8,708✔
1932

1933
    // If there is a pending client reset diff, process that when the BIND message has
1934
    // been sent successfully and wait before sending the IDENT message. Otherwise,
1935
    // ready to send the IDENT message if the file identifier pair is already available.
1936
    if (!need_client_file_ident)
8,708✔
1937
        enlist_to_send(); // Throws
4,892✔
1938
}
8,708✔
1939

1940

1941
void Session::send_ident_message()
1942
{
7,794✔
1943
    REALM_ASSERT_EX(m_state == Active, m_state);
7,794✔
1944
    REALM_ASSERT(m_bind_message_sent);
7,794✔
1945
    REALM_ASSERT(!m_unbind_message_sent);
7,794✔
1946
    REALM_ASSERT(have_client_file_ident());
7,794✔
1947

1948
    ClientProtocol& protocol = m_conn.get_client_protocol();
7,794✔
1949
    OutputBuffer& out = m_conn.get_output_buffer();
7,794✔
1950
    session_ident_type session_ident = m_ident;
7,794✔
1951

1952
    if (m_is_flx_sync_session) {
7,794✔
1953
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,798✔
1954
        const auto active_query_body = active_query_set.to_ext_json();
1,798✔
1955
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,798✔
1956
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,798✔
1957
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,798✔
1958
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,798✔
1959
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,798✔
1960
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,798✔
1961
                     active_query_body); // Throws
1,798✔
1962
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,798✔
1963
                                        active_query_set.version(), active_query_body); // Throws
1,798✔
1964
        m_last_sent_flx_query_version = active_query_set.version();
1,798✔
1965
    }
1,798✔
1966
    else {
5,996✔
1967
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
5,996✔
1968
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
5,996✔
1969
                     "latest_server_version_salt=%6)",
5,996✔
1970
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
5,996✔
1971
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
5,996✔
1972
                     m_progress.latest_server_version.salt);                                  // Throws
5,996✔
1973
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
5,996✔
1974
    }
5,996✔
1975
    m_conn.initiate_write_message(out, this); // Throws
7,794✔
1976

1977
    m_ident_message_sent = true;
7,794✔
1978
    call_debug_hook(SyncClientHookEvent::IdentMessageSent);
7,794✔
1979

1980
    // Other messages may be waiting to be sent
1981
    enlist_to_send(); // Throws
7,794✔
1982
}
7,794✔
1983

1984
void Session::send_query_change_message()
1985
{
1,374✔
1986
    REALM_ASSERT_EX(m_state == Active, m_state);
1,374✔
1987
    REALM_ASSERT(m_ident_message_sent);
1,374✔
1988
    REALM_ASSERT(!m_unbind_message_sent);
1,374✔
1989
    REALM_ASSERT(m_pending_flx_sub_set);
1,374✔
1990
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
1,374✔
1991

1992
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
1,374✔
1993
        return;
×
1994
    }
×
1995

1996
    auto sub_store = get_flx_subscription_store();
1,374✔
1997
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
1,374✔
1998
    auto latest_queries = latest_sub_set.to_ext_json();
1,374✔
1999
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
1,374✔
2000
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
1,374✔
2001

2002
    OutputBuffer& out = m_conn.get_output_buffer();
1,374✔
2003
    session_ident_type session_ident = get_ident();
1,374✔
2004
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,374✔
2005
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
1,374✔
2006
    m_conn.initiate_write_message(out, this);
1,374✔
2007

2008
    m_last_sent_flx_query_version = latest_sub_set.version();
1,374✔
2009

2010
    request_download_completion_notification();
1,374✔
2011
}
1,374✔
2012

2013
void Session::send_upload_message()
2014
{
61,748✔
2015
    REALM_ASSERT_EX(m_state == Active, m_state);
61,748✔
2016
    REALM_ASSERT(m_ident_message_sent);
61,748✔
2017
    REALM_ASSERT(!m_unbind_message_sent);
61,748✔
2018

2019
    if (REALM_UNLIKELY(get_client().is_dry_run()))
61,748✔
2020
        return;
×
2021

2022
    version_type target_upload_version = m_last_version_available;
61,748✔
2023
    if (m_pending_flx_sub_set) {
61,748✔
2024
        REALM_ASSERT(m_is_flx_sync_session);
1,094✔
2025
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
1,094✔
2026
    }
1,094✔
2027

2028
    bool server_version_to_ack =
61,748✔
2029
        m_upload_progress.last_integrated_server_version < m_download_progress.server_version;
61,748✔
2030

2031
    std::vector<UploadChangeset> uploadable_changesets;
61,748✔
2032
    version_type locked_server_version = 0;
61,748✔
2033
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
61,748✔
2034
                                             locked_server_version); // Throws
61,748✔
2035

2036
    if (uploadable_changesets.empty()) {
61,748✔
2037
        // Nothing more to upload right now if:
2038
        //  1. We need to limit upload up to some version other than the last client version
2039
        //     available and there are no changes to upload
2040
        //  2. There are no changes to upload and no server version(s) to acknowledge
2041
        if (m_pending_flx_sub_set || !server_version_to_ack) {
31,952✔
2042
            logger.trace("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
5,388✔
2043
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
5,388✔
2044
            // Other messages may be waiting to be sent
2045
            return enlist_to_send(); // Throws
5,388✔
2046
        }
5,388✔
2047
    }
31,952✔
2048

2049
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
56,360✔
2050
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
748✔
2051
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
748✔
2052
    }
748✔
2053

2054
    version_type progress_client_version = m_upload_progress.client_version;
56,360✔
2055
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
56,360✔
2056

2057
    if (!upload_messages_allowed()) {
56,360✔
2058
        logger.trace("UPLOAD not allowed (progress_client_version=%1, progress_server_version=%2, "
612✔
2059
                     "locked_server_version=%3, num_changesets=%4)",
612✔
2060
                     progress_client_version, progress_server_version, locked_server_version,
612✔
2061
                     uploadable_changesets.size()); // Throws
612✔
2062
        // Other messages may be waiting to be sent
2063
        return enlist_to_send(); // Throws
612✔
2064
    }
612✔
2065

2066
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
55,748✔
2067
                 "locked_server_version=%3, num_changesets=%4)",
55,748✔
2068
                 progress_client_version, progress_server_version, locked_server_version,
55,748✔
2069
                 uploadable_changesets.size()); // Throws
55,748✔
2070

2071
    ClientProtocol& protocol = m_conn.get_client_protocol();
55,748✔
2072
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
55,748✔
2073

2074
    for (const UploadChangeset& uc : uploadable_changesets) {
55,748✔
2075
        logger.debug(util::LogCategory::changeset,
43,572✔
2076
                     "Fetching changeset for upload (client_version=%1, server_version=%2, "
43,572✔
2077
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
43,572✔
2078
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
43,572✔
2079
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
43,572✔
2080
        if (logger.would_log(util::Logger::Level::trace)) {
43,572✔
2081
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2082
            if (changeset_data.size() < 1024) {
×
2083
                logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
2084
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2085
            }
×
2086
            else {
×
2087
                logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
2088
                             protocol.compressed_hex_dump(changeset_data));
×
2089
            }
×
2090

2091
#if REALM_DEBUG
×
2092
            ChunkedBinaryInputStream in{changeset_data};
×
2093
            Changeset log;
×
2094
            try {
×
2095
                parse_changeset(in, log);
×
2096
                std::stringstream ss;
×
2097
                log.print(ss);
×
2098
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
2099
            }
×
2100
            catch (const BadChangesetError& err) {
×
2101
                logger.error(util::LogCategory::changeset, "Unable to parse changeset: %1", err.what());
×
2102
            }
×
2103
#endif
×
2104
        }
×
2105

2106
        {
43,572✔
2107
            upload_message_builder.add_changeset(uc.progress.client_version,
43,572✔
2108
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
43,572✔
2109
                                                 uc.origin_file_ident,
43,572✔
2110
                                                 uc.changeset); // Throws
43,572✔
2111
        }
43,572✔
2112
    }
43,572✔
2113

2114
    int protocol_version = m_conn.get_negotiated_protocol_version();
55,748✔
2115
    OutputBuffer& out = m_conn.get_output_buffer();
55,748✔
2116
    session_ident_type session_ident = get_ident();
55,748✔
2117
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
55,748✔
2118
                                               progress_server_version,
55,748✔
2119
                                               locked_server_version); // Throws
55,748✔
2120
    m_conn.initiate_write_message(out, this);                          // Throws
55,748✔
2121

2122
    call_debug_hook(SyncClientHookEvent::UploadMessageSent);
55,748✔
2123

2124
    // Other messages may be waiting to be sent
2125
    enlist_to_send(); // Throws
55,748✔
2126
}
55,748✔
2127

2128

2129
void Session::send_mark_message()
2130
{
17,896✔
2131
    REALM_ASSERT_EX(m_state == Active, m_state);
17,896✔
2132
    REALM_ASSERT(m_ident_message_sent);
17,896✔
2133
    REALM_ASSERT(!m_unbind_message_sent);
17,896✔
2134
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
17,896✔
2135

2136
    request_ident_type request_ident = m_target_download_mark;
17,896✔
2137
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
17,896✔
2138

2139
    ClientProtocol& protocol = m_conn.get_client_protocol();
17,896✔
2140
    OutputBuffer& out = m_conn.get_output_buffer();
17,896✔
2141
    session_ident_type session_ident = get_ident();
17,896✔
2142
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
17,896✔
2143
    m_conn.initiate_write_message(out, this);                      // Throws
17,896✔
2144

2145
    m_last_download_mark_sent = request_ident;
17,896✔
2146

2147
    // Other messages may be waiting to be sent
2148
    enlist_to_send(); // Throws
17,896✔
2149
}
17,896✔
2150

2151

2152
void Session::send_unbind_message()
2153
{
6,272✔
2154
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,272✔
2155
    REALM_ASSERT(m_bind_message_sent);
6,272✔
2156
    REALM_ASSERT(!m_unbind_message_sent);
6,272✔
2157

2158
    logger.debug("Sending: UNBIND"); // Throws
6,272✔
2159

2160
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,272✔
2161
    OutputBuffer& out = m_conn.get_output_buffer();
6,272✔
2162
    session_ident_type session_ident = get_ident();
6,272✔
2163
    protocol.make_unbind_message(out, session_ident); // Throws
6,272✔
2164
    m_conn.initiate_write_message(out, this);         // Throws
6,272✔
2165

2166
    m_unbind_message_sent = true;
6,272✔
2167
}
6,272✔
2168

2169

2170
void Session::send_json_error_message()
2171
{
30✔
2172
    REALM_ASSERT_EX(m_state == Active, m_state);
30✔
2173
    REALM_ASSERT(m_ident_message_sent);
30✔
2174
    REALM_ASSERT(!m_unbind_message_sent);
30✔
2175
    REALM_ASSERT(m_error_to_send);
30✔
2176
    REALM_ASSERT(m_client_error);
30✔
2177

2178
    ClientProtocol& protocol = m_conn.get_client_protocol();
30✔
2179
    OutputBuffer& out = m_conn.get_output_buffer();
30✔
2180
    session_ident_type session_ident = get_ident();
30✔
2181
    auto protocol_error = m_client_error->error_for_server;
30✔
2182

2183
    auto message = util::format("%1", m_client_error->to_status());
30✔
2184
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
30✔
2185
                session_ident); // Throws
30✔
2186

2187
    nlohmann::json error_body_json;
30✔
2188
    error_body_json["message"] = std::move(message);
30✔
2189
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
30✔
2190
                                     error_body_json.dump()); // Throws
30✔
2191
    m_conn.initiate_write_message(out, this);                 // Throws
30✔
2192

2193
    m_error_to_send = false;
30✔
2194
    enlist_to_send(); // Throws
30✔
2195
}
30✔
2196

2197

2198
void Session::send_test_command_message()
2199
{
64✔
2200
    REALM_ASSERT_EX(m_state == Active, m_state);
64✔
2201

2202
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
64✔
2203
                           [](const PendingTestCommand& command) {
68✔
2204
                               return command.pending;
68✔
2205
                           });
68✔
2206
    REALM_ASSERT(it != m_pending_test_commands.end());
64✔
2207

2208
    ClientProtocol& protocol = m_conn.get_client_protocol();
64✔
2209
    OutputBuffer& out = m_conn.get_output_buffer();
64✔
2210
    auto session_ident = get_ident();
64✔
2211

2212
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
64✔
2213
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
64✔
2214

2215
    m_conn.initiate_write_message(out, this); // Throws;
64✔
2216
    it->pending = false;
64✔
2217

2218
    enlist_to_send();
64✔
2219
}
64✔
2220

2221
bool Session::client_reset_if_needed()
2222
{
424✔
2223
    // Even if we end up not actually performing a client reset, consume the
2224
    // config to ensure that the resources it holds are released
2225
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
424✔
2226
    if (!client_reset_config) {
424✔
2227
        return false;
×
2228
    }
×
2229

2230
    // Save a copy of the status and action in case an error/exception occurs
2231
    Status cr_status = client_reset_config->error;
424✔
2232
    ProtocolErrorInfo::Action cr_action = client_reset_config->action;
424✔
2233

2234
    try {
424✔
2235
        // The file ident from the fresh realm will be copied over to the local realm
2236
        bool did_reset = client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config),
424✔
2237
                                                            get_flx_subscription_store());
424✔
2238

2239
        call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
424✔
2240
        if (!did_reset) {
424✔
2241
            return false;
×
2242
        }
×
2243
    }
424✔
2244
    catch (const std::exception& e) {
424✔
2245
        auto err_msg = util::format("A fatal error occurred during '%1' client reset diff for %2: '%3'", cr_action,
80✔
2246
                                    cr_status, e.what());
80✔
2247
        logger.error(err_msg.c_str());
80✔
2248
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
80✔
2249
        suspend(err_info);
80✔
2250
        return false;
80✔
2251
    }
80✔
2252

2253
    // The fresh Realm has been used to reset the state
2254
    logger.debug("Client reset is completed, path = %1", get_realm_path()); // Throws
344✔
2255

2256
    // Update the version, file ident and progress info after the client reset diff is done
2257
    get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
344✔
2258
    // Print the version/progress information before performing the asserts
2259
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
344✔
2260
                 m_client_file_ident.salt);                                // Throws
344✔
2261
    logger.debug("last_version_available = %1", m_last_version_available); // Throws
344✔
2262
    logger.debug("upload_progress_client_version = %1, upload_progress_server_version = %2",
344✔
2263
                 m_progress.upload.client_version,
344✔
2264
                 m_progress.upload.last_integrated_server_version); // Throws
344✔
2265
    logger.debug("download_progress_client_version = %1, download_progress_server_version = %2",
344✔
2266
                 m_progress.download.last_integrated_client_version,
344✔
2267
                 m_progress.download.server_version); // Throws
344✔
2268

2269
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
344✔
2270
                    m_progress.download.last_integrated_client_version);
344✔
2271
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
344✔
2272

2273
    m_upload_progress = m_progress.upload;
344✔
2274
    m_download_progress = m_progress.download;
344✔
2275
    init_progress_handler();
344✔
2276
    // In recovery mode, there may be new changesets to upload and nothing left to download.
2277
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
2278
    // For both, we want to allow uploads again without needing external changes to download first.
2279
    m_delay_uploads = false;
344✔
2280

2281
    // Checks if there is a pending client reset
2282
    handle_pending_client_reset_acknowledgement();
344✔
2283

2284
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
2285
    if (auto migration_store = get_migration_store()) {
344✔
2286
        migration_store->complete_migration_or_rollback();
316✔
2287
    }
316✔
2288

2289
    return true;
344✔
2290
}
424✔
2291

2292
// Handles an IDENT message from the server, which assigns this session its
// client file identifier.
//
// Returns Status::OK() when the message was accepted or ignored (session no
// longer Active), and a SyncProtocolInvariantFailed status when the message
// arrives at an illegal time or carries an invalid identifier or salt.
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
{
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
                 client_file_ident.salt); // Throws

    // Once deactivation has been initiated the associated Realm and
    // SessionWrapper must no longer be accessed, so the message is dropped.
    if (m_state != Active) {
        return Status::OK(); // Success
    }

    const bool illegal_at_this_time = !m_bind_message_sent || have_client_file_ident() ||
                                      m_error_message_received || m_unbound_message_received;
    if (REALM_UNLIKELY(illegal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
    }
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
    }
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
    }

    m_client_file_ident = client_file_ident;

    // In dry-run mode the identifier is not stored in the history and the
    // progress versions are left untouched.
    if (!REALM_UNLIKELY(get_client().is_dry_run())) {
        get_history().set_client_file_ident(client_file_ident,
                                            m_fix_up_object_ids); // Throws
        m_progress.download.last_integrated_client_version = 0;
        m_progress.upload.client_version = 0;
    }

    // Ready to send the IDENT message
    ensure_enlisted_to_send(); // Throws
    return Status::OK();       // Success
}
3,552✔
2332

2333
Status Session::receive_download_message(const DownloadMessage& message)
2334
{
49,490✔
2335
    // Ignore the message if the deactivation process has been initiated,
2336
    // because in that case, the associated Realm and SessionWrapper must
2337
    // not be accessed any longer.
2338
    if (m_state != Active)
49,490✔
2339
        return Status::OK();
556✔
2340

2341
    bool is_flx = m_conn.is_flx_sync_connection();
48,934✔
2342
    int64_t query_version = is_flx ? *message.query_version : 0;
48,934✔
2343

2344
    if (!is_flx || query_version > 0)
48,934✔
2345
        enable_progress_notifications();
47,002✔
2346

2347
    auto&& progress = message.progress;
48,934✔
2348
    if (is_flx) {
48,934✔
2349
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
5,502✔
2350
                     "latest_server_version=%3, latest_server_version_salt=%4, "
5,502✔
2351
                     "upload_client_version=%5, upload_server_version=%6, progress_estimate=%7, "
5,502✔
2352
                     "batch_state=%8, query_version=%9, num_changesets=%10, ...)",
5,502✔
2353
                     progress.download.server_version, progress.download.last_integrated_client_version,
5,502✔
2354
                     progress.latest_server_version.version, progress.latest_server_version.salt,
5,502✔
2355
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
5,502✔
2356
                     message.downloadable.as_estimate(), message.batch_state, query_version,
5,502✔
2357
                     message.changesets.size()); // Throws
5,502✔
2358
    }
5,502✔
2359
    else {
43,432✔
2360
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
43,432✔
2361
                     "latest_server_version=%3, latest_server_version_salt=%4, "
43,432✔
2362
                     "upload_client_version=%5, upload_server_version=%6, "
43,432✔
2363
                     "downloadable_bytes=%7, num_changesets=%8, ...)",
43,432✔
2364
                     progress.download.server_version, progress.download.last_integrated_client_version,
43,432✔
2365
                     progress.latest_server_version.version, progress.latest_server_version.salt,
43,432✔
2366
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
43,432✔
2367
                     message.downloadable.as_bytes(), message.changesets.size()); // Throws
43,432✔
2368
    }
43,432✔
2369

2370
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
2371
    // changeset over and over again.
2372
    if (m_client_error) {
48,934✔
2373
        logger.debug("Ignoring download message because the client detected an integration error");
×
2374
        return Status::OK();
×
2375
    }
×
2376

2377
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
48,934✔
2378
    if (REALM_UNLIKELY(!legal_at_this_time)) {
48,934✔
UNCOV
2379
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
×
UNCOV
2380
    }
×
2381
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
48,934✔
2382
        logger.error("Bad sync progress received (%1)", status);
×
2383
        return status;
×
2384
    }
×
2385

2386
    version_type server_version = m_progress.download.server_version;
48,934✔
2387
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
48,934✔
2388
    for (const RemoteChangeset& changeset : message.changesets) {
50,698✔
2389
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
2390
        // version must be increasing, but can stay the same during bootstraps.
2391
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
47,654✔
2392
                                                         : (changeset.remote_version > server_version);
47,654✔
2393
        // Each server version cannot be greater than the one in the header of the download message.
2394
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
47,654✔
2395
        if (!good_server_version) {
47,654✔
2396
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2397
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2398
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2399
        }
×
2400
        server_version = changeset.remote_version;
47,654✔
2401

2402
        // Check that per-changeset last integrated client version is "weakly"
2403
        // increasing.
2404
        bool good_client_version =
47,654✔
2405
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
47,654✔
2406
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
47,654✔
2407
        if (!good_client_version) {
47,654✔
2408
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2409
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2410
                                 "(%1, %2, %3)",
×
2411
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2412
                                 progress.download.last_integrated_client_version)};
×
2413
        }
×
2414
        last_integrated_client_version = changeset.last_integrated_local_version;
47,654✔
2415
        // Server shouldn't send our own changes, and zero is not a valid client
2416
        // file identifier.
2417
        bool good_file_ident =
47,654✔
2418
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
47,654✔
2419
        if (!good_file_ident) {
47,654✔
2420
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2421
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2422
                                 changeset.origin_file_ident)};
×
2423
        }
×
2424
    }
47,654✔
2425

2426
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
48,934✔
2427
                                       message.batch_state, message.changesets.size());
48,934✔
2428
    if (hook_action == SyncClientHookAction::EarlyReturn) {
48,934✔
2429
        return Status::OK();
24✔
2430
    }
24✔
2431
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
48,910✔
2432

2433
    if (process_flx_bootstrap_message(message)) {
48,910✔
2434
        clear_resumption_delay_state();
3,966✔
2435
        return Status::OK();
3,966✔
2436
    }
3,966✔
2437

2438
    initiate_integrate_changesets(message.downloadable.as_bytes(), message.batch_state, progress,
44,944✔
2439
                                  message.changesets); // Throws
44,944✔
2440

2441
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
44,944✔
2442
                                  message.batch_state, message.changesets.size());
44,944✔
2443
    if (hook_action == SyncClientHookAction::EarlyReturn) {
44,944✔
2444
        return Status::OK();
×
2445
    }
×
2446
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
44,944✔
2447

2448
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
2449
    // after a retryable session error.
2450
    clear_resumption_delay_state();
44,944✔
2451
    return Status::OK();
44,944✔
2452
}
44,944✔
2453

2454
// Handles a MARK message from the server.
//
// On success, records the current download server version and the received
// request identifier, then checks whether download completion can be reported.
//
// Returns Status::OK() when the message was accepted or ignored (session no
// longer Active), and a SyncProtocolInvariantFailed status when the message
// arrives at an illegal time or its request identifier is out of range.
Status Session::receive_mark_message(request_ident_type request_ident)
{
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active)
        return Status::OK(); // Success

    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
    if (REALM_UNLIKELY(!legal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
    }
    // A valid identifier is newer than the last one received and no newer than the last one sent
    bool good_request_ident =
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
    if (REALM_UNLIKELY(!good_request_ident)) {
        return {ErrorCodes::SyncProtocolInvariantFailed,
                util::format("Received MARK message with invalid request identifier %1 "
                             "(last mark sent: %2, last mark received: %3)",
                             request_ident, m_last_download_mark_sent, m_last_download_mark_received)};
    }

    m_server_version_at_last_download_mark = m_progress.download.server_version;
    m_last_download_mark_received = request_ident;
    check_for_download_completion(); // Throws

    return Status::OK(); // Success
}
16,956✔
2484

2485

2486
// The caller (Connection) must discard the session if the session has become
2487
// deactivated upon return.
2488
Status Session::receive_unbound_message()
2489
{
3,976✔
2490
    logger.debug("Received: UNBOUND");
3,976✔
2491

2492
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
3,976✔
2493
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,976✔
2494
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2495
    }
×
2496

2497
    // The fact that the UNBIND message has been sent, but an ERROR message has
2498
    // not been received, implies that the deactivation process must have been
2499
    // initiated, so this session must be in the Deactivating state or the session
2500
    // has been suspended because of a client side error.
2501
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
3,976!
2502

2503
    m_unbound_message_received = true;
3,976✔
2504

2505
    // Detect completion of the unbinding process
2506
    if (m_unbind_message_send_complete && m_state == Deactivating) {
3,976✔
2507
        // The deactivation process completes when the unbinding process
2508
        // completes.
2509
        complete_deactivation(); // Throws
3,976✔
2510
        // Life cycle state is now Deactivated
2511
    }
3,976✔
2512

2513
    return Status::OK(); // Success
3,976✔
2514
}
3,976✔
2515

2516

2517
void Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
2518
{
20✔
2519
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);
20✔
2520
    on_flx_sync_error(query_version, message); // throws
20✔
2521
}
20✔
2522

2523
// The caller (Connection) must discard the session if the session has become
2524
// deactivated upon return.
2525
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2526
{
910✔
2527
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
910✔
2528
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
910✔
2529

2530
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
910✔
2531
    if (REALM_UNLIKELY(!legal_at_this_time)) {
910✔
2532
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2533
    }
×
2534

2535
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
910✔
2536
    auto status = protocol_error_to_status(protocol_error, info.message);
910✔
2537
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
910✔
2538
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2539
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2540
                             info.raw_error_code)};
×
2541
    }
×
2542

2543
    // Can't process debug hook actions once the Session is undergoing deactivation, since
2544
    // the SessionWrapper may not be available
2545
    if (m_state == Active) {
910✔
2546
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, &info);
896✔
2547
        if (debug_action == SyncClientHookAction::EarlyReturn) {
896✔
2548
            return Status::OK();
12✔
2549
        }
12✔
2550
    }
896✔
2551

2552
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
2553
    // containing the compensating write has appeared in a download message.
2554
    if (status == ErrorCodes::SyncCompensatingWrite) {
898✔
2555
        // If the client is not active, the compensating writes will not be processed now, but will be
2556
        // sent again the next time the client connects
2557
        if (m_state == Active) {
60✔
2558
            REALM_ASSERT(info.compensating_write_server_version.has_value());
60✔
2559
            m_pending_compensating_write_errors.push_back(info);
60✔
2560
        }
60✔
2561
        return Status::OK();
60✔
2562
    }
60✔
2563

2564
    if (protocol_error == ProtocolError::schema_version_changed) {
838✔
2565
        // Enable upload immediately if the session is still active.
2566
        if (m_state == Active) {
70✔
2567
            auto wt = get_db()->start_write();
70✔
2568
            _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
70✔
2569
            wt->commit();
70✔
2570
            // Notify SyncSession a schema migration is required.
2571
            on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
70✔
2572
        }
70✔
2573
        // Keep the session active to upload any unsynced changes.
2574
        return Status::OK();
70✔
2575
    }
70✔
2576

2577
    m_error_message_received = true;
768✔
2578
    suspend(SessionErrorInfo{info, std::move(status)});
768✔
2579
    return Status::OK();
768✔
2580
}
838✔
2581

2582
void Session::suspend(const SessionErrorInfo& info)
2583
{
848✔
2584
    REALM_ASSERT(!m_suspended);
848✔
2585
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
848✔
2586
    logger.debug("Suspended"); // Throws
848✔
2587

2588
    m_suspended = true;
848✔
2589

2590
    // Detect completion of the unbinding process
2591
    if (m_unbind_message_send_complete && m_error_message_received) {
848✔
2592
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2593
        // we received an ERROR message implies that the deactivation process must
2594
        // have been initiated, so this session must be in the Deactivating state.
2595
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
14✔
2596

2597
        // The deactivation process completes when the unbinding process
2598
        // completes.
2599
        complete_deactivation(); // Throws
14✔
2600
        // Life cycle state is now Deactivated
2601
    }
14✔
2602

2603
    // Notify the application of the suspension of the session if the session is
2604
    // still in the Active state
2605
    if (m_state == Active) {
848✔
2606
        call_debug_hook(SyncClientHookEvent::SessionSuspended, &info);
834✔
2607
        m_conn.one_less_active_unsuspended_session(); // Throws
834✔
2608
        on_suspended(info);                           // Throws
834✔
2609
    }
834✔
2610

2611
    if (!info.is_fatal) {
848✔
2612
        begin_resumption_delay(info);
184✔
2613
    }
184✔
2614

2615
    // Ready to send the UNBIND message, if it has not been sent already
2616
    if (!m_unbind_message_sent)
848✔
2617
        ensure_enlisted_to_send(); // Throws
834✔
2618
}
848✔
2619

2620
// Handles a TEST_COMMAND response from the server.
//
// Looks up the pending test command whose id equals `ident`. When none is
// found, a SyncProtocolInvariantFailed status is returned. Otherwise the
// command's promise is fulfilled with a copy of `body`, the entry is removed
// from the pending list, and Status::OK() is returned.
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
{
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);

    const auto has_requested_ident = [ident](const PendingTestCommand& pending) {
        return pending.id == ident;
    };
    const auto pending_end = m_pending_test_commands.end();
    const auto match = std::find_if(m_pending_test_commands.begin(), pending_end, has_requested_ident);

    if (match == pending_end) {
        auto reason = util::format("Received test command response for a non-existent ident %1", ident);
        return {ErrorCodes::SyncProtocolInvariantFailed, std::move(reason)};
    }

    // Fulfil the promise first, then drop the bookkeeping entry.
    std::string response{body};
    match->promise.emplace_value(std::move(response));
    m_pending_test_commands.erase(match);

    return Status::OK();
}
64✔
2637

2638
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2639
{
184✔
2640
    REALM_ASSERT(!m_try_again_activation_timer);
184✔
2641

2642
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
184✔
2643
                                  error_info.resumption_delay_interval);
184✔
2644
    auto try_again_interval = m_try_again_delay_info.delay_interval();
184✔
2645
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
184✔
2646
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
2647
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
2648
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
2649
        try_again_interval = std::chrono::milliseconds{1000};
148✔
2650
    }
148✔
2651
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
184✔
2652
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
184✔
2653
        if (status == ErrorCodes::OperationAborted)
184✔
2654
            return;
32✔
2655
        else if (!status.is_ok())
152✔
2656
            throw Exception(status);
×
2657

2658
        m_try_again_activation_timer.reset();
152✔
2659
        cancel_resumption_delay();
152✔
2660
    });
152✔
2661
}
184✔
2662

2663
void Session::clear_resumption_delay_state()
2664
{
48,908✔
2665
    if (m_try_again_activation_timer) {
48,908✔
2666
        logger.debug("Clearing resumption delay state after successful download");
×
2667
        m_try_again_delay_info.reset();
×
2668
    }
×
2669
}
48,908✔
2670

2671
// Validates the sync progress reported in a download message against the
// progress this session has recorded so far (m_progress) and against
// m_last_version_available.
//
// Returns Status::OK() when every invariant holds, and otherwise a
// SyncProtocolInvariantFailed status describing a violation. All checks are
// always evaluated; when several fail, the message of the last failing check
// is the one reported.
//
// NOTE(review): this function is noexcept yet builds std::string messages; an
// exception escaping the formatting would end in std::terminate - confirm
// that is acceptable.
Status Session::check_received_sync_progress(const SyncProgress& progress) noexcept
{
    const SyncProgress& current = m_progress;
    const SyncProgress& received = progress;
    std::string violation;

    // The latest server version must never go backwards.
    if (current.latest_server_version.version > received.latest_server_version.version) {
        violation = util::format("Latest server version in download messages must be weakly increasing throughout a "
                                 "session (current: %1, received: %2)",
                                 current.latest_server_version.version, received.latest_server_version.version);
    }

    // The upload cursor's client version must never go backwards.
    if (current.upload.client_version > received.upload.client_version) {
        violation = util::format("Last integrated client version in download messages must be weakly increasing "
                                 "throughout a session (current: %1, received: %2)",
                                 current.upload.client_version, received.upload.client_version);
    }

    // The server cannot have integrated a client version that does not exist yet.
    if (m_last_version_available < received.upload.client_version) {
        violation = util::format("Last integrated client version on server cannot be greater than the latest client "
                                 "version in existence (current: %1, received: %2)",
                                 m_last_version_available, received.upload.client_version);
    }

    // The download cursor must never go backwards.
    if (current.download.server_version > received.download.server_version) {
        violation = util::format(
            "Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
            current.download.server_version, received.download.server_version);
    }

    // The download cursor cannot be ahead of the latest server version.
    if (received.latest_server_version.version < received.download.server_version) {
        violation = util::format(
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
            received.download.server_version, received.latest_server_version.version);
    }

    // The client version tied to the download cursor must never go backwards.
    if (current.download.last_integrated_client_version > received.download.last_integrated_client_version) {
        violation = util::format(
            "Last integrated client version on the server at the position in the server's history of the download "
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
            current.download.last_integrated_client_version, received.download.last_integrated_client_version);
    }

    // The client version tied to the download cursor cannot exceed the upload cursor's client version.
    if (received.upload.client_version < received.download.last_integrated_client_version) {
        violation = util::format("Last integrated client version on the server in the position at the server's history "
                                 "of the download cursor cannot be greater than the latest client version integrated "
                                 "on the server (download: %1, upload: %2)",
                                 received.download.last_integrated_client_version, received.upload.client_version);
    }

    // The download cursor cannot be behind the server version tied to the upload cursor.
    if (received.upload.last_integrated_server_version > received.download.server_version) {
        violation = util::format(
            "The server version of the download cursor cannot be less than the server version integrated in the "
            "latest client version acknowledged by the server (download: %1, upload: %2)",
            received.download.server_version, received.upload.last_integrated_server_version);
    }

    if (!violation.empty()) {
        return {ErrorCodes::SyncProtocolInvariantFailed, std::move(violation)};
    }
    return Status::OK();
}
48,934✔
2725

2726

2727
void Session::check_for_download_completion()
2728
{
64,260✔
2729
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
64,260✔
2730
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
64,260✔
2731
    if (m_last_download_mark_received == m_last_triggering_download_mark)
64,260✔
2732
        return;
47,064✔
2733
    if (m_last_download_mark_received < m_target_download_mark)
17,196✔
2734
        return;
368✔
2735
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
16,828✔
2736
        return;
×
2737
    m_last_triggering_download_mark = m_target_download_mark;
16,828✔
2738
    if (REALM_UNLIKELY(m_delay_uploads)) {
16,828✔
2739
        // Activate the upload process now, and enable immediate reactivation
2740
        // after a subsequent fast reconnect.
2741
        m_delay_uploads = false;
4,758✔
2742
        ensure_enlisted_to_send(); // Throws
4,758✔
2743
    }
4,758✔
2744
    on_download_completion(); // Throws
16,828✔
2745
}
16,828✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc