• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2524

26 Jul 2024 08:18PM UTC coverage: 91.081% (-0.01%) from 91.092%
2524

push

Evergreen

web-flow
RCORE-2171 Add testing mode where baas integration tests always start out talking to the wrong base url (#7813)

102768 of 181518 branches covered (56.62%)

145 of 192 new or added lines in 3 files covered. (75.52%)

59 existing lines in 16 files now uncovered.

216541 of 237745 relevant lines covered (91.08%)

5836891.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.52
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/client_reset_operation.hpp>
10
#include <realm/sync/noinst/sync_schema_migration.hpp>
11
#include <realm/sync/protocol.hpp>
12
#include <realm/util/assert.hpp>
13
#include <realm/util/basic_system_errors.hpp>
14
#include <realm/util/memory_stream.hpp>
15
#include <realm/util/platform_info.hpp>
16
#include <realm/util/random.hpp>
17
#include <realm/util/safe_int_ops.hpp>
18
#include <realm/util/scope_exit.hpp>
19
#include <realm/util/to_string.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/version.hpp>
22

23
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
24

25
#include <system_error>
26
#include <sstream>
27

28
// NOTE: The protocol specification is in `/doc/protocol.md`
29

30
using namespace realm;
31
using namespace _impl;
32
using namespace realm::util;
33
using namespace realm::sync;
34
using namespace realm::sync::websocket;
35

36
// clang-format off
37
using Connection      = ClientImpl::Connection;
38
using Session         = ClientImpl::Session;
39
using UploadChangeset = ClientHistory::UploadChangeset;
40

41
// These are a work-around for a bug in MSVC. It cannot find in-class types
42
// mentioned in signature of out-of-line member function definitions.
43
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
44
using OutputBuffer                = ClientImpl::OutputBuffer;
45
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
46
// clang-format on
47

48
void ClientImpl::ReconnectInfo::reset() noexcept
49
{
1,990✔
50
    m_backoff_state.reset();
1,990✔
51
    scheduled_reset = false;
1,990✔
52
}
1,990✔
53

54

55
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
56
                                       std::optional<ResumptionDelayInfo> new_delay_info)
57
{
3,854✔
58
    m_backoff_state.update(new_reason, new_delay_info);
3,854✔
59
}
3,854✔
60

61

62
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
63
{
6,194✔
64
    if (scheduled_reset) {
6,194✔
65
        reset();
8✔
66
    }
8✔
67

68
    if (!m_backoff_state.triggering_error) {
6,194✔
69
        return std::chrono::milliseconds::zero();
4,724✔
70
    }
4,724✔
71

72
    switch (*m_backoff_state.triggering_error) {
1,470✔
73
        case ConnectionTerminationReason::closed_voluntarily:
80✔
74
            return std::chrono::milliseconds::zero();
80✔
75
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
76
            return std::chrono::milliseconds::max();
18✔
77
        default:
1,372✔
78
            if (m_reconnect_mode == ReconnectMode::testing) {
1,372✔
79
                return std::chrono::milliseconds::max();
1,016✔
80
            }
1,016✔
81

82
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
356✔
83
            return m_backoff_state.delay_interval();
356✔
84
    }
1,470✔
85
}
1,470✔
86

87

88
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
89
                                      port_type& port, std::string& path) const
90
{
4,378✔
91
    util::Uri uri(url); // Throws
4,378✔
92
    uri.canonicalize(); // Throws
4,378✔
93
    std::string userinfo, address_2, port_2;
4,378✔
94
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
4,378✔
95
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
4,378✔
96
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
4,378✔
97
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
4,378✔
98
    if (REALM_UNLIKELY(!good))
4,378✔
99
        return false;
×
100
    ProtocolEnvelope protocol_2;
4,378✔
101
    port_type port_3;
4,378✔
102
    if (realm_scheme) {
4,378✔
103
        if (uri.get_scheme() == "realm:") {
×
104
            protocol_2 = ProtocolEnvelope::realm;
×
105
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
106
        }
×
107
        else {
×
108
            protocol_2 = ProtocolEnvelope::realms;
×
109
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
110
        }
×
111
    }
×
112
    else {
4,378✔
113
        REALM_ASSERT(ws_scheme);
4,378✔
114
        if (uri.get_scheme() == "ws:") {
4,378✔
115
            protocol_2 = ProtocolEnvelope::ws;
4,298✔
116
            port_3 = 80;
4,298✔
117
        }
4,298✔
118
        else {
80✔
119
            protocol_2 = ProtocolEnvelope::wss;
80✔
120
            port_3 = 443;
80✔
121
        }
80✔
122
    }
4,378✔
123
    if (!port_2.empty()) {
4,378✔
124
        std::istringstream in(port_2);    // Throws
4,274✔
125
        in.imbue(std::locale::classic()); // Throws
4,274✔
126
        in >> port_3;
4,274✔
127
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
4,274✔
128
            return false;
×
129
    }
4,274✔
130
    std::string path_2 = uri.get_path(); // Throws (copy)
4,378✔
131

132
    protocol = protocol_2;
4,378✔
133
    address = std::move(address_2);
4,378✔
134
    port = port_3;
4,378✔
135
    path = std::move(path_2);
4,378✔
136
    return true;
4,378✔
137
}
4,378✔
138

139
ClientImpl::ClientImpl(ClientConfig config)
140
    : logger(std::make_shared<util::CategoryLogger>(util::LogCategory::session, std::move(config.logger)))
4,912✔
141
    , m_reconnect_mode{config.reconnect_mode}
4,912✔
142
    , m_connect_timeout{config.connect_timeout}
4,912✔
143
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
4,912✔
144
    , m_ping_keepalive_period{config.ping_keepalive_period}
4,912✔
145
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
4,912✔
146
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
4,912✔
147
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
4,912✔
148
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
4,912✔
149
    , m_dry_run{config.dry_run}
4,912✔
150
    , m_enable_default_port_hack{config.enable_default_port_hack}
4,912✔
151
    , m_fix_up_object_ids{config.fix_up_object_ids}
4,912✔
152
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
4,912✔
153
    , m_socket_provider{std::move(config.socket_provider)}
4,912✔
154
    , m_client_protocol{} // Throws
4,912✔
155
    , m_one_connection_per_session{config.one_connection_per_session}
4,912✔
156
    , m_random{}
4,912✔
157
{
9,962✔
158
    // FIXME: Would be better if seeding was up to the application.
159
    util::seed_prng_nondeterministically(m_random); // Throws
9,962✔
160

161
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,962✔
162
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,962✔
163
                 get_current_protocol_version()); // Throws
9,962✔
164
    logger.info("Platform: %1", util::get_platform_info());
9,962✔
165
    const char* build_mode;
9,962✔
166
#if REALM_DEBUG
9,962✔
167
    build_mode = "Debug";
9,962✔
168
#else
169
    build_mode = "Release";
170
#endif
171
    logger.debug("Build mode: %1", build_mode);
9,962✔
172
    logger.debug("Config param: one_connection_per_session = %1",
9,962✔
173
                 config.one_connection_per_session); // Throws
9,962✔
174
    logger.debug("Config param: connect_timeout = %1 ms",
9,962✔
175
                 config.connect_timeout); // Throws
9,962✔
176
    logger.debug("Config param: connection_linger_time = %1 ms",
9,962✔
177
                 config.connection_linger_time); // Throws
9,962✔
178
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,962✔
179
                 config.ping_keepalive_period); // Throws
9,962✔
180
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,962✔
181
                 config.pong_keepalive_timeout); // Throws
9,962✔
182
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,962✔
183
                 config.fast_reconnect_limit); // Throws
9,962✔
184
    logger.debug("Config param: disable_sync_to_disk = %1",
9,962✔
185
                 config.disable_sync_to_disk); // Throws
9,962✔
186
    logger.debug(
9,962✔
187
        "Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3, jitter: 1/%4",
9,962✔
188
        m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,962✔
189
        m_reconnect_backoff_info.resumption_delay_interval.count(),
9,962✔
190
        m_reconnect_backoff_info.resumption_delay_backoff_multiplier, m_reconnect_backoff_info.delay_jitter_divisor);
9,962✔
191

192
    if (config.reconnect_mode != ReconnectMode::normal) {
9,962✔
193
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
776✔
194
                    "never do this in production!");
776✔
195
    }
776✔
196

197
    if (config.dry_run) {
9,962✔
198
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
199
                    "never do this in production!");
×
200
    }
×
201

202
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,962✔
203

204
    if (m_one_connection_per_session) {
9,962✔
205
        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
4✔
206
                    "never do this in production");
4✔
207
    }
4✔
208

209
    if (config.disable_upload_activation_delay) {
9,962✔
210
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
211
                    "never do this in production");
×
212
    }
×
213

214
    if (config.disable_sync_to_disk) {
9,962✔
215
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
216
                    "never do this in production");
×
217
    }
×
218

219
    m_actualize_and_finalize = create_trigger([this](Status status) {
14,778✔
220
        if (status == ErrorCodes::OperationAborted)
14,778✔
221
            return;
×
222
        else if (!status.is_ok())
14,778✔
223
            throw Exception(status);
×
224
        actualize_and_finalize_session_wrappers(); // Throws
14,778✔
225
    });
14,778✔
226
}
9,962✔
227

228
void ClientImpl::incr_outstanding_posts()
229
{
207,700✔
230
    util::CheckedLockGuard lock(m_drain_mutex);
207,700✔
231
    ++m_outstanding_posts;
207,700✔
232
    m_drained = false;
207,700✔
233
}
207,700✔
234

235
void ClientImpl::decr_outstanding_posts()
236
{
207,702✔
237
    util::CheckedLockGuard lock(m_drain_mutex);
207,702✔
238
    REALM_ASSERT(m_outstanding_posts);
207,702✔
239
    if (--m_outstanding_posts <= 0) {
207,702✔
240
        // Notify must happen with lock held or another thread could destroy
241
        // ClientImpl between when we release the lock and when we call notify
242
        m_drain_cv.notify_all();
18,240✔
243
    }
18,240✔
244
}
207,702✔
245

246
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
247
{
56,488✔
248
    REALM_ASSERT(m_socket_provider);
56,488✔
249
    incr_outstanding_posts();
56,488✔
250
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
56,492✔
251
        auto decr_guard = util::make_scope_exit([&]() noexcept {
56,492✔
252
            decr_outstanding_posts();
56,492✔
253
        });
56,492✔
254
        handler(status);
56,492✔
255
    });
56,492✔
256
}
56,488✔
257

258
void ClientImpl::post(util::UniqueFunction<void()>&& handler)
259
{
133,106✔
260
    REALM_ASSERT(m_socket_provider);
133,106✔
261
    incr_outstanding_posts();
133,106✔
262
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
133,108✔
263
        auto decr_guard = util::make_scope_exit([&]() noexcept {
133,108✔
264
            decr_outstanding_posts();
133,104✔
265
        });
133,104✔
266
        if (status == ErrorCodes::OperationAborted)
133,108✔
267
            return;
×
268
        if (!status.is_ok())
133,108✔
269
            throw Exception(status);
×
270
        handler();
133,108✔
271
    });
133,108✔
272
}
133,106✔
273

274

275
void ClientImpl::drain_connections()
276
{
9,962✔
277
    logger.debug("Draining connections during sync client shutdown");
9,962✔
278
    for (auto& server_slot_pair : m_server_slots) {
9,962✔
279
        auto& server_slot = server_slot_pair.second;
2,728✔
280

281
        if (server_slot.connection) {
2,728✔
282
            auto& conn = server_slot.connection;
2,500✔
283
            conn->force_close();
2,500✔
284
        }
2,500✔
285
        else {
228✔
286
            for (auto& conn_pair : server_slot.alt_connections) {
228✔
287
                conn_pair.second->force_close();
×
288
            }
×
289
        }
228✔
290
    }
2,728✔
291
}
9,962✔
292

293

294
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
295
                                                       SyncSocketProvider::FunctionHandler&& handler)
296
{
18,102✔
297
    REALM_ASSERT(m_socket_provider);
18,102✔
298
    incr_outstanding_posts();
18,102✔
299
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
18,106✔
300
        auto decr_guard = util::make_scope_exit([&]() noexcept {
18,108✔
301
            decr_outstanding_posts();
18,106✔
302
        });
18,106✔
303
        handler(status);
18,106✔
304
    });
18,106✔
305
}
18,102✔
306

307

308
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
309
{
12,796✔
310
    REALM_ASSERT(m_socket_provider);
12,796✔
311
    return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
12,796✔
312
}
12,796✔
313

314
Connection::~Connection()
315
{
2,832✔
316
    if (m_websocket_sentinel) {
2,832✔
317
        m_websocket_sentinel->destroyed = true;
×
318
        m_websocket_sentinel.reset();
×
319
    }
×
320
}
2,832✔
321

322
void Connection::activate()
323
{
2,832✔
324
    REALM_ASSERT(m_on_idle);
2,832✔
325
    m_activated = true;
2,832✔
326
    if (m_num_active_sessions == 0)
2,832✔
327
        m_on_idle->trigger();
×
328
    // We cannot in general connect immediately, because a prior failure to
329
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
330
    initiate_reconnect_wait(); // Throws
2,832✔
331
}
2,832✔
332

333

334
void Connection::activate_session(std::unique_ptr<Session> sess)
335
{
10,404✔
336
    REALM_ASSERT(sess);
10,404✔
337
    REALM_ASSERT(&sess->m_conn == this);
10,404✔
338
    REALM_ASSERT(!m_force_closed);
10,404✔
339
    Session& sess_2 = *sess;
10,404✔
340
    session_ident_type ident = sess->m_ident;
10,404✔
341
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
10,404✔
342
    bool was_inserted = p.second;
10,404✔
343
    REALM_ASSERT(was_inserted);
10,404✔
344
    // Save the session ident to the historical list of session idents
345
    m_session_history.insert(ident);
10,404✔
346
    sess_2.activate(); // Throws
10,404✔
347
    if (m_state == ConnectionState::connected) {
10,404✔
348
        bool fast_reconnect = false;
7,272✔
349
        sess_2.connection_established(fast_reconnect); // Throws
7,272✔
350
    }
7,272✔
351
    ++m_num_active_sessions;
10,404✔
352
}
10,404✔
353

354

355
void Connection::initiate_session_deactivation(Session* sess)
356
{
10,408✔
357
    REALM_ASSERT(sess);
10,408✔
358
    REALM_ASSERT(&sess->m_conn == this);
10,408✔
359
    REALM_ASSERT(m_num_active_sessions);
10,408✔
360
    // Since the client may be waiting for m_num_active_sessions to reach 0
361
    // in stop_and_wait() (on a separate thread), deactivate Session before
362
    // decrementing the num active sessions value.
363
    sess->initiate_deactivation(); // Throws
10,408✔
364
    if (sess->m_state == Session::Deactivated) {
10,408✔
365
        finish_session_deactivation(sess);
952✔
366
    }
952✔
367
    if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
10,408✔
368
        if (m_activated && m_state == ConnectionState::disconnected)
4,634✔
369
            m_on_idle->trigger();
376✔
370
    }
4,634✔
371
}
10,408✔
372

373

374
void Connection::cancel_reconnect_delay()
375
{
2,214✔
376
    REALM_ASSERT(m_activated);
2,214✔
377

378
    if (m_reconnect_delay_in_progress) {
2,214✔
379
        if (m_nonzero_reconnect_delay)
1,978✔
380
            logger.detail("Canceling reconnect delay"); // Throws
992✔
381

382
        // Cancel the in-progress wait operation by destroying the timer
383
        // object. Destruction is needed in this case, because a new wait
384
        // operation might have to be initiated before the previous one
385
        // completes (its completion handler starts to execute), so the new wait
386
        // operation must be done on a new timer object.
387
        m_reconnect_disconnect_timer.reset();
1,978✔
388
        m_reconnect_delay_in_progress = false;
1,978✔
389
        m_reconnect_info.reset();
1,978✔
390
        initiate_reconnect_wait(); // Throws
1,978✔
391
        return;
1,978✔
392
    }
1,978✔
393

394
    // If we are not disconnected, then we need to make sure the next time we get disconnected
395
    // that we are allowed to re-connect as quickly as possible.
396
    //
397
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
398
    // backoff/delay state before calculating the next delay, unless a PONG message is received
399
    // for the urgent PING message we send below.
400
    //
401
    // If we get a PONG message for the urgent PING message sent below, then the connection is
402
    // healthy and we can calculate the next delay normally.
403
    if (m_state != ConnectionState::disconnected) {
236✔
404
        m_reconnect_info.scheduled_reset = true;
236✔
405
        m_ping_after_scheduled_reset_of_reconnect_info = false;
236✔
406

407
        schedule_urgent_ping(); // Throws
236✔
408
        return;
236✔
409
    }
236✔
410
    // Nothing to do in this case. The next reconnect attemp will be made as
411
    // soon as there are any sessions that are both active and unsuspended.
412
}
236✔
413

414
void Connection::finish_session_deactivation(Session* sess)
415
{
8,304✔
416
    REALM_ASSERT(sess->m_state == Session::Deactivated);
8,304✔
417
    auto ident = sess->m_ident;
8,304✔
418
    m_sessions.erase(ident);
8,304✔
419
    m_session_history.erase(ident);
8,304✔
420
}
8,304✔
421

422
void Connection::force_close()
423
{
2,500✔
424
    if (m_force_closed) {
2,500✔
425
        return;
×
426
    }
×
427

428
    m_force_closed = true;
2,500✔
429

430
    if (m_state != ConnectionState::disconnected) {
2,500✔
431
        voluntary_disconnect();
2,468✔
432
    }
2,468✔
433

434
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,500✔
435
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,500✔
436
        m_reconnect_disconnect_timer.reset();
32✔
437
        m_reconnect_delay_in_progress = false;
32✔
438
        m_disconnect_delay_in_progress = false;
32✔
439
    }
32✔
440

441
    // We must copy any session pointers we want to close to a vector because force_closing
442
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
443
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
444
    std::vector<Session*> to_close;
2,500✔
445
    for (auto& session_pair : m_sessions) {
2,500✔
446
        if (session_pair.second->m_state == Session::State::Active) {
102✔
447
            to_close.push_back(session_pair.second.get());
102✔
448
        }
102✔
449
    }
102✔
450

451
    for (auto& sess : to_close) {
2,500✔
452
        sess->force_close();
102✔
453
    }
102✔
454

455
    logger.debug("Force closed idle connection");
2,500✔
456
}
2,500✔
457

458

459
void Connection::websocket_connected_handler(const std::string& protocol)
460
{
3,654✔
461
    if (!protocol.empty()) {
3,654✔
462
        std::string_view expected_prefix =
3,654✔
463
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,654✔
464
        // FIXME: Use std::string_view::begins_with() in C++20.
465
        auto prefix_matches = [&](std::string_view other) {
3,654✔
466
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,654✔
467
        };
3,654✔
468
        if (prefix_matches(expected_prefix)) {
3,654✔
469
            util::MemoryInputStream in;
3,654✔
470
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,654✔
471
            in.imbue(std::locale::classic());
3,654✔
472
            in.unsetf(std::ios_base::skipws);
3,654✔
473
            int value_2 = 0;
3,654✔
474
            in >> value_2;
3,654✔
475
            if (in && in.eof() && value_2 >= 0) {
3,654✔
476
                bool good_version =
3,654✔
477
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,654✔
478
                if (good_version) {
3,654✔
479
                    logger.detail("Negotiated protocol version: %1", value_2);
3,654✔
480
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
481
                    // will provide the appservices connection ID via a log message.
482
                    // TODO: Remove once the server starts sending the connection ID
483
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,654✔
484
                    m_negotiated_protocol_version = value_2;
3,654✔
485
                    handle_connection_established(); // Throws
3,654✔
486
                    return;
3,654✔
487
                }
3,654✔
488
            }
3,654✔
489
        }
3,654✔
490
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
491
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
492
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
493
    }
×
494
    else {
×
495
        close_due_to_client_side_error(
×
496
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
497
            ConnectionTerminationReason::bad_headers_in_http_response);
×
498
    }
×
499
}
3,654✔
500

501

502
bool Connection::websocket_binary_message_received(util::Span<const char> data)
503
{
82,276✔
504
    if (m_force_closed) {
82,276✔
505
        logger.debug("Received binary message after connection was force closed");
×
506
        return false;
×
507
    }
×
508

509
    using sf = SimulatedFailure;
82,276✔
510
    if (sf::check_trigger(sf::sync_client__read_head)) {
82,276✔
511
        close_due_to_client_side_error(
472✔
512
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
472✔
513
            ConnectionTerminationReason::read_or_write_error);
472✔
514
        return bool(m_websocket);
472✔
515
    }
472✔
516

517
    handle_message_received(data);
81,804✔
518
    return bool(m_websocket);
81,804✔
519
}
82,276✔
520

521

522
void Connection::websocket_error_handler()
523
{
722✔
524
    m_websocket_error_received = true;
722✔
525
}
722✔
526

527
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
528
{
842✔
529
    if (m_force_closed) {
842✔
530
        logger.debug("Received websocket close message after connection was force closed");
×
531
        return false;
×
532
    }
×
533
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
842✔
534

535
    switch (error_code) {
842✔
536
        case WebSocketError::websocket_ok:
70✔
537
            break;
70✔
538
        case WebSocketError::websocket_resolve_failed:
4✔
539
            [[fallthrough]];
4✔
540
        case WebSocketError::websocket_connection_failed: {
112✔
541
            SessionErrorInfo error_info(
112✔
542
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
112✔
543
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
544
            // to make sure the websocket URL is correct
545
            if (!m_server_endpoint.is_verified) {
112✔
546
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
84✔
547
            }
84✔
548
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
112✔
549
            break;
112✔
550
        }
4✔
551
        case WebSocketError::websocket_read_error:
588✔
552
            [[fallthrough]];
588✔
553
        case WebSocketError::websocket_write_error: {
588✔
554
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
588✔
555
                                         ConnectionTerminationReason::read_or_write_error);
588✔
556
            break;
588✔
557
        }
588✔
558
        case WebSocketError::websocket_going_away:
✔
559
            [[fallthrough]];
×
560
        case WebSocketError::websocket_protocol_error:
✔
561
            [[fallthrough]];
×
562
        case WebSocketError::websocket_unsupported_data:
✔
563
            [[fallthrough]];
×
564
        case WebSocketError::websocket_invalid_payload_data:
✔
565
            [[fallthrough]];
×
566
        case WebSocketError::websocket_policy_violation:
✔
567
            [[fallthrough]];
×
568
        case WebSocketError::websocket_reserved:
✔
569
            [[fallthrough]];
×
570
        case WebSocketError::websocket_no_status_received:
✔
571
            [[fallthrough]];
×
572
        case WebSocketError::websocket_invalid_extension: {
✔
573
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
574
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
575
            break;
×
576
        }
×
577
        case WebSocketError::websocket_message_too_big: {
4✔
578
            auto message = util::format(
4✔
579
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
580
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
581
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
582
            involuntary_disconnect(std::move(error_info),
4✔
583
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
584
            break;
4✔
585
        }
×
586
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
587
            close_due_to_client_side_error(
10✔
588
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
589
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
590
            break;
10✔
591
        }
×
592
        case WebSocketError::websocket_fatal_error: {
✔
593
            // Error is fatal if the sync_route has already been verified - if the sync_route has not
594
            // been verified, then use a non-fatal error and try to perform a location update.
595
            SessionErrorInfo error_info(
×
596
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)},
×
597
                IsFatal{m_server_endpoint.is_verified});
×
598
            ConnectionTerminationReason reason = ConnectionTerminationReason::http_response_says_fatal_error;
×
599
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
600
            // to make sure the websocket URL is correct
601
            if (!m_server_endpoint.is_verified) {
×
602
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
603
                reason = ConnectionTerminationReason::connect_operation_failed;
×
604
            }
×
605
            involuntary_disconnect(std::move(error_info), reason);
×
606
            break;
×
607
        }
×
608
        case WebSocketError::websocket_forbidden: {
✔
609
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
610
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
611
            involuntary_disconnect(std::move(error_info),
×
612
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
613
            break;
×
614
        }
×
615
        case WebSocketError::websocket_unauthorized: {
44✔
616
            SessionErrorInfo error_info(
44✔
617
                {ErrorCodes::AuthError,
44✔
618
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
44✔
619
                IsFatal{false});
44✔
620
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
44✔
621
            involuntary_disconnect(std::move(error_info),
44✔
622
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
44✔
623
            break;
44✔
624
        }
×
625
        case WebSocketError::websocket_moved_permanently: {
14✔
626
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
14✔
627
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
14✔
628
            involuntary_disconnect(std::move(error_info),
14✔
629
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
14✔
630
            break;
14✔
631
        }
×
632
        case WebSocketError::websocket_abnormal_closure: {
✔
633
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
634
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
635
            involuntary_disconnect(std::move(error_info),
×
636
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
637
            break;
×
638
        }
×
639
        case WebSocketError::websocket_internal_server_error:
✔
640
            [[fallthrough]];
×
641
        case WebSocketError::websocket_retry_error: {
✔
642
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
643
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
644
            break;
×
645
        }
×
646
    }
842✔
647

648
    return bool(m_websocket);
842✔
649
}
842✔
650

651
// Guarantees that handle_reconnect_wait() is never called from within the
652
// execution of initiate_reconnect_wait() (no callback reentrance).
653
void Connection::initiate_reconnect_wait()
654
{
8,668✔
655
    REALM_ASSERT(m_activated);
8,668✔
656
    REALM_ASSERT(!m_reconnect_delay_in_progress);
8,668✔
657
    REALM_ASSERT(!m_disconnect_delay_in_progress);
8,668✔
658

659
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
660
    if (m_force_closed) {
8,668✔
661
        return;
2,468✔
662
    }
2,468✔
663

664
    m_reconnect_delay_in_progress = true;
6,200✔
665
    auto delay = m_reconnect_info.delay_interval();
6,200✔
666
    if (delay == std::chrono::milliseconds::max()) {
6,200✔
667
        logger.detail("Reconnection delayed indefinitely"); // Throws
1,034✔
668
        // Not actually starting a timer corresponds to an infinite wait
669
        m_nonzero_reconnect_delay = true;
1,034✔
670
        return;
1,034✔
671
    }
1,034✔
672

673
    if (delay == std::chrono::milliseconds::zero()) {
5,166✔
674
        m_nonzero_reconnect_delay = false;
4,808✔
675
    }
4,808✔
676
    else {
358✔
677
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
358✔
678
        m_nonzero_reconnect_delay = true;
358✔
679
    }
358✔
680

681
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
682
    // we need it to be cancelable in case the connection is terminated before the timer
683
    // callback is run.
684
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
5,166✔
685
        // If the operation is aborted, the connection object may have been
686
        // destroyed.
687
        if (status != ErrorCodes::OperationAborted)
5,164✔
688
            handle_reconnect_wait(status); // Throws
3,852✔
689
    });                                    // Throws
5,164✔
690
}
5,166✔
691

692

693
void Connection::handle_reconnect_wait(Status status)
694
{
3,852✔
695
    if (!status.is_ok()) {
3,852✔
696
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
697
        throw Exception(status);
×
698
    }
×
699

700
    REALM_ASSERT(m_reconnect_delay_in_progress);
3,852✔
701
    m_reconnect_delay_in_progress = false;
3,852✔
702

703
    if (m_num_active_unsuspended_sessions > 0)
3,852✔
704
        initiate_reconnect(); // Throws
3,842✔
705
}
3,852✔
706

707
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
708
    explicit WebSocketObserverShim(Connection* conn)
709
        : conn(conn)
1,774✔
710
        , sentinel(conn->m_websocket_sentinel)
1,774✔
711
    {
3,854✔
712
    }
3,854✔
713

714
    Connection* conn;
715
    util::bind_ptr<LifecycleSentinel> sentinel;
716

717
    void websocket_connected_handler(const std::string& protocol) override
718
    {
3,654✔
719
        if (sentinel->destroyed) {
3,654✔
720
            return;
×
721
        }
×
722

723
        return conn->websocket_connected_handler(protocol);
3,654✔
724
    }
3,654✔
725

726
    void websocket_error_handler() override
727
    {
722✔
728
        if (sentinel->destroyed) {
722✔
729
            return;
×
730
        }
×
731

732
        conn->websocket_error_handler();
722✔
733
    }
722✔
734

735
    bool websocket_binary_message_received(util::Span<const char> data) override
736
    {
82,274✔
737
        if (sentinel->destroyed) {
82,274✔
738
            return false;
×
739
        }
×
740

741
        return conn->websocket_binary_message_received(data);
82,274✔
742
    }
82,274✔
743

744
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
745
    {
842✔
746
        if (sentinel->destroyed) {
842✔
747
            return true;
×
748
        }
×
749

750
        return conn->websocket_closed_handler(was_clean, error_code, msg);
842✔
751
    }
842✔
752
};
753

754
void Connection::initiate_reconnect()
755
{
3,852✔
756
    REALM_ASSERT(m_activated);
3,852✔
757

758
    m_state = ConnectionState::connecting;
3,852✔
759
    report_connection_state_change(ConnectionState::connecting); // Throws
3,852✔
760
    if (m_websocket_sentinel) {
3,852✔
761
        m_websocket_sentinel->destroyed = true;
×
762
    }
×
763
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,852✔
764
    m_websocket.reset();
3,852✔
765

766
    // Watchdog
767
    initiate_connect_wait(); // Throws
3,852✔
768

769
    std::vector<std::string> sec_websocket_protocol;
3,852✔
770
    {
3,852✔
771
        auto protocol_prefix =
3,852✔
772
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,852✔
773
        int min = get_oldest_supported_protocol_version();
3,852✔
774
        int max = get_current_protocol_version();
3,852✔
775
        REALM_ASSERT_3(min, <=, max);
3,852✔
776
        // List protocol version in descending order to ensure that the server
777
        // selects the highest possible version.
778
        for (int version = max; version >= min; --version) {
53,930✔
779
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
50,078✔
780
        }
50,078✔
781
    }
3,852✔
782

783
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,852✔
784
                m_server_endpoint.port, m_http_request_path_prefix);
3,852✔
785

786
    m_websocket_error_received = false;
3,852✔
787
    m_websocket =
3,852✔
788
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,852✔
789
                                            WebSocketEndpoint{
3,852✔
790
                                                m_server_endpoint.address,
3,852✔
791
                                                m_server_endpoint.port,
3,852✔
792
                                                get_http_request_path(),
3,852✔
793
                                                std::move(sec_websocket_protocol),
3,852✔
794
                                                is_ssl(m_server_endpoint.envelope),
3,852✔
795
                                                /// DEPRECATED - The following will be removed in a future release
796
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,852✔
797
                                                m_verify_servers_ssl_certificate,
3,852✔
798
                                                m_ssl_trust_certificate_path,
3,852✔
799
                                                m_ssl_verify_callback,
3,852✔
800
                                                m_proxy_config,
3,852✔
801
                                            });
3,852✔
802
}
3,852✔
803

804

805
void Connection::initiate_connect_wait()
806
{
3,852✔
807
    // Deploy a watchdog to enforce an upper bound on the time it can take to
808
    // fully establish the connection (including SSL and WebSocket
809
    // handshakes). Without such a watchdog, connect operations could take very
810
    // long, or even indefinite time.
811
    milliseconds_type time = m_client.m_connect_timeout;
3,852✔
812

813
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,854✔
814
        // If the operation is aborted, the connection object may have been
815
        // destroyed.
816
        if (status != ErrorCodes::OperationAborted)
3,852✔
817
            handle_connect_wait(status); // Throws
×
818
    });                                  // Throws
3,852✔
819
}
3,852✔
820

821

822
void Connection::handle_connect_wait(Status status)
823
{
×
824
    if (!status.is_ok()) {
×
825
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
826
        throw Exception(status);
×
827
    }
×
828

829
    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
×
830
    logger.info("Connect timeout"); // Throws
×
831
    SessionErrorInfo error_info({ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
×
832
                                IsFatal{false});
×
833
    // If the connection fails/times out and the server has not been contacted yet, refresh the location
834
    // to make sure the websocket URL is correct
835
    if (!m_server_endpoint.is_verified) {
×
836
        error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
837
    }
×
838
    involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::sync_connect_timeout); // Throws
×
839
}
×
840

841

842
void Connection::handle_connection_established()
843
{
3,654✔
844
    // Cancel connect timeout watchdog
845
    m_connect_timer.reset();
3,654✔
846

847
    m_state = ConnectionState::connected;
3,654✔
848
    m_server_endpoint.is_verified = true; // sync route is valid since connection is successful
3,654✔
849

850
    milliseconds_type now = monotonic_clock_now();
3,654✔
851
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,654✔
852
    initiate_ping_delay(now);     // Throws
3,654✔
853

854
    bool fast_reconnect = false;
3,654✔
855
    if (m_disconnect_has_occurred) {
3,654✔
856
        milliseconds_type time = now - m_disconnect_time;
1,072✔
857
        if (time <= m_client.m_fast_reconnect_limit)
1,072✔
858
            fast_reconnect = true;
1,072✔
859
    }
1,072✔
860

861
    for (auto& p : m_sessions) {
4,830✔
862
        Session& sess = *p.second;
4,830✔
863
        sess.connection_established(fast_reconnect); // Throws
4,830✔
864
    }
4,830✔
865

866
    report_connection_state_change(ConnectionState::connected); // Throws
3,654✔
867
}
3,654✔
868

869

870
void Connection::schedule_urgent_ping()
871
{
236✔
872
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
236✔
873
    if (m_ping_delay_in_progress) {
236✔
874
        m_heartbeat_timer.reset();
124✔
875
        m_ping_delay_in_progress = false;
124✔
876
        m_minimize_next_ping_delay = true;
124✔
877
        milliseconds_type now = monotonic_clock_now();
124✔
878
        initiate_ping_delay(now); // Throws
124✔
879
        return;
124✔
880
    }
124✔
881
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
112✔
882
    if (!m_send_ping)
112✔
883
        m_minimize_next_ping_delay = true;
112✔
884
}
112✔
885

886

887
void Connection::initiate_ping_delay(milliseconds_type now)
888
{
3,944✔
889
    REALM_ASSERT(!m_ping_delay_in_progress);
3,944✔
890
    REALM_ASSERT(!m_waiting_for_pong);
3,944✔
891
    REALM_ASSERT(!m_send_ping);
3,944✔
892

893
    milliseconds_type delay = 0;
3,944✔
894
    if (!m_minimize_next_ping_delay) {
3,944✔
895
        delay = m_client.m_ping_keepalive_period;
3,804✔
896
        // Make a randomized deduction of up to 10%, or up to 100% if this is
897
        // the first PING message to be sent since the connection was
898
        // established. The purpose of this randomized deduction is to reduce
899
        // the risk of many connections sending PING messages simultaneously to
900
        // the server.
901
        milliseconds_type max_deduction = (m_ping_sent ? delay / 10 : delay);
3,804✔
902
        auto distr = std::uniform_int_distribution<milliseconds_type>(0, max_deduction);
3,804✔
903
        milliseconds_type randomized_deduction = distr(m_client.get_random());
3,804✔
904
        delay -= randomized_deduction;
3,804✔
905
        // Deduct the time spent waiting for PONG
906
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
3,804✔
907
        milliseconds_type spent_time = now - m_pong_wait_started_at;
3,804✔
908
        if (spent_time < delay) {
3,804✔
909
            delay -= spent_time;
3,796✔
910
        }
3,796✔
911
        else {
8✔
912
            delay = 0;
8✔
913
        }
8✔
914
    }
3,804✔
915
    else {
140✔
916
        m_minimize_next_ping_delay = false;
140✔
917
    }
140✔
918

919

920
    m_ping_delay_in_progress = true;
3,944✔
921

922
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
3,944✔
923
        if (status == ErrorCodes::OperationAborted)
3,944✔
924
            return;
3,756✔
925
        else if (!status.is_ok())
188✔
926
            throw Exception(status);
×
927

928
        handle_ping_delay();                                    // Throws
188✔
929
    });                                                         // Throws
188✔
930
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
3,944✔
931
}
3,944✔
932

933

934
void Connection::handle_ping_delay()
935
{
186✔
936
    REALM_ASSERT(m_ping_delay_in_progress);
186✔
937
    m_ping_delay_in_progress = false;
186✔
938
    m_send_ping = true;
186✔
939

940
    initiate_pong_timeout(); // Throws
186✔
941

942
    if (m_state == ConnectionState::connected && !m_sending)
186✔
943
        send_next_message(); // Throws
128✔
944
}
186✔
945

946

947
void Connection::initiate_pong_timeout()
948
{
186✔
949
    REALM_ASSERT(!m_ping_delay_in_progress);
186✔
950
    REALM_ASSERT(!m_waiting_for_pong);
186✔
951
    REALM_ASSERT(m_send_ping);
186✔
952

953
    m_waiting_for_pong = true;
186✔
954
    m_pong_wait_started_at = monotonic_clock_now();
186✔
955

956
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
186✔
957
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
186✔
958
        if (status == ErrorCodes::OperationAborted)
186✔
959
            return;
174✔
960
        else if (!status.is_ok())
12✔
961
            throw Exception(status);
×
962

963
        handle_pong_timeout(); // Throws
12✔
964
    });                        // Throws
12✔
965
}
186✔
966

967

968
void Connection::handle_pong_timeout()
969
{
12✔
970
    REALM_ASSERT(m_waiting_for_pong);
12✔
971
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
972
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
973
                                 ConnectionTerminationReason::pong_timeout);
12✔
974
}
12✔
975

976

977
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
978
{
99,000✔
979
    // Stop sending messages if an websocket error was received.
980
    if (m_websocket_error_received)
99,000✔
981
        return;
×
982

983
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
99,004✔
984
        if (sentinel->destroyed) {
98,924✔
985
            return;
1,478✔
986
        }
1,478✔
987
        if (!status.is_ok()) {
97,446✔
988
            if (status != ErrorCodes::Error::OperationAborted) {
×
989
                // Write errors will be handled by the websocket_write_error_handler() callback
990
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
991
            }
×
992
            return;
×
993
        }
×
994
        handle_write_message(); // Throws
97,446✔
995
    });                         // Throws
97,446✔
996
    m_sending_session = sess;
99,000✔
997
    m_sending = true;
99,000✔
998
}
99,000✔
999

1000

1001
void Connection::handle_write_message()
1002
{
97,444✔
1003
    m_sending_session->message_sent(); // Throws
97,444✔
1004
    if (m_sending_session->m_state == Session::Deactivated) {
97,444✔
1005
        finish_session_deactivation(m_sending_session);
132✔
1006
    }
132✔
1007
    m_sending_session = nullptr;
97,444✔
1008
    m_sending = false;
97,444✔
1009
    send_next_message(); // Throws
97,444✔
1010
}
97,444✔
1011

1012

1013
void Connection::send_next_message()
1014
{
175,306✔
1015
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
175,306✔
1016
    REALM_ASSERT(!m_sending_session);
175,306✔
1017
    REALM_ASSERT(!m_sending);
175,306✔
1018
    if (m_send_ping) {
175,306✔
1019
        send_ping(); // Throws
174✔
1020
        return;
174✔
1021
    }
174✔
1022
    while (!m_sessions_enlisted_to_send.empty()) {
253,760✔
1023
        // The state of being connected is not supposed to be able to change
1024
        // across this loop thanks to the "no callback reentrance" guarantee
1025
        // provided by Websocket::async_write_text(), and friends.
1026
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
178,028✔
1027

1028
        Session& sess = *m_sessions_enlisted_to_send.front();
178,028✔
1029
        m_sessions_enlisted_to_send.pop_front();
178,028✔
1030
        sess.send_message(); // Throws
178,028✔
1031

1032
        if (sess.m_state == Session::Deactivated) {
178,028✔
1033
            finish_session_deactivation(&sess);
2,834✔
1034
        }
2,834✔
1035

1036
        // An enlisted session may choose to not send a message. In that case,
1037
        // we should pass the opportunity to the next enlisted session.
1038
        if (m_sending)
178,028✔
1039
            break;
99,400✔
1040
    }
178,028✔
1041
}
175,132✔
1042

1043

1044
void Connection::send_ping()
1045
{
174✔
1046
    REALM_ASSERT(!m_ping_delay_in_progress);
174✔
1047
    REALM_ASSERT(m_waiting_for_pong);
174✔
1048
    REALM_ASSERT(m_send_ping);
174✔
1049

1050
    m_send_ping = false;
174✔
1051
    if (m_reconnect_info.scheduled_reset)
174✔
1052
        m_ping_after_scheduled_reset_of_reconnect_info = true;
134✔
1053

1054
    m_last_ping_sent_at = monotonic_clock_now();
174✔
1055
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
174✔
1056
                 m_previous_ping_rtt); // Throws
174✔
1057

1058
    ClientProtocol& protocol = get_client_protocol();
174✔
1059
    OutputBuffer& out = get_output_buffer();
174✔
1060
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
174✔
1061
    initiate_write_ping(out);                                          // Throws
174✔
1062
    m_ping_sent = true;
174✔
1063
}
174✔
1064

1065

1066
void Connection::initiate_write_ping(const OutputBuffer& out)
1067
{
174✔
1068
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
174✔
1069
        if (sentinel->destroyed) {
174✔
1070
            return;
2✔
1071
        }
2✔
1072
        if (!status.is_ok()) {
172✔
1073
            if (status != ErrorCodes::Error::OperationAborted) {
×
1074
                // Write errors will be handled by the websocket_write_error_handler() callback
1075
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1076
            }
×
1077
            return;
×
1078
        }
×
1079
        handle_write_ping(); // Throws
172✔
1080
    });                      // Throws
172✔
1081
    m_sending = true;
174✔
1082
}
174✔
1083

1084

1085
void Connection::handle_write_ping()
1086
{
172✔
1087
    REALM_ASSERT(m_sending);
172✔
1088
    REALM_ASSERT(!m_sending_session);
172✔
1089
    m_sending = false;
172✔
1090
    send_next_message(); // Throws
172✔
1091
}
172✔
1092

1093

1094
void Connection::handle_message_received(util::Span<const char> data)
1095
{
81,802✔
1096
    // parse_message_received() parses the message and calls the proper handler
1097
    // on the Connection object (this).
1098
    get_client_protocol().parse_message_received<Connection>(*this, std::string_view(data.data(), data.size()));
81,802✔
1099
}
81,802✔
1100

1101

1102
void Connection::initiate_disconnect_wait()
1103
{
4,780✔
1104
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,780✔
1105

1106
    if (m_disconnect_delay_in_progress) {
4,780✔
1107
        m_reconnect_disconnect_timer.reset();
2,214✔
1108
        m_disconnect_delay_in_progress = false;
2,214✔
1109
    }
2,214✔
1110

1111
    milliseconds_type time = m_client.m_connection_linger_time;
4,780✔
1112

1113
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,780✔
1114
        // If the operation is aborted, the connection object may have been
1115
        // destroyed.
1116
        if (status != ErrorCodes::OperationAborted)
4,780✔
1117
            handle_disconnect_wait(status); // Throws
12✔
1118
    });                                     // Throws
4,780✔
1119
    m_disconnect_delay_in_progress = true;
4,780✔
1120
}
4,780✔
1121

1122

1123
void Connection::handle_disconnect_wait(Status status)
1124
{
12✔
1125
    if (!status.is_ok()) {
12✔
1126
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
1127
        throw Exception(status);
×
1128
    }
×
1129

1130
    m_disconnect_delay_in_progress = false;
12✔
1131

1132
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
12✔
1133
    if (m_num_active_unsuspended_sessions == 0) {
12✔
1134
        if (m_client.m_connection_linger_time > 0)
12✔
1135
            logger.detail("Linger time expired"); // Throws
×
1136
        voluntary_disconnect();                   // Throws
12✔
1137
        logger.info("Disconnected");              // Throws
12✔
1138
    }
12✔
1139
}
12✔
1140

1141

1142
void Connection::close_due_to_protocol_error(Status status)
1143
{
16✔
1144
    SessionErrorInfo error_info(std::move(status), IsFatal{true});
16✔
1145
    error_info.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;
16✔
1146
    involuntary_disconnect(std::move(error_info),
16✔
1147
                           ConnectionTerminationReason::sync_protocol_violation); // Throws
16✔
1148
}
16✔
1149

1150

1151
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
1152
{
482✔
1153
    logger.info("Connection closed due to error: %1", status); // Throws
482✔
1154

1155
    involuntary_disconnect(SessionErrorInfo{std::move(status), is_fatal}, reason); // Throw
482✔
1156
}
482✔
1157

1158

1159
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
1160
{
600✔
1161
    logger.info("Connection closed due to transient error: %1", status); // Throws
600✔
1162
    SessionErrorInfo error_info{std::move(status), IsFatal{false}};
600✔
1163
    error_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
600✔
1164

1165
    involuntary_disconnect(std::move(error_info), reason); // Throw
600✔
1166
}
600✔
1167

1168

1169
// Close connection due to error discovered on the server-side, and then
1170
// reported to the client by way of a connection-level ERROR message.
1171
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1172
{
66✔
1173
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
66✔
1174
                int(error_code)); // Throws
66✔
1175

1176
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
66✔
1177
                                      : ConnectionTerminationReason::server_said_try_again_later;
66✔
1178
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
66✔
1179
                           reason); // Throws
66✔
1180
}
66✔
1181

1182

1183
void Connection::disconnect(const SessionErrorInfo& info)
1184
{
3,854✔
1185
    // Cancel connect timeout watchdog
1186
    m_connect_timer.reset();
3,854✔
1187

1188
    if (m_state == ConnectionState::connected) {
3,854✔
1189
        m_disconnect_time = monotonic_clock_now();
3,652✔
1190
        m_disconnect_has_occurred = true;
3,652✔
1191

1192
        // Sessions that are in the Deactivating state at this time can be
1193
        // immediately discarded, in part because they are no longer enlisted to
1194
        // send. Such sessions will be taken to the Deactivated state by
1195
        // Session::connection_lost(), and then they will be removed from
1196
        // `m_sessions`.
1197
        auto i = m_sessions.begin(), end = m_sessions.end();
3,652✔
1198
        while (i != end) {
7,990✔
1199
            // Prevent invalidation of the main iterator when erasing elements
1200
            auto j = i++;
4,338✔
1201
            Session& sess = *j->second;
4,338✔
1202
            sess.connection_lost(); // Throws
4,338✔
1203
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
4,338✔
1204
                m_sessions.erase(j);
2,104✔
1205
        }
4,338✔
1206
    }
3,652✔
1207

1208
    change_state_to_disconnected();
3,854✔
1209

1210
    m_ping_delay_in_progress = false;
3,854✔
1211
    m_waiting_for_pong = false;
3,854✔
1212
    m_send_ping = false;
3,854✔
1213
    m_minimize_next_ping_delay = false;
3,854✔
1214
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,854✔
1215
    m_ping_sent = false;
3,854✔
1216
    m_heartbeat_timer.reset();
3,854✔
1217
    m_previous_ping_rtt = 0;
3,854✔
1218

1219
    m_websocket_sentinel->destroyed = true;
3,854✔
1220
    m_websocket_sentinel.reset();
3,854✔
1221
    m_websocket.reset();
3,854✔
1222
    m_input_body_buffer.reset();
3,854✔
1223
    m_sending_session = nullptr;
3,854✔
1224
    m_sessions_enlisted_to_send.clear();
3,854✔
1225
    m_sending = false;
3,854✔
1226

1227
    if (!m_appservices_coid.empty()) {
3,854✔
1228
        m_appservices_coid.clear();
3,600✔
1229
        logger.base_logger = make_logger(m_ident, std::nullopt, get_client().logger.base_logger);
3,600✔
1230
        for (auto& [ident, sess] : m_sessions) {
3,600✔
1231
            sess->logger.base_logger = Session::make_logger(ident, logger.base_logger);
2,148✔
1232
        }
2,148✔
1233
    }
3,600✔
1234

1235
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,854✔
1236
    initiate_reconnect_wait();                                           // Throws
3,854✔
1237
}
3,854✔
1238

1239
bool Connection::is_flx_sync_connection() const noexcept
1240
{
116,530✔
1241
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
116,530✔
1242
}
116,530✔
1243

1244
void Connection::receive_pong(milliseconds_type timestamp)
1245
{
166✔
1246
    logger.debug("Received: PONG(timestamp=%1)", timestamp);
166✔
1247

1248
    bool legal_at_this_time = (m_waiting_for_pong && !m_send_ping);
166✔
1249
    if (REALM_UNLIKELY(!legal_at_this_time)) {
166✔
1250
        close_due_to_protocol_error(
×
1251
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
×
1252
        return;
×
1253
    }
×
1254

1255
    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
166✔
1256
        close_due_to_protocol_error(
×
1257
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1258
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
×
1259
                          m_last_ping_sent_at, timestamp)}); // Throws
×
1260
        return;
×
1261
    }
×
1262

1263
    milliseconds_type now = monotonic_clock_now();
166✔
1264
    milliseconds_type round_trip_time = now - timestamp;
166✔
1265
    logger.debug("Round trip time was %1 milliseconds", round_trip_time);
166✔
1266
    m_previous_ping_rtt = round_trip_time;
166✔
1267

1268
    // If this PONG message is a response to a PING mesage that was sent after
1269
    // the last invocation of cancel_reconnect_delay(), then the connection is
1270
    // still good, and we do not have to skip the next reconnect delay.
1271
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
166✔
1272
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
122✔
1273
        m_ping_after_scheduled_reset_of_reconnect_info = false;
122✔
1274
        m_reconnect_info.scheduled_reset = false;
122✔
1275
    }
122✔
1276

1277
    m_heartbeat_timer.reset();
166✔
1278
    m_waiting_for_pong = false;
166✔
1279

1280
    initiate_ping_delay(now); // Throws
166✔
1281

1282
    if (m_client.m_roundtrip_time_handler)
166✔
1283
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
×
1284
}
166✔
1285

1286
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
1287
{
75,570✔
1288
    if (session_ident == 0) {
75,570✔
1289
        return nullptr;
×
1290
    }
×
1291

1292
    auto* sess = get_session(session_ident);
75,570✔
1293
    if (REALM_LIKELY(sess)) {
75,570✔
1294
        return sess;
75,564✔
1295
    }
75,564✔
1296
    // Check the history to see if the message received was for a previous session
1297
    if (auto it = m_session_history.find(session_ident); it == m_session_history.end()) {
6✔
1298
        logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
×
1299
        close_due_to_protocol_error(
×
1300
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1301
             util::format("Received message %1 for session iden %2 when that session never existed", message,
×
1302
                          session_ident)});
×
1303
    }
×
1304
    else {
6✔
1305
        logger.error("Received %1 message for closed session, session_ident = %2", message,
6✔
1306
                     session_ident); // Throws
6✔
1307
    }
6✔
1308
    return nullptr;
6✔
1309
}
75,570✔
1310

1311
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1312
{
946✔
1313
    Session* sess = nullptr;
946✔
1314
    if (session_ident != 0) {
946✔
1315
        sess = find_and_validate_session(session_ident, "ERROR");
876✔
1316
        if (REALM_UNLIKELY(!sess)) {
876✔
1317
            return;
×
1318
        }
×
1319
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
876✔
1320
            close_due_to_protocol_error(std::move(status)); // Throws
×
1321
            return;
×
1322
        }
×
1323

1324
        if (sess->m_state == Session::Deactivated) {
876✔
1325
            finish_session_deactivation(sess);
12✔
1326
        }
12✔
1327
        return;
876✔
1328
    }
876✔
1329

1330
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
70✔
1331
                info.message, info.raw_error_code, info.is_fatal, session_ident,
70✔
1332
                info.server_requests_action); // Throws
70✔
1333

1334
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
70✔
1335
    if (REALM_LIKELY(known_error_code)) {
70✔
1336
        ProtocolError error_code = ProtocolError(info.raw_error_code);
66✔
1337
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
66✔
1338
            close_due_to_server_side_error(error_code, info); // Throws
66✔
1339
            return;
66✔
1340
        }
66✔
1341
        close_due_to_protocol_error(
×
1342
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1343
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1344
                          info.raw_error_code)});
×
1345
    }
×
1346
    else {
4✔
1347
        close_due_to_protocol_error(
4✔
1348
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1349
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1350
    }
4✔
1351
}
70✔
1352

1353

1354
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1355
                                             session_ident_type session_ident)
1356
{
20✔
1357
    if (session_ident == 0) {
20✔
1358
        return close_due_to_protocol_error(
×
1359
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1360
    }
×
1361

1362
    if (!is_flx_sync_connection()) {
20✔
1363
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1364
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1365
    }
×
1366

1367
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
20✔
1368
    if (REALM_UNLIKELY(!sess)) {
20✔
1369
        return;
×
1370
    }
×
1371

1372
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
20✔
1373
        close_due_to_protocol_error(std::move(status));
×
1374
    }
×
1375
}
20✔
1376

1377

1378
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
1379
{
3,618✔
1380
    Session* sess = find_and_validate_session(session_ident, "IDENT");
3,618✔
1381
    if (REALM_UNLIKELY(!sess)) {
3,618✔
1382
        return;
×
1383
    }
×
1384

1385
    if (auto status = sess->receive_ident_message(client_file_ident); !status.is_ok())
3,618✔
1386
        close_due_to_protocol_error(std::move(status)); // Throws
×
1387
}
3,618✔
1388

1389
void Connection::receive_download_message(session_ident_type session_ident, const DownloadMessage& message)
1390
{
49,602✔
1391
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
49,602✔
1392
    if (REALM_UNLIKELY(!sess)) {
49,602✔
1393
        return;
×
1394
    }
×
1395

1396
    if (auto status = sess->receive_download_message(message); !status.is_ok()) {
49,602✔
UNCOV
1397
        close_due_to_protocol_error(std::move(status));
×
UNCOV
1398
    }
×
1399
}
49,602✔
1400

1401
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
1402
{
17,020✔
1403
    Session* sess = find_and_validate_session(session_ident, "MARK");
17,020✔
1404
    if (REALM_UNLIKELY(!sess)) {
17,020✔
1405
        return;
×
1406
    }
×
1407

1408
    if (auto status = sess->receive_mark_message(request_ident); !status.is_ok())
17,020✔
1409
        close_due_to_protocol_error(std::move(status)); // Throws
12✔
1410
}
17,020✔
1411

1412

1413
void Connection::receive_unbound_message(session_ident_type session_ident)
1414
{
4,374✔
1415
    Session* sess = find_and_validate_session(session_ident, "UNBOUND");
4,374✔
1416
    if (REALM_UNLIKELY(!sess)) {
4,374✔
1417
        return;
×
1418
    }
×
1419

1420
    if (auto status = sess->receive_unbound_message(); !status.is_ok()) {
4,374✔
1421
        close_due_to_protocol_error(std::move(status)); // Throws
×
1422
        return;
×
1423
    }
×
1424

1425
    if (sess->m_state == Session::Deactivated) {
4,374✔
1426
        finish_session_deactivation(sess);
4,374✔
1427
    }
4,374✔
1428
}
4,374✔
1429

1430

1431
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1432
                                               std::string_view body)
1433
{
60✔
1434
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
60✔
1435
    if (REALM_UNLIKELY(!sess)) {
60✔
1436
        return;
×
1437
    }
×
1438

1439
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
60✔
1440
        close_due_to_protocol_error(std::move(status));
×
1441
    }
×
1442
}
60✔
1443

1444

1445
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1446
                                            std::string_view message)
1447
{
5,994✔
1448
    if (session_ident != 0) {
5,994✔
1449
        if (auto sess = get_session(session_ident)) {
3,978✔
1450
            sess->logger.log(LogCategory::session, level, "Server log: %1", message);
3,954✔
1451
            return;
3,954✔
1452
        }
3,954✔
1453

1454
        logger.log(util::LogCategory::session, level, "Server log for unknown session %1: %2", session_ident,
24✔
1455
                   message);
24✔
1456
        return;
24✔
1457
    }
3,978✔
1458

1459
    logger.log(level, "Server log: %1", message);
2,016✔
1460
}
2,016✔
1461

1462

1463
void Connection::receive_appservices_request_id(std::string_view coid)
1464
{
5,670✔
1465
    if (coid.empty() || !m_appservices_coid.empty()) {
5,670✔
1466
        return;
2,070✔
1467
    }
2,070✔
1468
    m_appservices_coid = coid;
3,600✔
1469
    logger.log(util::LogCategory::session, util::LogCategory::Level::info,
3,600✔
1470
               "Connected to app services with request id: \"%1\". Further log entries for this connection will be "
3,600✔
1471
               "prefixed with \"Connection[%2:%1]\" instead of \"Connection[%2]\"",
3,600✔
1472
               m_appservices_coid, m_ident);
3,600✔
1473
    logger.base_logger = make_logger(m_ident, m_appservices_coid, get_client().logger.base_logger);
3,600✔
1474

1475
    for (auto& [ident, sess] : m_sessions) {
4,740✔
1476
        sess->logger.base_logger = Session::make_logger(ident, logger.base_logger);
4,740✔
1477
    }
4,740✔
1478
}
3,600✔
1479

1480

1481
void Connection::handle_protocol_error(Status status)
1482
{
×
1483
    close_due_to_protocol_error(std::move(status));
×
1484
}
×
1485

1486

1487
// Sessions are guaranteed to be granted the opportunity to send a message in
1488
// the order that they enlist. Note that this is important to ensure
1489
// nonoverlapping communication with the server for consecutive sessions
1490
// associated with the same Realm file.
1491
//
1492
// CAUTION: The specified session may get destroyed before this function
1493
// returns, but only if its Session::send_message() puts it into the Deactivated
1494
// state.
1495
void Connection::enlist_to_send(Session* sess)
1496
{
179,630✔
1497
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
179,630✔
1498
    m_sessions_enlisted_to_send.push_back(sess); // Throws
179,630✔
1499
    if (!m_sending)
179,630✔
1500
        send_next_message(); // Throws
77,558✔
1501
}
179,630✔
1502

1503

1504
std::string Connection::get_active_appservices_connection_id()
1505
{
76✔
1506
    return m_appservices_coid;
76✔
1507
}
76✔
1508

1509
void Session::cancel_resumption_delay()
1510
{
4,322✔
1511
    REALM_ASSERT_EX(m_state == Active, m_state);
4,322✔
1512

1513
    if (!m_suspended)
4,322✔
1514
        return;
4,154✔
1515

1516
    m_suspended = false;
168✔
1517

1518
    logger.debug("Resumed"); // Throws
168✔
1519

1520
    if (unbind_process_complete())
168✔
1521
        initiate_rebind(); // Throws
126✔
1522

1523
    try {
168✔
1524
        process_pending_flx_bootstrap(); // throws
168✔
1525
    }
168✔
1526
    catch (const IntegrationException& error) {
168✔
1527
        on_integration_failure(error);
×
1528
    }
×
1529
    catch (...) {
168✔
1530
        on_integration_failure(IntegrationException(exception_to_status()));
×
1531
    }
×
1532

1533
    m_conn.one_more_active_unsuspended_session(); // Throws
168✔
1534
    if (m_try_again_activation_timer) {
168✔
1535
        m_try_again_activation_timer.reset();
8✔
1536
    }
8✔
1537

1538
    on_resumed(); // Throws
168✔
1539
}
168✔
1540

1541

1542
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1543
                                                 std::vector<ProtocolErrorInfo>* out)
1544
{
23,316✔
1545
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
23,316✔
1546
        return;
23,270✔
1547
    }
23,270✔
1548

1549
#ifdef REALM_DEBUG
46✔
1550
    REALM_ASSERT_DEBUG(
46✔
1551
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
46✔
1552
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
46✔
1553
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
46✔
1554
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
46✔
1555
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
46✔
1556
                       }));
46✔
1557
#endif
46✔
1558

1559
    while (!m_pending_compensating_write_errors.empty() &&
94✔
1560
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
94✔
1561
               changesets.back().version) {
48✔
1562
        auto& cur_error = m_pending_compensating_write_errors.front();
48✔
1563
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
48✔
1564
        out->push_back(std::move(cur_error));
48✔
1565
        m_pending_compensating_write_errors.pop_front();
48✔
1566
    }
48✔
1567
}
46✔
1568

1569

1570
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1571
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1572
                                   DownloadBatchState download_batch_state)
1573
{
45,024✔
1574
    auto& history = get_history();
45,024✔
1575
    if (received_changesets.empty()) {
45,024✔
1576
        if (download_batch_state == DownloadBatchState::MoreToCome) {
21,684✔
1577
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1578
                                       "received empty download message that was not the last in batch",
×
1579
                                       ProtocolError::bad_progress);
×
1580
        }
×
1581
        history.set_sync_progress(progress, downloadable_bytes, version_info); // Throws
21,684✔
1582
        return;
21,684✔
1583
    }
21,684✔
1584

1585
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
23,340✔
1586
    auto transact = get_db()->start_read();
23,340✔
1587
    history.integrate_server_changesets(
23,340✔
1588
        progress, downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
23,340✔
1589
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
23,340✔
1590
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
23,318✔
1591
        }); // Throws
23,318✔
1592
    if (received_changesets.size() == 1) {
23,340✔
1593
        logger.debug("1 remote changeset integrated, producing client version %1",
15,650✔
1594
                     version_info.sync_version.version); // Throws
15,650✔
1595
    }
15,650✔
1596
    else {
7,690✔
1597
        logger.debug("%2 remote changesets integrated, producing client version %1",
7,690✔
1598
                     version_info.sync_version.version, received_changesets.size()); // Throws
7,690✔
1599
    }
7,690✔
1600

1601
    for (const auto& pending_error : pending_compensating_write_errors) {
23,340✔
1602
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
48✔
1603
                    pending_error.compensating_write_rejected_client_version,
48✔
1604
                    *pending_error.compensating_write_server_version, pending_error.message);
48✔
1605
        try {
48✔
1606
            on_connection_state_changed(
48✔
1607
                m_conn.get_state(),
48✔
1608
                SessionErrorInfo{pending_error,
48✔
1609
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
48✔
1610
                                                          pending_error.message)});
48✔
1611
        }
48✔
1612
        catch (...) {
48✔
1613
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1614
        }
×
1615
    }
48✔
1616
}
23,340✔
1617

1618

1619
void Session::on_integration_failure(const IntegrationException& error)
1620
{
40✔
1621
    REALM_ASSERT_EX(m_state == Active, m_state);
40✔
1622
    REALM_ASSERT(!m_client_error && !m_error_to_send);
40✔
1623
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
40✔
1624

1625
    m_client_error = util::make_optional<IntegrationException>(error);
40✔
1626
    m_error_to_send = true;
40✔
1627
    SessionErrorInfo error_info{error.to_status(), IsFatal{false}};
40✔
1628
    error_info.server_requests_action = ProtocolErrorInfo::Action::Warning;
40✔
1629
    // Surface the error to the user otherwise is lost.
1630
    on_connection_state_changed(m_conn.get_state(), std::move(error_info));
40✔
1631

1632
    // Since the deactivation process has not been initiated, the UNBIND
1633
    // message cannot have been sent unless an ERROR message was received.
1634
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
40✔
1635
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
40✔
1636
        ensure_enlisted_to_send(); // Throws
36✔
1637
    }
36✔
1638
}
40✔
1639

1640
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
1641
{
47,386✔
1642
    REALM_ASSERT_EX(m_state == Active, m_state);
47,386✔
1643
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
47,386✔
1644

1645
    m_download_progress = progress.download;
47,386✔
1646
    m_progress = progress;
47,386✔
1647

1648
    if (progress.upload.client_version > m_upload_progress.client_version)
47,386✔
1649
        m_upload_progress = progress.upload;
526✔
1650

1651
    do_recognize_sync_version(client_version); // Allows upload process to resume
47,386✔
1652

1653
    check_for_download_completion(); // Throws
47,386✔
1654

1655
    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
1656
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
47,386✔
1657
        auto& flx_subscription_store = *get_flx_subscription_store();
3,912✔
1658
        get_migration_store()->create_subscriptions(flx_subscription_store);
3,912✔
1659
    }
3,912✔
1660

1661
    // Since the deactivation process has not been initiated, the UNBIND
1662
    // message cannot have been sent unless an ERROR message was received.
1663
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
47,386✔
1664
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
47,386✔
1665
        ensure_enlisted_to_send(); // Throws
47,378✔
1666
    }
47,378✔
1667
}
47,386✔
1668

1669

1670
Session::~Session()
1671
{
10,408✔
1672
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
1673
}
10,408✔
1674

1675

1676
std::shared_ptr<util::Logger> Session::make_logger(session_ident_type ident,
1677
                                                   std::shared_ptr<util::Logger> base_logger)
1678
{
17,286✔
1679
    auto prefix = util::format("Session[%1]: ", ident);
17,286✔
1680
    return std::make_shared<util::PrefixLogger>(util::LogCategory::session, std::move(prefix),
17,286✔
1681
                                                std::move(base_logger));
17,286✔
1682
}
17,286✔
1683

1684
void Session::activate()
1685
{
10,408✔
1686
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
10,408✔
1687

1688
    logger.debug("Activating"); // Throws
10,408✔
1689

1690
    if (REALM_LIKELY(!get_client().is_dry_run())) {
10,408✔
1691
        bool file_exists = util::File::exists(get_realm_path());
10,408✔
1692

1693
        logger.info("client_reset_config = %1, Realm exists = %2, upload messages allowed = %3",
10,408✔
1694
                    get_client_reset_config().has_value(), file_exists, upload_messages_allowed() ? "yes" : "no");
10,408✔
1695
        get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
10,408✔
1696
    }
10,408✔
1697
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
10,408✔
1698
                 m_client_file_ident.salt); // Throws
10,408✔
1699
    m_upload_progress = m_progress.upload;
10,408✔
1700
    m_download_progress = m_progress.download;
10,408✔
1701
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
10,408✔
1702
    init_progress_handler();
10,408✔
1703

1704
    logger.debug("last_version_available = %1", m_last_version_available);                     // Throws
10,408✔
1705
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
10,408✔
1706
    logger.debug("progress_download_client_version = %1",
10,408✔
1707
                 m_progress.download.last_integrated_client_version);                                      // Throws
10,408✔
1708
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
10,408✔
1709
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
10,408✔
1710

1711
    reset_protocol_state();
10,408✔
1712
    m_state = Active;
10,408✔
1713

1714
    call_debug_hook(SyncClientHookEvent::SessionActivating);
10,408✔
1715

1716
    REALM_ASSERT(!m_suspended);
10,408✔
1717
    m_conn.one_more_active_unsuspended_session(); // Throws
10,408✔
1718

1719
    try {
10,408✔
1720
        process_pending_flx_bootstrap(); // throws
10,408✔
1721
    }
10,408✔
1722
    catch (const IntegrationException& error) {
10,408✔
1723
        on_integration_failure(error);
×
1724
    }
×
1725
    catch (...) {
10,408✔
1726
        on_integration_failure(IntegrationException(exception_to_status()));
4✔
1727
    }
4✔
1728

1729
    // Checks if there is a pending client reset
1730
    handle_pending_client_reset_acknowledgement();
10,408✔
1731
}
10,408✔
1732

1733

1734
// The caller (Connection) must discard the session if the session has become
1735
// deactivated upon return.
1736
void Session::initiate_deactivation()
1737
{
10,408✔
1738
    REALM_ASSERT_EX(m_state == Active, m_state);
10,408✔
1739

1740
    logger.debug("Initiating deactivation"); // Throws
10,408✔
1741

1742
    m_state = Deactivating;
10,408✔
1743

1744
    if (!m_suspended)
10,408✔
1745
        m_conn.one_less_active_unsuspended_session(); // Throws
9,744✔
1746

1747
    if (m_enlisted_to_send) {
10,408✔
1748
        REALM_ASSERT(!unbind_process_complete());
5,500✔
1749
        return;
5,500✔
1750
    }
5,500✔
1751

1752
    // Deactivate immediately if the BIND message has not yet been sent and the
1753
    // session is not enlisted to send, or if the unbinding process has already
1754
    // completed.
1755
    if (!m_bind_message_sent || unbind_process_complete()) {
4,908✔
1756
        complete_deactivation(); // Throws
952✔
1757
        // Life cycle state is now Deactivated
1758
        return;
952✔
1759
    }
952✔
1760

1761
    // Ready to send the UNBIND message, if it has not already been sent
1762
    if (!m_unbind_message_sent) {
3,956✔
1763
        enlist_to_send(); // Throws
3,742✔
1764
        return;
3,742✔
1765
    }
3,742✔
1766
}
3,956✔
1767

1768

1769
void Session::complete_deactivation()
1770
{
10,408✔
1771
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
10,408✔
1772
    m_state = Deactivated;
10,408✔
1773

1774
    logger.debug("Deactivation completed"); // Throws
10,408✔
1775
}
10,408✔
1776

1777

1778
// Called by the associated Connection object when this session is granted an
1779
// opportunity to send a message.
1780
//
1781
// The caller (Connection) must discard the session if the session has become
1782
// deactivated upon return.
1783
void Session::send_message()
1784
{
178,028✔
1785
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
178,028✔
1786
    REALM_ASSERT(m_enlisted_to_send);
178,028✔
1787
    m_enlisted_to_send = false;
178,028✔
1788
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
178,028✔
1789
        // Deactivation has been initiated. If the UNBIND message has not been
1790
        // sent yet, there is no point in sending it. Instead, we can let the
1791
        // deactivation process complete.
1792
        if (!m_bind_message_sent) {
9,684✔
1793
            return complete_deactivation(); // Throws
2,834✔
1794
            // Life cycle state is now Deactivated
1795
        }
2,834✔
1796

1797
        // Session life cycle state is Deactivating or the unbinding process has
1798
        // been initiated by a session specific ERROR message
1799
        if (!m_unbind_message_sent)
6,850✔
1800
            send_unbind_message(); // Throws
6,850✔
1801
        return;
6,850✔
1802
    }
9,684✔
1803

1804
    // Session life cycle state is Active and the unbinding process has
1805
    // not been initiated
1806
    REALM_ASSERT(!m_unbind_message_sent);
168,344✔
1807

1808
    if (!m_bind_message_sent)
168,344✔
1809
        return send_bind_message(); // Throws
9,364✔
1810

1811
    // Pending test commands can be sent any time after the BIND message is sent
1812
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
158,980✔
1813
                                                      [](const PendingTestCommand& command) {
158,980✔
1814
                                                          return command.pending;
152✔
1815
                                                      });
152✔
1816
    if (has_pending_test_command) {
158,980✔
1817
        return send_test_command_message();
60✔
1818
    }
60✔
1819

1820
    if (!m_ident_message_sent) {
158,920✔
1821
        if (have_client_file_ident())
7,844✔
1822
            send_ident_message(); // Throws
7,844✔
1823
        return;
7,844✔
1824
    }
7,844✔
1825

1826
    if (m_error_to_send)
151,076✔
1827
        return send_json_error_message(); // Throws
34✔
1828

1829
    // Stop sending upload, mark and query messages when the client detects an error.
1830
    if (m_client_error) {
151,042✔
1831
        return;
12✔
1832
    }
12✔
1833

1834
    if (m_target_download_mark > m_last_download_mark_sent)
151,030✔
1835
        return send_mark_message(); // Throws
17,850✔
1836

1837
    auto is_upload_allowed = [&]() -> bool {
133,184✔
1838
        if (!m_is_flx_sync_session) {
133,184✔
1839
            return true;
110,112✔
1840
        }
110,112✔
1841

1842
        auto migration_store = get_migration_store();
23,072✔
1843
        if (!migration_store) {
23,072✔
1844
            return true;
×
1845
        }
×
1846

1847
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
23,072✔
1848
        if (!sentinel_query_version) {
23,072✔
1849
            return true;
23,044✔
1850
        }
23,044✔
1851

1852
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
1853
        return m_last_sent_flx_query_version != *sentinel_query_version;
28✔
1854
    };
23,072✔
1855

1856
    if (!is_upload_allowed()) {
133,180✔
1857
        return;
16✔
1858
    }
16✔
1859

1860
    auto check_pending_flx_version = [&]() -> bool {
133,168✔
1861
        if (!m_is_flx_sync_session) {
133,168✔
1862
            return false;
110,114✔
1863
        }
110,114✔
1864

1865
        if (m_delay_uploads) {
23,054✔
1866
            return false;
3,784✔
1867
        }
3,784✔
1868

1869
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
19,270✔
1870

1871
        if (!m_pending_flx_sub_set) {
19,270✔
1872
            return false;
16,786✔
1873
        }
16,786✔
1874

1875
        // Send QUERY messages when the upload progress client version reaches the snapshot version
1876
        // of a pending subscription
1877
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
2,484✔
1878
    };
19,270✔
1879

1880
    if (check_pending_flx_version()) {
133,164✔
1881
        return send_query_change_message(); // throws
1,376✔
1882
    }
1,376✔
1883

1884
    if (!m_delay_uploads && (m_last_version_available > m_upload_progress.client_version)) {
131,788✔
1885
        return send_upload_message(); // Throws
62,576✔
1886
    }
62,576✔
1887
}
131,788✔
1888

1889

1890
void Session::send_bind_message()
1891
{
9,364✔
1892
    REALM_ASSERT_EX(m_state == Active, m_state);
9,364✔
1893

1894
    session_ident_type session_ident = m_ident;
9,364✔
1895
    // Request an ident if we don't already have one and there isn't a pending client reset diff
1896
    // The file ident can be 0 when a client reset is being performed if a brand new local realm
1897
    // has been opened (or using Async open) and a FLX/PBS migration occurs when first connecting
1898
    // to the server.
1899
    bool need_client_file_ident = !have_client_file_ident() && !get_client_reset_config();
9,364✔
1900
    const bool is_subserver = false;
9,364✔
1901

1902
    ClientProtocol& protocol = m_conn.get_client_protocol();
9,364✔
1903
    int protocol_version = m_conn.get_negotiated_protocol_version();
9,364✔
1904
    OutputBuffer& out = m_conn.get_output_buffer();
9,364✔
1905
    // Discard the token since it's ignored by the server.
1906
    std::string empty_access_token;
9,364✔
1907
    if (m_is_flx_sync_session) {
9,364✔
1908
        nlohmann::json bind_json_data;
1,882✔
1909
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,882✔
1910
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1911
        }
60✔
1912
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,882✔
1913
        auto schema_version = get_schema_version();
1,882✔
1914
        // Send 0 if schema is not versioned.
1915
        bind_json_data["schemaVersion"] = schema_version != uint64_t(-1) ? schema_version : 0;
1,882✔
1916
        if (logger.would_log(util::Logger::Level::debug)) {
1,882✔
1917
            std::string json_data_dump;
1,882✔
1918
            if (!bind_json_data.empty()) {
1,882✔
1919
                json_data_dump = bind_json_data.dump();
1,882✔
1920
            }
1,882✔
1921
            logger.debug(
1,882✔
1922
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,882✔
1923
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,882✔
1924
        }
1,882✔
1925
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,882✔
1926
                                       need_client_file_ident, is_subserver); // Throws
1,882✔
1927
    }
1,882✔
1928
    else {
7,482✔
1929
        std::string server_path = get_virt_path();
7,482✔
1930
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
7,482✔
1931
                     session_ident, need_client_file_ident, is_subserver, server_path);
7,482✔
1932
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
7,482✔
1933
                                       need_client_file_ident, is_subserver); // Throws
7,482✔
1934
    }
7,482✔
1935
    m_conn.initiate_write_message(out, this); // Throws
9,364✔
1936

1937
    m_bind_message_sent = true;
9,364✔
1938
    call_debug_hook(SyncClientHookEvent::BindMessageSent);
9,364✔
1939

1940
    // If there is a pending client reset diff, process that when the BIND message has
1941
    // been sent successfully and wait before sending the IDENT message. Otherwise,
1942
    // ready to send the IDENT message if the file identifier pair is already available.
1943
    if (!need_client_file_ident)
9,364✔
1944
        enlist_to_send(); // Throws
5,558✔
1945
}
9,364✔
1946

1947

1948
void Session::send_ident_message()
1949
{
7,844✔
1950
    REALM_ASSERT_EX(m_state == Active, m_state);
7,844✔
1951
    REALM_ASSERT(m_bind_message_sent);
7,844✔
1952
    REALM_ASSERT(!m_unbind_message_sent);
7,844✔
1953
    REALM_ASSERT(have_client_file_ident());
7,844✔
1954

1955
    ClientProtocol& protocol = m_conn.get_client_protocol();
7,844✔
1956
    OutputBuffer& out = m_conn.get_output_buffer();
7,844✔
1957
    session_ident_type session_ident = m_ident;
7,844✔
1958

1959
    if (m_is_flx_sync_session) {
7,844✔
1960
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,786✔
1961
        const auto active_query_body = active_query_set.to_ext_json();
1,786✔
1962
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,786✔
1963
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,786✔
1964
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,786✔
1965
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,786✔
1966
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,786✔
1967
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,786✔
1968
                     active_query_body); // Throws
1,786✔
1969
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,786✔
1970
                                        active_query_set.version(), active_query_body); // Throws
1,786✔
1971
        m_last_sent_flx_query_version = active_query_set.version();
1,786✔
1972
    }
1,786✔
1973
    else {
6,058✔
1974
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
6,058✔
1975
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
6,058✔
1976
                     "latest_server_version_salt=%6)",
6,058✔
1977
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
6,058✔
1978
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
6,058✔
1979
                     m_progress.latest_server_version.salt);                                  // Throws
6,058✔
1980
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
6,058✔
1981
    }
6,058✔
1982
    m_conn.initiate_write_message(out, this); // Throws
7,844✔
1983

1984
    m_ident_message_sent = true;
7,844✔
1985
    call_debug_hook(SyncClientHookEvent::IdentMessageSent);
7,844✔
1986

1987
    // Other messages may be waiting to be sent
1988
    enlist_to_send(); // Throws
7,844✔
1989
}
7,844✔
1990

1991
void Session::send_query_change_message()
1992
{
1,376✔
1993
    REALM_ASSERT_EX(m_state == Active, m_state);
1,376✔
1994
    REALM_ASSERT(m_ident_message_sent);
1,376✔
1995
    REALM_ASSERT(!m_unbind_message_sent);
1,376✔
1996
    REALM_ASSERT(m_pending_flx_sub_set);
1,376✔
1997
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
1,376✔
1998

1999
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
1,376✔
2000
        return;
×
2001
    }
×
2002

2003
    auto sub_store = get_flx_subscription_store();
1,376✔
2004
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
1,376✔
2005
    auto latest_queries = latest_sub_set.to_ext_json();
1,376✔
2006
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
1,376✔
2007
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
1,376✔
2008

2009
    OutputBuffer& out = m_conn.get_output_buffer();
1,376✔
2010
    session_ident_type session_ident = get_ident();
1,376✔
2011
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,376✔
2012
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
1,376✔
2013
    m_conn.initiate_write_message(out, this);
1,376✔
2014

2015
    m_last_sent_flx_query_version = latest_sub_set.version();
1,376✔
2016

2017
    request_download_completion_notification();
1,376✔
2018
}
1,376✔
2019

2020
void Session::send_upload_message()
2021
{
62,572✔
2022
    REALM_ASSERT_EX(m_state == Active, m_state);
62,572✔
2023
    REALM_ASSERT(m_ident_message_sent);
62,572✔
2024
    REALM_ASSERT(!m_unbind_message_sent);
62,572✔
2025

2026
    if (REALM_UNLIKELY(get_client().is_dry_run()))
62,572✔
2027
        return;
×
2028

2029
    version_type target_upload_version = m_last_version_available;
62,572✔
2030
    if (m_pending_flx_sub_set) {
62,572✔
2031
        REALM_ASSERT(m_is_flx_sync_session);
1,110✔
2032
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
1,110✔
2033
    }
1,110✔
2034

2035
    bool server_version_to_ack =
62,572✔
2036
        m_upload_progress.last_integrated_server_version < m_download_progress.server_version;
62,572✔
2037

2038
    std::vector<UploadChangeset> uploadable_changesets;
62,572✔
2039
    version_type locked_server_version = 0;
62,572✔
2040
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
62,572✔
2041
                                             locked_server_version); // Throws
62,572✔
2042

2043
    if (uploadable_changesets.empty()) {
62,572✔
2044
        // Nothing more to upload right now if:
2045
        //  1. We need to limit upload up to some version other than the last client version
2046
        //     available and there are no changes to upload
2047
        //  2. There are no changes to upload and no server version(s) to acknowledge
2048
        if (m_pending_flx_sub_set || !server_version_to_ack) {
32,776✔
2049
            logger.trace("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
6,378✔
2050
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
6,378✔
2051
            // Other messages may be waiting to be sent
2052
            return enlist_to_send(); // Throws
6,378✔
2053
        }
6,378✔
2054
    }
32,776✔
2055

2056
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
56,194✔
2057
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
744✔
2058
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
744✔
2059
    }
744✔
2060

2061
    version_type progress_client_version = m_upload_progress.client_version;
56,194✔
2062
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
56,194✔
2063

2064
    if (!upload_messages_allowed()) {
56,194✔
2065
        logger.trace("UPLOAD not allowed (progress_client_version=%1, progress_server_version=%2, "
574✔
2066
                     "locked_server_version=%3, num_changesets=%4)",
574✔
2067
                     progress_client_version, progress_server_version, locked_server_version,
574✔
2068
                     uploadable_changesets.size()); // Throws
574✔
2069
        // Other messages may be waiting to be sent
2070
        return enlist_to_send(); // Throws
574✔
2071
    }
574✔
2072

2073
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
55,620✔
2074
                 "locked_server_version=%3, num_changesets=%4)",
55,620✔
2075
                 progress_client_version, progress_server_version, locked_server_version,
55,620✔
2076
                 uploadable_changesets.size()); // Throws
55,620✔
2077

2078
    ClientProtocol& protocol = m_conn.get_client_protocol();
55,620✔
2079
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
55,620✔
2080

2081
    for (const UploadChangeset& uc : uploadable_changesets) {
55,620✔
2082
        logger.debug(util::LogCategory::changeset,
43,204✔
2083
                     "Fetching changeset for upload (client_version=%1, server_version=%2, "
43,204✔
2084
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
43,204✔
2085
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
43,204✔
2086
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
43,204✔
2087
        if (logger.would_log(util::Logger::Level::trace)) {
43,204✔
2088
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2089
            if (changeset_data.size() < 1024) {
×
2090
                logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
2091
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2092
            }
×
2093
            else {
×
2094
                logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
2095
                             protocol.compressed_hex_dump(changeset_data));
×
2096
            }
×
2097

2098
#if REALM_DEBUG
×
2099
            ChunkedBinaryInputStream in{changeset_data};
×
2100
            Changeset log;
×
2101
            try {
×
2102
                parse_changeset(in, log);
×
2103
                std::stringstream ss;
×
2104
                log.print(ss);
×
2105
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
2106
            }
×
2107
            catch (const BadChangesetError& err) {
×
2108
                logger.error(util::LogCategory::changeset, "Unable to parse changeset: %1", err.what());
×
2109
            }
×
2110
#endif
×
2111
        }
×
2112

2113
        {
43,204✔
2114
            upload_message_builder.add_changeset(uc.progress.client_version,
43,204✔
2115
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
43,204✔
2116
                                                 uc.origin_file_ident,
43,204✔
2117
                                                 uc.changeset); // Throws
43,204✔
2118
        }
43,204✔
2119
    }
43,204✔
2120

2121
    int protocol_version = m_conn.get_negotiated_protocol_version();
55,620✔
2122
    OutputBuffer& out = m_conn.get_output_buffer();
55,620✔
2123
    session_ident_type session_ident = get_ident();
55,620✔
2124
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
55,620✔
2125
                                               progress_server_version,
55,620✔
2126
                                               locked_server_version); // Throws
55,620✔
2127
    m_conn.initiate_write_message(out, this);                          // Throws
55,620✔
2128

2129
    call_debug_hook(SyncClientHookEvent::UploadMessageSent);
55,620✔
2130

2131
    // Other messages may be waiting to be sent
2132
    enlist_to_send(); // Throws
55,620✔
2133
}
55,620✔
2134

2135

2136
void Session::send_mark_message()
2137
{
17,850✔
2138
    REALM_ASSERT_EX(m_state == Active, m_state);
17,850✔
2139
    REALM_ASSERT(m_ident_message_sent);
17,850✔
2140
    REALM_ASSERT(!m_unbind_message_sent);
17,850✔
2141
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
17,850✔
2142

2143
    request_ident_type request_ident = m_target_download_mark;
17,850✔
2144
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
17,850✔
2145

2146
    ClientProtocol& protocol = m_conn.get_client_protocol();
17,850✔
2147
    OutputBuffer& out = m_conn.get_output_buffer();
17,850✔
2148
    session_ident_type session_ident = get_ident();
17,850✔
2149
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
17,850✔
2150
    m_conn.initiate_write_message(out, this);                      // Throws
17,850✔
2151

2152
    m_last_download_mark_sent = request_ident;
17,850✔
2153

2154
    // Other messages may be waiting to be sent
2155
    enlist_to_send(); // Throws
17,850✔
2156
}
17,850✔
2157

2158

2159
void Session::send_unbind_message()
2160
{
6,850✔
2161
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,850✔
2162
    REALM_ASSERT(m_bind_message_sent);
6,850✔
2163
    REALM_ASSERT(!m_unbind_message_sent);
6,850✔
2164

2165
    logger.debug("Sending: UNBIND"); // Throws
6,850✔
2166

2167
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,850✔
2168
    OutputBuffer& out = m_conn.get_output_buffer();
6,850✔
2169
    session_ident_type session_ident = get_ident();
6,850✔
2170
    protocol.make_unbind_message(out, session_ident); // Throws
6,850✔
2171
    m_conn.initiate_write_message(out, this);         // Throws
6,850✔
2172

2173
    m_unbind_message_sent = true;
6,850✔
2174
}
6,850✔
2175

2176

2177
void Session::send_json_error_message()
2178
{
34✔
2179
    REALM_ASSERT_EX(m_state == Active, m_state);
34✔
2180
    REALM_ASSERT(m_ident_message_sent);
34✔
2181
    REALM_ASSERT(!m_unbind_message_sent);
34✔
2182
    REALM_ASSERT(m_error_to_send);
34✔
2183
    REALM_ASSERT(m_client_error);
34✔
2184

2185
    ClientProtocol& protocol = m_conn.get_client_protocol();
34✔
2186
    OutputBuffer& out = m_conn.get_output_buffer();
34✔
2187
    session_ident_type session_ident = get_ident();
34✔
2188
    auto protocol_error = m_client_error->error_for_server;
34✔
2189

2190
    auto message = util::format("%1", m_client_error->to_status());
34✔
2191
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
34✔
2192
                session_ident); // Throws
34✔
2193

2194
    nlohmann::json error_body_json;
34✔
2195
    error_body_json["message"] = std::move(message);
34✔
2196
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
34✔
2197
                                     error_body_json.dump()); // Throws
34✔
2198
    m_conn.initiate_write_message(out, this);                 // Throws
34✔
2199

2200
    m_error_to_send = false;
34✔
2201
    enlist_to_send(); // Throws
34✔
2202
}
34✔
2203

2204

2205
void Session::send_test_command_message()
2206
{
60✔
2207
    REALM_ASSERT_EX(m_state == Active, m_state);
60✔
2208

2209
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
60✔
2210
                           [](const PendingTestCommand& command) {
64✔
2211
                               return command.pending;
64✔
2212
                           });
64✔
2213
    REALM_ASSERT(it != m_pending_test_commands.end());
60✔
2214

2215
    ClientProtocol& protocol = m_conn.get_client_protocol();
60✔
2216
    OutputBuffer& out = m_conn.get_output_buffer();
60✔
2217
    auto session_ident = get_ident();
60✔
2218

2219
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
60✔
2220
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
60✔
2221

2222
    m_conn.initiate_write_message(out, this); // Throws;
60✔
2223
    it->pending = false;
60✔
2224

2225
    enlist_to_send();
60✔
2226
}
60✔
2227

2228
bool Session::client_reset_if_needed()
2229
{
424✔
2230
    // Even if we end up not actually performing a client reset, consume the
2231
    // config to ensure that the resources it holds are released
2232
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
424✔
2233
    if (!client_reset_config) {
424✔
2234
        return false;
×
2235
    }
×
2236

2237
    // Save a copy of the status and action in case an error/exception occurs
2238
    Status cr_status = client_reset_config->error;
424✔
2239
    ProtocolErrorInfo::Action cr_action = client_reset_config->action;
424✔
2240

2241
    auto on_flx_version_complete = [this](int64_t version) {
424✔
2242
        this->on_flx_sync_version_complete(version);
348✔
2243
    };
348✔
2244
    try {
424✔
2245
        // The file ident from the fresh realm will be copied over to the local realm
2246
        bool did_reset = client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config),
424✔
2247
                                                            get_flx_subscription_store(), on_flx_version_complete);
424✔
2248

2249
        call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
424✔
2250
        if (!did_reset) {
424✔
2251
            return false;
×
2252
        }
×
2253
    }
424✔
2254
    catch (const std::exception& e) {
424✔
2255
        auto err_msg = util::format("A fatal error occurred during '%1' client reset diff for %2: '%3'", cr_action,
80✔
2256
                                    cr_status, e.what());
80✔
2257
        logger.error(err_msg.c_str());
80✔
2258
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
80✔
2259
        suspend(err_info);
80✔
2260
        return false;
80✔
2261
    }
80✔
2262

2263
    // The fresh Realm has been used to reset the state
2264
    logger.debug("Client reset is completed, path = %1", get_realm_path()); // Throws
344✔
2265

2266
    // Update the version, file ident and progress info after the client reset diff is done
2267
    get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
344✔
2268
    // Print the version/progress information before performing the asserts
2269
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
344✔
2270
                 m_client_file_ident.salt);                                // Throws
344✔
2271
    logger.debug("last_version_available = %1", m_last_version_available); // Throws
344✔
2272
    logger.debug("upload_progress_client_version = %1, upload_progress_server_version = %2",
344✔
2273
                 m_progress.upload.client_version,
344✔
2274
                 m_progress.upload.last_integrated_server_version); // Throws
344✔
2275
    logger.debug("download_progress_client_version = %1, download_progress_server_version = %2",
344✔
2276
                 m_progress.download.last_integrated_client_version,
344✔
2277
                 m_progress.download.server_version); // Throws
344✔
2278

2279
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
344✔
2280
                    m_progress.download.last_integrated_client_version);
344✔
2281
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
344✔
2282

2283
    m_upload_progress = m_progress.upload;
344✔
2284
    m_download_progress = m_progress.download;
344✔
2285
    init_progress_handler();
344✔
2286
    // In recovery mode, there may be new changesets to upload and nothing left to download.
2287
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
2288
    // For both, we want to allow uploads again without needing external changes to download first.
2289
    m_delay_uploads = false;
344✔
2290

2291
    // Checks if there is a pending client reset
2292
    handle_pending_client_reset_acknowledgement();
344✔
2293

2294
    update_subscription_version_info();
344✔
2295

2296
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
2297
    if (auto migration_store = get_migration_store()) {
344✔
2298
        migration_store->complete_migration_or_rollback();
316✔
2299
    }
316✔
2300

2301
    return true;
344✔
2302
}
424✔
2303

2304
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
2305
{
3,618✔
2306
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
3,618✔
2307
                 client_file_ident.salt); // Throws
3,618✔
2308

2309
    // Ignore the message if the deactivation process has been initiated,
2310
    // because in that case, the associated Realm and SessionWrapper must
2311
    // not be accessed any longer.
2312
    if (m_state != Active)
3,618✔
2313
        return Status::OK(); // Success
74✔
2314

2315
    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
3,544✔
2316
                               !m_unbound_message_received);
3,544✔
2317
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,544✔
2318
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
×
2319
    }
×
2320
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
3,544✔
2321
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
×
2322
    }
×
2323
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
3,544✔
2324
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
×
2325
    }
×
2326

2327
    m_client_file_ident = client_file_ident;
3,544✔
2328

2329
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
3,544✔
2330
        // Ready to send the IDENT message
2331
        ensure_enlisted_to_send(); // Throws
×
2332
        return Status::OK();       // Success
×
2333
    }
×
2334

2335
    get_history().set_client_file_ident(client_file_ident,
3,544✔
2336
                                        m_fix_up_object_ids); // Throws
3,544✔
2337
    m_progress.download.last_integrated_client_version = 0;
3,544✔
2338
    m_progress.upload.client_version = 0;
3,544✔
2339

2340
    // Ready to send the IDENT message
2341
    ensure_enlisted_to_send(); // Throws
3,544✔
2342
    return Status::OK();       // Success
3,544✔
2343
}
3,544✔
2344

2345
Status Session::receive_download_message(const DownloadMessage& message)
2346
{
49,602✔
2347
    // Ignore the message if the deactivation process has been initiated,
2348
    // because in that case, the associated Realm and SessionWrapper must
2349
    // not be accessed any longer.
2350
    if (m_state != Active)
49,602✔
2351
        return Status::OK();
602✔
2352

2353
    bool is_flx = m_conn.is_flx_sync_connection();
49,000✔
2354
    int64_t query_version = is_flx ? *message.query_version : 0;
49,000✔
2355

2356
    if (!is_flx || query_version > 0)
49,000✔
2357
        enable_progress_notifications();
47,056✔
2358

2359
    auto&& progress = message.progress;
49,000✔
2360
    if (is_flx) {
49,000✔
2361
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
5,500✔
2362
                     "latest_server_version=%3, latest_server_version_salt=%4, "
5,500✔
2363
                     "upload_client_version=%5, upload_server_version=%6, progress_estimate=%7, "
5,500✔
2364
                     "batch_state=%8, query_version=%9, num_changesets=%10, ...)",
5,500✔
2365
                     progress.download.server_version, progress.download.last_integrated_client_version,
5,500✔
2366
                     progress.latest_server_version.version, progress.latest_server_version.salt,
5,500✔
2367
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
5,500✔
2368
                     message.downloadable.as_estimate(), message.batch_state, query_version,
5,500✔
2369
                     message.changesets.size()); // Throws
5,500✔
2370
    }
5,500✔
2371
    else {
43,500✔
2372
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
43,500✔
2373
                     "latest_server_version=%3, latest_server_version_salt=%4, "
43,500✔
2374
                     "upload_client_version=%5, upload_server_version=%6, "
43,500✔
2375
                     "downloadable_bytes=%7, num_changesets=%8, ...)",
43,500✔
2376
                     progress.download.server_version, progress.download.last_integrated_client_version,
43,500✔
2377
                     progress.latest_server_version.version, progress.latest_server_version.salt,
43,500✔
2378
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
43,500✔
2379
                     message.downloadable.as_bytes(), message.changesets.size()); // Throws
43,500✔
2380
    }
43,500✔
2381

2382
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
2383
    // changeset over and over again.
2384
    if (m_client_error) {
49,000✔
2385
        logger.debug("Ignoring download message because the client detected an integration error");
×
2386
        return Status::OK();
×
2387
    }
×
2388

2389
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
49,002✔
2390
    if (REALM_UNLIKELY(!legal_at_this_time)) {
49,000✔
UNCOV
2391
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
×
UNCOV
2392
    }
×
2393
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
49,000✔
2394
        logger.error("Bad sync progress received (%1)", status);
×
2395
        return status;
×
2396
    }
×
2397

2398
    version_type server_version = m_progress.download.server_version;
49,000✔
2399
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
49,000✔
2400
    for (const RemoteChangeset& changeset : message.changesets) {
50,266✔
2401
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
2402
        // version must be increasing, but can stay the same during bootstraps.
2403
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
47,840✔
2404
                                                         : (changeset.remote_version > server_version);
47,840✔
2405
        // Each server version cannot be greater than the one in the header of the download message.
2406
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
47,840✔
2407
        if (!good_server_version) {
47,840✔
2408
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2409
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2410
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2411
        }
×
2412
        server_version = changeset.remote_version;
47,840✔
2413

2414
        // Check that per-changeset last integrated client version is "weakly"
2415
        // increasing.
2416
        bool good_client_version =
47,840✔
2417
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
47,840✔
2418
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
47,842✔
2419
        if (!good_client_version) {
47,840✔
2420
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2421
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2422
                                 "(%1, %2, %3)",
×
2423
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2424
                                 progress.download.last_integrated_client_version)};
×
2425
        }
×
2426
        last_integrated_client_version = changeset.last_integrated_local_version;
47,840✔
2427
        // Server shouldn't send our own changes, and zero is not a valid client
2428
        // file identifier.
2429
        bool good_file_ident =
47,840✔
2430
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
47,846✔
2431
        if (!good_file_ident) {
47,840✔
2432
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2433
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2434
                                 changeset.origin_file_ident)};
×
2435
        }
×
2436
    }
47,840✔
2437

2438
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
49,000✔
2439
                                       message.batch_state, message.changesets.size());
49,000✔
2440
    if (hook_action == SyncClientHookAction::EarlyReturn) {
49,000✔
2441
        return Status::OK();
16✔
2442
    }
16✔
2443
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
48,984✔
2444

2445
    if (process_flx_bootstrap_message(message)) {
48,984✔
2446
        clear_resumption_delay_state();
3,960✔
2447
        return Status::OK();
3,960✔
2448
    }
3,960✔
2449

2450
    initiate_integrate_changesets(message.downloadable.as_bytes(), message.batch_state, progress,
45,024✔
2451
                                  message.changesets); // Throws
45,024✔
2452

2453
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
45,024✔
2454
                                  message.batch_state, message.changesets.size());
45,024✔
2455
    if (hook_action == SyncClientHookAction::EarlyReturn) {
45,024✔
2456
        return Status::OK();
×
2457
    }
×
2458
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
45,024✔
2459

2460
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
2461
    // after a retryable session error.
2462
    clear_resumption_delay_state();
45,024✔
2463
    return Status::OK();
45,024✔
2464
}
45,024✔
2465

2466
Status Session::receive_mark_message(request_ident_type request_ident)
2467
{
17,020✔
2468
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws
17,020✔
2469

2470
    // Ignore the message if the deactivation process has been initiated,
2471
    // because in that case, the associated Realm and SessionWrapper must
2472
    // not be accessed any longer.
2473
    if (m_state != Active)
17,020✔
2474
        return Status::OK(); // Success
56✔
2475

2476
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
16,964✔
2477
    if (REALM_UNLIKELY(!legal_at_this_time)) {
16,964✔
2478
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
12✔
2479
    }
12✔
2480
    bool good_request_ident =
16,952✔
2481
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
16,952✔
2482
    if (REALM_UNLIKELY(!good_request_ident)) {
16,952✔
2483
        return {
×
2484
            ErrorCodes::SyncProtocolInvariantFailed,
×
2485
            util::format(
×
2486
                "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
×
2487
                m_last_download_mark_sent, m_last_download_mark_received)};
×
2488
    }
×
2489

2490
    m_server_version_at_last_download_mark = m_progress.download.server_version;
16,952✔
2491
    m_last_download_mark_received = request_ident;
16,952✔
2492
    check_for_download_completion(); // Throws
16,952✔
2493

2494
    return Status::OK(); // Success
16,952✔
2495
}
16,952✔
2496

2497

2498
// The caller (Connection) must discard the session if the session has become
2499
// deactivated upon return.
2500
Status Session::receive_unbound_message()
2501
{
4,374✔
2502
    logger.debug("Received: UNBOUND");
4,374✔
2503

2504
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
4,374✔
2505
    if (REALM_UNLIKELY(!legal_at_this_time)) {
4,374✔
2506
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2507
    }
×
2508

2509
    // The fact that the UNBIND message has been sent, but an ERROR message has
2510
    // not been received, implies that the deactivation process must have been
2511
    // initiated, so this session must be in the Deactivating state or the session
2512
    // has been suspended because of a client side error.
2513
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
4,374!
2514

2515
    m_unbound_message_received = true;
4,374✔
2516

2517
    // Detect completion of the unbinding process
2518
    if (m_unbind_message_send_complete && m_state == Deactivating) {
4,374✔
2519
        // The deactivation process completes when the unbinding process
2520
        // completes.
2521
        complete_deactivation(); // Throws
4,374✔
2522
        // Life cycle state is now Deactivated
2523
    }
4,374✔
2524

2525
    return Status::OK(); // Success
4,374✔
2526
}
4,374✔
2527

2528

2529
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
2530
{
20✔
2531
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);
20✔
2532
    // Ignore the message if the deactivation process has been initiated,
2533
    // because in that case, the associated Realm and SessionWrapper must
2534
    // not be accessed any longer.
2535
    if (m_state == Active) {
20✔
2536
        on_flx_sync_error(query_version, message); // throws
20✔
2537
    }
20✔
2538
    return Status::OK();
20✔
2539
}
20✔
2540

2541
// The caller (Connection) must discard the session if the session has become
2542
// deactivated upon return.
2543
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2544
{
888✔
2545
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
888✔
2546
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
888✔
2547

2548
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
888✔
2549
    if (REALM_UNLIKELY(!legal_at_this_time)) {
888✔
2550
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2551
    }
×
2552

2553
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
888✔
2554
    auto status = protocol_error_to_status(protocol_error, info.message);
888✔
2555
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
888✔
2556
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2557
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2558
                             info.raw_error_code)};
×
2559
    }
×
2560

2561
    // Can't process debug hook actions once the Session is undergoing deactivation, since
2562
    // the SessionWrapper may not be available
2563
    if (m_state == Active) {
888✔
2564
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, &info);
876✔
2565
        if (debug_action == SyncClientHookAction::EarlyReturn) {
876✔
2566
            return Status::OK();
8✔
2567
        }
8✔
2568
    }
876✔
2569

2570
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
2571
    // containing the compensating write has appeared in a download message.
2572
    if (status == ErrorCodes::SyncCompensatingWrite) {
880✔
2573
        // If the client is not active, the compensating writes will not be processed now, but will be
2574
        // sent again the next time the client connects
2575
        if (m_state == Active) {
48✔
2576
            REALM_ASSERT(info.compensating_write_server_version.has_value());
48✔
2577
            m_pending_compensating_write_errors.push_back(info);
48✔
2578
        }
48✔
2579
        return Status::OK();
48✔
2580
    }
48✔
2581

2582
    if (protocol_error == ProtocolError::schema_version_changed) {
832✔
2583
        // Enable upload immediately if the session is still active.
2584
        if (m_state == Active) {
68✔
2585
            auto wt = get_db()->start_write();
68✔
2586
            _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
68✔
2587
            wt->commit();
68✔
2588
            // Notify SyncSession a schema migration is required.
2589
            on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
68✔
2590
        }
68✔
2591
        // Keep the session active to upload any unsynced changes.
2592
        return Status::OK();
68✔
2593
    }
68✔
2594

2595
    m_error_message_received = true;
764✔
2596
    suspend(SessionErrorInfo{info, std::move(status)});
764✔
2597
    return Status::OK();
764✔
2598
}
832✔
2599

2600
void Session::suspend(const SessionErrorInfo& info)
2601
{
844✔
2602
    REALM_ASSERT(!m_suspended);
844✔
2603
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
844✔
2604
    logger.debug("Suspended"); // Throws
844✔
2605

2606
    m_suspended = true;
844✔
2607

2608
    // Detect completion of the unbinding process
2609
    if (m_unbind_message_send_complete && m_error_message_received) {
844✔
2610
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2611
        // we received an ERROR message implies that the deactivation process must
2612
        // have been initiated, so this session must be in the Deactivating state.
2613
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
12✔
2614

2615
        // The deactivation process completes when the unbinding process
2616
        // completes.
2617
        complete_deactivation(); // Throws
12✔
2618
        // Life cycle state is now Deactivated
2619
    }
12✔
2620

2621
    // Notify the application of the suspension of the session if the session is
2622
    // still in the Active state
2623
    if (m_state == Active) {
844✔
2624
        call_debug_hook(SyncClientHookEvent::SessionSuspended, &info);
832✔
2625
        m_conn.one_less_active_unsuspended_session(); // Throws
832✔
2626
        on_suspended(info);                           // Throws
832✔
2627
    }
832✔
2628

2629
    if (!info.is_fatal) {
844✔
2630
        begin_resumption_delay(info);
180✔
2631
    }
180✔
2632

2633
    // Ready to send the UNBIND message, if it has not been sent already
2634
    if (!m_unbind_message_sent)
844✔
2635
        ensure_enlisted_to_send(); // Throws
832✔
2636
}
844✔
2637

2638
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
2639
{
60✔
2640
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);
60✔
2641
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
60✔
2642
                           [&](const PendingTestCommand& command) {
60✔
2643
                               return command.id == ident;
60✔
2644
                           });
60✔
2645
    if (it == m_pending_test_commands.end()) {
60✔
2646
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2647
                util::format("Received test command response for a non-existent ident %1", ident)};
×
2648
    }
×
2649

2650
    it->promise.emplace_value(std::string{body});
60✔
2651
    m_pending_test_commands.erase(it);
60✔
2652

2653
    return Status::OK();
60✔
2654
}
60✔
2655

2656
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2657
{
180✔
2658
    REALM_ASSERT(!m_try_again_activation_timer);
180✔
2659

2660
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
180✔
2661
                                  error_info.resumption_delay_interval);
180✔
2662
    auto try_again_interval = m_try_again_delay_info.delay_interval();
180✔
2663
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
180✔
2664
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
2665
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
2666
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
2667
        try_again_interval = std::chrono::milliseconds{1000};
148✔
2668
    }
148✔
2669
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
180✔
2670
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
180✔
2671
        if (status == ErrorCodes::OperationAborted)
180✔
2672
            return;
32✔
2673
        else if (!status.is_ok())
148✔
2674
            throw Exception(status);
×
2675

2676
        m_try_again_activation_timer.reset();
148✔
2677
        cancel_resumption_delay();
148✔
2678
    });
148✔
2679
}
180✔
2680

2681
void Session::clear_resumption_delay_state()
2682
{
48,988✔
2683
    if (m_try_again_activation_timer) {
48,988✔
2684
        logger.debug("Clearing resumption delay state after successful download");
×
2685
        m_try_again_delay_info.reset();
×
2686
    }
×
2687
}
48,988✔
2688

2689
Status Session::check_received_sync_progress(const SyncProgress& progress) noexcept
2690
{
49,004✔
2691
    const SyncProgress& a = m_progress;
49,004✔
2692
    const SyncProgress& b = progress;
49,004✔
2693
    std::string message;
49,004✔
2694
    if (b.latest_server_version.version < a.latest_server_version.version) {
49,004✔
2695
        message = util::format("Latest server version in download messages must be weakly increasing throughout a "
×
2696
                               "session (current: %1, received: %2)",
×
2697
                               a.latest_server_version.version, b.latest_server_version.version);
×
2698
    }
×
2699
    if (b.upload.client_version < a.upload.client_version) {
49,004✔
2700
        message = util::format("Last integrated client version in download messages must be weakly increasing "
×
2701
                               "throughout a session (current: %1, received: %2)",
×
2702
                               a.upload.client_version, b.upload.client_version);
×
2703
    }
×
2704
    if (b.upload.client_version > m_last_version_available) {
49,004✔
2705
        message = util::format("Last integrated client version on server cannot be greater than the latest client "
×
2706
                               "version in existence (current: %1, received: %2)",
×
2707
                               m_last_version_available, b.upload.client_version);
×
2708
    }
×
2709
    if (b.download.server_version < a.download.server_version) {
49,004✔
2710
        message =
×
2711
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2712
                         a.download.server_version, b.download.server_version);
×
2713
    }
×
2714
    if (b.download.server_version > b.latest_server_version.version) {
49,004✔
2715
        message = util::format(
×
2716
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
×
2717
            b.download.server_version, b.latest_server_version.version);
×
2718
    }
×
2719
    if (b.download.last_integrated_client_version < a.download.last_integrated_client_version) {
49,004✔
2720
        message = util::format(
×
2721
            "Last integrated client version on the server at the position in the server's history of the download "
×
2722
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2723
            a.download.last_integrated_client_version, b.download.last_integrated_client_version);
×
2724
    }
×
2725
    if (b.download.last_integrated_client_version > b.upload.client_version) {
49,004✔
2726
        message = util::format("Last integrated client version on the server in the position at the server's history "
×
2727
                               "of the download cursor cannot be greater than the latest client version integrated "
×
2728
                               "on the server (download: %1, upload: %2)",
×
2729
                               b.download.last_integrated_client_version, b.upload.client_version);
×
2730
    }
×
2731
    if (b.download.server_version < b.upload.last_integrated_server_version) {
49,004✔
2732
        message = util::format(
×
2733
            "The server version of the download cursor cannot be less than the server version integrated in the "
×
2734
            "latest client version acknowledged by the server (download: %1, upload: %2)",
×
2735
            b.download.server_version, b.upload.last_integrated_server_version);
×
2736
    }
×
2737

2738
    if (message.empty()) {
49,004✔
2739
        return Status::OK();
49,002✔
2740
    }
49,002✔
2741
    return {ErrorCodes::SyncProtocolInvariantFailed, std::move(message)};
2✔
2742
}
49,004✔
2743

2744

2745
void Session::check_for_download_completion()
2746
{
64,334✔
2747
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
64,334✔
2748
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
64,334✔
2749
    if (m_last_download_mark_received == m_last_triggering_download_mark)
64,334✔
2750
        return;
47,132✔
2751
    if (m_last_download_mark_received < m_target_download_mark)
17,202✔
2752
        return;
380✔
2753
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
16,822✔
2754
        return;
×
2755
    m_last_triggering_download_mark = m_target_download_mark;
16,822✔
2756
    if (REALM_UNLIKELY(m_delay_uploads)) {
16,822✔
2757
        // Activate the upload process now, and enable immediate reactivation
2758
        // after a subsequent fast reconnect.
2759
        m_delay_uploads = false;
4,754✔
2760
        ensure_enlisted_to_send(); // Throws
4,754✔
2761
    }
4,754✔
2762
    on_download_completion(); // Throws
16,822✔
2763
}
16,822✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc