• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2520

25 Jul 2024 09:27AM UTC coverage: 91.081% (-0.02%) from 91.096%
2520

push

Evergreen

web-flow
Prepare for release 14.11.1 (#7924)

Co-authored-by: nicola-cab <1497069+nicola-cab@users.noreply.github.com>

102662 of 181458 branches covered (56.58%)

216324 of 237506 relevant lines covered (91.08%)

5898073.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.62
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/client_reset_operation.hpp>
10
#include <realm/sync/noinst/sync_schema_migration.hpp>
11
#include <realm/sync/protocol.hpp>
12
#include <realm/util/assert.hpp>
13
#include <realm/util/basic_system_errors.hpp>
14
#include <realm/util/memory_stream.hpp>
15
#include <realm/util/platform_info.hpp>
16
#include <realm/util/random.hpp>
17
#include <realm/util/safe_int_ops.hpp>
18
#include <realm/util/scope_exit.hpp>
19
#include <realm/util/to_string.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/version.hpp>
22

23
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
24

25
#include <system_error>
26
#include <sstream>
27

28
// NOTE: The protocol specification is in `/doc/protocol.md`
29

30
using namespace realm;
31
using namespace _impl;
32
using namespace realm::util;
33
using namespace realm::sync;
34
using namespace realm::sync::websocket;
35

36
// clang-format off
37
using Connection      = ClientImpl::Connection;
38
using Session         = ClientImpl::Session;
39
using UploadChangeset = ClientHistory::UploadChangeset;
40

41
// These are a work-around for a bug in MSVC. It cannot find in-class types
42
// mentioned in signature of out-of-line member function definitions.
43
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
44
using OutputBuffer                = ClientImpl::OutputBuffer;
45
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
46
// clang-format on
47

48
void ClientImpl::ReconnectInfo::reset() noexcept
49
{
1,864✔
50
    m_backoff_state.reset();
1,864✔
51
    scheduled_reset = false;
1,864✔
52
}
1,864✔
53

54

55
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
56
                                       std::optional<ResumptionDelayInfo> new_delay_info)
57
{
3,786✔
58
    m_backoff_state.update(new_reason, new_delay_info);
3,786✔
59
}
3,786✔
60

61

62
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
63
{
6,004✔
64
    if (scheduled_reset) {
6,004✔
65
        reset();
8✔
66
    }
8✔
67

68
    if (!m_backoff_state.triggering_error) {
6,004✔
69
        return std::chrono::milliseconds::zero();
4,598✔
70
    }
4,598✔
71

72
    switch (*m_backoff_state.triggering_error) {
1,406✔
73
        case ConnectionTerminationReason::closed_voluntarily:
80✔
74
            return std::chrono::milliseconds::zero();
80✔
75
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
76
            return std::chrono::milliseconds::max();
18✔
77
        default:
1,308✔
78
            if (m_reconnect_mode == ReconnectMode::testing) {
1,308✔
79
                return std::chrono::milliseconds::max();
954✔
80
            }
954✔
81

82
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
354✔
83
            return m_backoff_state.delay_interval();
354✔
84
    }
1,406✔
85
}
1,406✔
86

87

88
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
89
                                      port_type& port, std::string& path) const
90
{
4,370✔
91
    util::Uri uri(url); // Throws
4,370✔
92
    uri.canonicalize(); // Throws
4,370✔
93
    std::string userinfo, address_2, port_2;
4,370✔
94
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
4,370✔
95
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
4,370✔
96
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
4,370✔
97
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
4,370✔
98
    if (REALM_UNLIKELY(!good))
4,370✔
99
        return false;
×
100
    ProtocolEnvelope protocol_2;
4,370✔
101
    port_type port_3;
4,370✔
102
    if (realm_scheme) {
4,370✔
103
        if (uri.get_scheme() == "realm:") {
×
104
            protocol_2 = ProtocolEnvelope::realm;
×
105
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
106
        }
×
107
        else {
×
108
            protocol_2 = ProtocolEnvelope::realms;
×
109
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
110
        }
×
111
    }
×
112
    else {
4,370✔
113
        REALM_ASSERT(ws_scheme);
4,370✔
114
        if (uri.get_scheme() == "ws:") {
4,370✔
115
            protocol_2 = ProtocolEnvelope::ws;
4,290✔
116
            port_3 = 80;
4,290✔
117
        }
4,290✔
118
        else {
80✔
119
            protocol_2 = ProtocolEnvelope::wss;
80✔
120
            port_3 = 443;
80✔
121
        }
80✔
122
    }
4,370✔
123
    if (!port_2.empty()) {
4,370✔
124
        std::istringstream in(port_2);    // Throws
4,266✔
125
        in.imbue(std::locale::classic()); // Throws
4,266✔
126
        in >> port_3;
4,266✔
127
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
4,266✔
128
            return false;
×
129
    }
4,266✔
130
    std::string path_2 = uri.get_path(); // Throws (copy)
4,370✔
131

132
    protocol = protocol_2;
4,370✔
133
    address = std::move(address_2);
4,370✔
134
    port = port_3;
4,370✔
135
    path = std::move(path_2);
4,370✔
136
    return true;
4,370✔
137
}
4,370✔
138

139
ClientImpl::ClientImpl(ClientConfig config)
140
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::session, std::move(config.logger))}
4,910✔
141
    , logger{*logger_ptr}
4,910✔
142
    , m_reconnect_mode{config.reconnect_mode}
4,910✔
143
    , m_connect_timeout{config.connect_timeout}
4,910✔
144
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
4,910✔
145
    , m_ping_keepalive_period{config.ping_keepalive_period}
4,910✔
146
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
4,910✔
147
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
4,910✔
148
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
4,910✔
149
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
4,910✔
150
    , m_dry_run{config.dry_run}
4,910✔
151
    , m_enable_default_port_hack{config.enable_default_port_hack}
4,910✔
152
    , m_fix_up_object_ids{config.fix_up_object_ids}
4,910✔
153
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
4,910✔
154
    , m_socket_provider{std::move(config.socket_provider)}
4,910✔
155
    , m_client_protocol{} // Throws
4,910✔
156
    , m_one_connection_per_session{config.one_connection_per_session}
4,910✔
157
    , m_random{}
4,910✔
158
{
9,958✔
159
    // FIXME: Would be better if seeding was up to the application.
160
    util::seed_prng_nondeterministically(m_random); // Throws
9,958✔
161

162
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,958✔
163
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,958✔
164
                 get_current_protocol_version()); // Throws
9,958✔
165
    logger.info("Platform: %1", util::get_platform_info());
9,958✔
166
    const char* build_mode;
9,958✔
167
#if REALM_DEBUG
9,958✔
168
    build_mode = "Debug";
9,958✔
169
#else
170
    build_mode = "Release";
171
#endif
172
    logger.debug("Build mode: %1", build_mode);
9,958✔
173
    logger.debug("Config param: one_connection_per_session = %1",
9,958✔
174
                 config.one_connection_per_session); // Throws
9,958✔
175
    logger.debug("Config param: connect_timeout = %1 ms",
9,958✔
176
                 config.connect_timeout); // Throws
9,958✔
177
    logger.debug("Config param: connection_linger_time = %1 ms",
9,958✔
178
                 config.connection_linger_time); // Throws
9,958✔
179
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,958✔
180
                 config.ping_keepalive_period); // Throws
9,958✔
181
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,958✔
182
                 config.pong_keepalive_timeout); // Throws
9,958✔
183
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,958✔
184
                 config.fast_reconnect_limit); // Throws
9,958✔
185
    logger.debug("Config param: disable_sync_to_disk = %1",
9,958✔
186
                 config.disable_sync_to_disk); // Throws
9,958✔
187
    logger.debug(
9,958✔
188
        "Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3, jitter: 1/%4",
9,958✔
189
        m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,958✔
190
        m_reconnect_backoff_info.resumption_delay_interval.count(),
9,958✔
191
        m_reconnect_backoff_info.resumption_delay_backoff_multiplier, m_reconnect_backoff_info.delay_jitter_divisor);
9,958✔
192

193
    if (config.reconnect_mode != ReconnectMode::normal) {
9,958✔
194
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
776✔
195
                    "never do this in production!");
776✔
196
    }
776✔
197

198
    if (config.dry_run) {
9,958✔
199
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
200
                    "never do this in production!");
×
201
    }
×
202

203
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,958✔
204

205
    if (m_one_connection_per_session) {
9,958✔
206
        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
4✔
207
                    "never do this in production");
4✔
208
    }
4✔
209

210
    if (config.disable_upload_activation_delay) {
9,958✔
211
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
212
                    "never do this in production");
×
213
    }
×
214

215
    if (config.disable_sync_to_disk) {
9,958✔
216
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
217
                    "never do this in production");
×
218
    }
×
219

220
    m_actualize_and_finalize = create_trigger([this](Status status) {
14,750✔
221
        if (status == ErrorCodes::OperationAborted)
14,750✔
222
            return;
×
223
        else if (!status.is_ok())
14,750✔
224
            throw Exception(status);
×
225
        actualize_and_finalize_session_wrappers(); // Throws
14,750✔
226
    });
14,750✔
227
}
9,958✔
228

229
void ClientImpl::incr_outstanding_posts()
230
{
205,860✔
231
    util::CheckedLockGuard lock(m_drain_mutex);
205,860✔
232
    ++m_outstanding_posts;
205,860✔
233
    m_drained = false;
205,860✔
234
}
205,860✔
235

236
void ClientImpl::decr_outstanding_posts()
237
{
205,854✔
238
    util::CheckedLockGuard lock(m_drain_mutex);
205,854✔
239
    REALM_ASSERT(m_outstanding_posts);
205,854✔
240
    if (--m_outstanding_posts <= 0) {
205,854✔
241
        // Notify must happen with lock held or another thread could destroy
242
        // ClientImpl between when we release the lock and when we call notify
243
        m_drain_cv.notify_all();
18,226✔
244
    }
18,226✔
245
}
205,854✔
246

247
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
248
{
56,444✔
249
    REALM_ASSERT(m_socket_provider);
56,444✔
250
    incr_outstanding_posts();
56,444✔
251
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
56,444✔
252
        auto decr_guard = util::make_scope_exit([&]() noexcept {
56,444✔
253
            decr_outstanding_posts();
56,438✔
254
        });
56,438✔
255
        handler(status);
56,444✔
256
    });
56,444✔
257
}
56,444✔
258

259
void ClientImpl::post(util::UniqueFunction<void()>&& handler)
260
{
131,642✔
261
    REALM_ASSERT(m_socket_provider);
131,642✔
262
    incr_outstanding_posts();
131,642✔
263
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
131,644✔
264
        auto decr_guard = util::make_scope_exit([&]() noexcept {
131,644✔
265
            decr_outstanding_posts();
131,644✔
266
        });
131,644✔
267
        if (status == ErrorCodes::OperationAborted)
131,644✔
268
            return;
×
269
        if (!status.is_ok())
131,644✔
270
            throw Exception(status);
×
271
        handler();
131,644✔
272
    });
131,644✔
273
}
131,642✔
274

275

276
void ClientImpl::drain_connections()
277
{
9,958✔
278
    logger.debug("Draining connections during sync client shutdown");
9,958✔
279
    for (auto& server_slot_pair : m_server_slots) {
9,958✔
280
        auto& server_slot = server_slot_pair.second;
2,722✔
281

282
        if (server_slot.connection) {
2,722✔
283
            auto& conn = server_slot.connection;
2,496✔
284
            conn->force_close();
2,496✔
285
        }
2,496✔
286
        else {
226✔
287
            for (auto& conn_pair : server_slot.alt_connections) {
226✔
288
                conn_pair.second->force_close();
6✔
289
            }
6✔
290
        }
226✔
291
    }
2,722✔
292
}
9,958✔
293

294

295
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
296
                                                       SyncSocketProvider::FunctionHandler&& handler)
297
{
17,780✔
298
    REALM_ASSERT(m_socket_provider);
17,780✔
299
    incr_outstanding_posts();
17,780✔
300
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
17,780✔
301
        auto decr_guard = util::make_scope_exit([&]() noexcept {
17,780✔
302
            decr_outstanding_posts();
17,776✔
303
        });
17,776✔
304
        handler(status);
17,774✔
305
    });
17,774✔
306
}
17,780✔
307

308

309
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
310
{
12,788✔
311
    REALM_ASSERT(m_socket_provider);
12,788✔
312
    return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
12,788✔
313
}
12,788✔
314

315
Connection::~Connection()
316
{
2,830✔
317
    if (m_websocket_sentinel) {
2,830✔
318
        m_websocket_sentinel->destroyed = true;
×
319
        m_websocket_sentinel.reset();
×
320
    }
×
321
}
2,830✔
322

323
void Connection::activate()
324
{
2,830✔
325
    REALM_ASSERT(m_on_idle);
2,830✔
326
    m_activated = true;
2,830✔
327
    if (m_num_active_sessions == 0)
2,830✔
328
        m_on_idle->trigger();
×
329
    // We cannot in general connect immediately, because a prior failure to
330
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
331
    initiate_reconnect_wait(); // Throws
2,830✔
332
}
2,830✔
333

334

335
void Connection::activate_session(std::unique_ptr<Session> sess)
336
{
10,404✔
337
    REALM_ASSERT(sess);
10,404✔
338
    REALM_ASSERT(&sess->m_conn == this);
10,404✔
339
    REALM_ASSERT(!m_force_closed);
10,404✔
340
    Session& sess_2 = *sess;
10,404✔
341
    session_ident_type ident = sess->m_ident;
10,404✔
342
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
10,404✔
343
    bool was_inserted = p.second;
10,404✔
344
    REALM_ASSERT(was_inserted);
10,404✔
345
    // Save the session ident to the historical list of session idents
346
    m_session_history.insert(ident);
10,404✔
347
    sess_2.activate(); // Throws
10,404✔
348
    if (m_state == ConnectionState::connected) {
10,404✔
349
        bool fast_reconnect = false;
7,298✔
350
        sess_2.connection_established(fast_reconnect); // Throws
7,298✔
351
    }
7,298✔
352
    ++m_num_active_sessions;
10,404✔
353
}
10,404✔
354

355

356
void Connection::initiate_session_deactivation(Session* sess)
357
{
10,404✔
358
    REALM_ASSERT(sess);
10,404✔
359
    REALM_ASSERT(&sess->m_conn == this);
10,404✔
360
    REALM_ASSERT(m_num_active_sessions);
10,404✔
361
    // Since the client may be waiting for m_num_active_sessions to reach 0
362
    // in stop_and_wait() (on a separate thread), deactivate Session before
363
    // decrementing the num active sessions value.
364
    sess->initiate_deactivation(); // Throws
10,404✔
365
    if (sess->m_state == Session::Deactivated) {
10,404✔
366
        finish_session_deactivation(sess);
956✔
367
    }
956✔
368
    if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
10,404✔
369
        if (m_activated && m_state == ConnectionState::disconnected)
4,630✔
370
            m_on_idle->trigger();
372✔
371
    }
4,630✔
372
}
10,404✔
373

374

375
void Connection::cancel_reconnect_delay()
376
{
2,088✔
377
    REALM_ASSERT(m_activated);
2,088✔
378

379
    if (m_reconnect_delay_in_progress) {
2,088✔
380
        if (m_nonzero_reconnect_delay)
1,852✔
381
            logger.detail("Canceling reconnect delay"); // Throws
928✔
382

383
        // Cancel the in-progress wait operation by destroying the timer
384
        // object. Destruction is needed in this case, because a new wait
385
        // operation might have to be initiated before the previous one
386
        // completes (its completion handler starts to execute), so the new wait
387
        // operation must be done on a new timer object.
388
        m_reconnect_disconnect_timer.reset();
1,852✔
389
        m_reconnect_delay_in_progress = false;
1,852✔
390
        m_reconnect_info.reset();
1,852✔
391
        initiate_reconnect_wait(); // Throws
1,852✔
392
        return;
1,852✔
393
    }
1,852✔
394

395
    // If we are not disconnected, then we need to make sure the next time we get disconnected
396
    // that we are allowed to re-connect as quickly as possible.
397
    //
398
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
399
    // backoff/delay state before calculating the next delay, unless a PONG message is received
400
    // for the urgent PING message we send below.
401
    //
402
    // If we get a PONG message for the urgent PING message sent below, then the connection is
403
    // healthy and we can calculate the next delay normally.
404
    if (m_state != ConnectionState::disconnected) {
236✔
405
        m_reconnect_info.scheduled_reset = true;
236✔
406
        m_ping_after_scheduled_reset_of_reconnect_info = false;
236✔
407

408
        schedule_urgent_ping(); // Throws
236✔
409
        return;
236✔
410
    }
236✔
411
    // Nothing to do in this case. The next reconnect attemp will be made as
412
    // soon as there are any sessions that are both active and unsuspended.
413
}
236✔
414

415
void Connection::finish_session_deactivation(Session* sess)
416
{
8,124✔
417
    REALM_ASSERT(sess->m_state == Session::Deactivated);
8,124✔
418
    auto ident = sess->m_ident;
8,124✔
419
    m_sessions.erase(ident);
8,124✔
420
    m_session_history.erase(ident);
8,124✔
421
}
8,124✔
422

423
void Connection::force_close()
424
{
2,502✔
425
    if (m_force_closed) {
2,502✔
426
        return;
×
427
    }
×
428

429
    m_force_closed = true;
2,502✔
430

431
    if (m_state != ConnectionState::disconnected) {
2,502✔
432
        voluntary_disconnect();
2,464✔
433
    }
2,464✔
434

435
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,502✔
436
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,502✔
437
        m_reconnect_disconnect_timer.reset();
38✔
438
        m_reconnect_delay_in_progress = false;
38✔
439
        m_disconnect_delay_in_progress = false;
38✔
440
    }
38✔
441

442
    // We must copy any session pointers we want to close to a vector because force_closing
443
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
444
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
445
    std::vector<Session*> to_close;
2,502✔
446
    for (auto& session_pair : m_sessions) {
2,502✔
447
        if (session_pair.second->m_state == Session::State::Active) {
102✔
448
            to_close.push_back(session_pair.second.get());
102✔
449
        }
102✔
450
    }
102✔
451

452
    for (auto& sess : to_close) {
2,502✔
453
        sess->force_close();
102✔
454
    }
102✔
455

456
    logger.debug("Force closed idle connection");
2,502✔
457
}
2,502✔
458

459

460
void Connection::websocket_connected_handler(const std::string& protocol)
461
{
3,584✔
462
    if (!protocol.empty()) {
3,584✔
463
        std::string_view expected_prefix =
3,584✔
464
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,584✔
465
        // FIXME: Use std::string_view::begins_with() in C++20.
466
        auto prefix_matches = [&](std::string_view other) {
3,584✔
467
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,584✔
468
        };
3,584✔
469
        if (prefix_matches(expected_prefix)) {
3,584✔
470
            util::MemoryInputStream in;
3,584✔
471
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,584✔
472
            in.imbue(std::locale::classic());
3,584✔
473
            in.unsetf(std::ios_base::skipws);
3,584✔
474
            int value_2 = 0;
3,584✔
475
            in >> value_2;
3,584✔
476
            if (in && in.eof() && value_2 >= 0) {
3,584✔
477
                bool good_version =
3,584✔
478
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,584✔
479
                if (good_version) {
3,584✔
480
                    logger.detail("Negotiated protocol version: %1", value_2);
3,584✔
481
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
482
                    // will provide the appservices connection ID via a log message.
483
                    // TODO: Remove once the server starts sending the connection ID
484
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,584✔
485
                    m_negotiated_protocol_version = value_2;
3,584✔
486
                    handle_connection_established(); // Throws
3,584✔
487
                    return;
3,584✔
488
                }
3,584✔
489
            }
3,584✔
490
        }
3,584✔
491
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
492
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
493
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
494
    }
×
495
    else {
×
496
        close_due_to_client_side_error(
×
497
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
498
            ConnectionTerminationReason::bad_headers_in_http_response);
×
499
    }
×
500
}
3,584✔
501

502

503
bool Connection::websocket_binary_message_received(util::Span<const char> data)
504
{
80,410✔
505
    if (m_force_closed) {
80,410✔
506
        logger.debug("Received binary message after connection was force closed");
×
507
        return false;
×
508
    }
×
509

510
    using sf = SimulatedFailure;
80,410✔
511
    if (sf::check_trigger(sf::sync_client__read_head)) {
80,410✔
512
        close_due_to_client_side_error(
454✔
513
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
454✔
514
            ConnectionTerminationReason::read_or_write_error);
454✔
515
        return bool(m_websocket);
454✔
516
    }
454✔
517

518
    handle_message_received(data);
79,956✔
519
    return bool(m_websocket);
79,956✔
520
}
80,410✔
521

522

523
void Connection::websocket_error_handler()
524
{
678✔
525
    m_websocket_error_received = true;
678✔
526
}
678✔
527

528
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
529
{
796✔
530
    if (m_force_closed) {
796✔
531
        logger.debug("Received websocket close message after connection was force closed");
×
532
        return false;
×
533
    }
×
534
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
796✔
535

536
    switch (error_code) {
796✔
537
        case WebSocketError::websocket_ok:
70✔
538
            break;
70✔
539
        case WebSocketError::websocket_resolve_failed:
4✔
540
            [[fallthrough]];
4✔
541
        case WebSocketError::websocket_connection_failed: {
112✔
542
            SessionErrorInfo error_info(
112✔
543
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
112✔
544
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
545
            // to make sure the websocket URL is correct
546
            if (!m_server_endpoint.is_verified) {
112✔
547
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
84✔
548
            }
84✔
549
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
112✔
550
            break;
112✔
551
        }
4✔
552
        case WebSocketError::websocket_read_error:
544✔
553
            [[fallthrough]];
544✔
554
        case WebSocketError::websocket_write_error: {
544✔
555
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
544✔
556
                                         ConnectionTerminationReason::read_or_write_error);
544✔
557
            break;
544✔
558
        }
544✔
559
        case WebSocketError::websocket_going_away:
✔
560
            [[fallthrough]];
×
561
        case WebSocketError::websocket_protocol_error:
✔
562
            [[fallthrough]];
×
563
        case WebSocketError::websocket_unsupported_data:
✔
564
            [[fallthrough]];
×
565
        case WebSocketError::websocket_invalid_payload_data:
✔
566
            [[fallthrough]];
×
567
        case WebSocketError::websocket_policy_violation:
✔
568
            [[fallthrough]];
×
569
        case WebSocketError::websocket_reserved:
✔
570
            [[fallthrough]];
×
571
        case WebSocketError::websocket_no_status_received:
✔
572
            [[fallthrough]];
×
573
        case WebSocketError::websocket_invalid_extension: {
✔
574
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
575
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
576
            break;
×
577
        }
×
578
        case WebSocketError::websocket_message_too_big: {
4✔
579
            auto message = util::format(
4✔
580
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
581
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
582
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
583
            involuntary_disconnect(std::move(error_info),
4✔
584
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
585
            break;
4✔
586
        }
×
587
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
588
            close_due_to_client_side_error(
10✔
589
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
590
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
591
            break;
10✔
592
        }
×
593
        case WebSocketError::websocket_fatal_error: {
✔
594
            // Error is fatal if the sync_route has already been verified - if the sync_route has not
595
            // been verified, then use a non-fatal error and try to perform a location update.
596
            SessionErrorInfo error_info(
×
597
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)},
×
598
                IsFatal{m_server_endpoint.is_verified});
×
599
            ConnectionTerminationReason reason = ConnectionTerminationReason::http_response_says_fatal_error;
×
600
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
601
            // to make sure the websocket URL is correct
602
            if (!m_server_endpoint.is_verified) {
×
603
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
604
                reason = ConnectionTerminationReason::connect_operation_failed;
×
605
            }
×
606
            involuntary_disconnect(std::move(error_info), reason);
×
607
            break;
×
608
        }
×
609
        case WebSocketError::websocket_forbidden: {
✔
610
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
611
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
612
            involuntary_disconnect(std::move(error_info),
×
613
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
614
            break;
×
615
        }
×
616
        case WebSocketError::websocket_unauthorized: {
44✔
617
            SessionErrorInfo error_info(
44✔
618
                {ErrorCodes::AuthError,
44✔
619
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
44✔
620
                IsFatal{false});
44✔
621
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
44✔
622
            involuntary_disconnect(std::move(error_info),
44✔
623
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
44✔
624
            break;
44✔
625
        }
×
626
        case WebSocketError::websocket_moved_permanently: {
12✔
627
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
628
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
629
            involuntary_disconnect(std::move(error_info),
12✔
630
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
631
            break;
12✔
632
        }
×
633
        case WebSocketError::websocket_abnormal_closure: {
✔
634
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
635
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
636
            involuntary_disconnect(std::move(error_info),
×
637
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
638
            break;
×
639
        }
×
640
        case WebSocketError::websocket_internal_server_error:
✔
641
            [[fallthrough]];
×
642
        case WebSocketError::websocket_retry_error: {
✔
643
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
644
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
645
            break;
×
646
        }
×
647
    }
796✔
648

649
    return bool(m_websocket);
796✔
650
}
796✔
651

652
// Guarantees that handle_reconnect_wait() is never called from within the
653
// execution of initiate_reconnect_wait() (no callback reentrance).
654
void Connection::initiate_reconnect_wait()
655
{
8,466✔
656
    REALM_ASSERT(m_activated);
8,466✔
657
    REALM_ASSERT(!m_reconnect_delay_in_progress);
8,466✔
658
    REALM_ASSERT(!m_disconnect_delay_in_progress);
8,466✔
659

660
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
661
    if (m_force_closed) {
8,466✔
662
        return;
2,464✔
663
    }
2,464✔
664

665
    m_reconnect_delay_in_progress = true;
6,002✔
666
    auto delay = m_reconnect_info.delay_interval();
6,002✔
667
    if (delay == std::chrono::milliseconds::max()) {
6,002✔
668
        logger.detail("Reconnection delayed indefinitely"); // Throws
972✔
669
        // Not actually starting a timer corresponds to an infinite wait
670
        m_nonzero_reconnect_delay = true;
972✔
671
        return;
972✔
672
    }
972✔
673

674
    if (delay == std::chrono::milliseconds::zero()) {
5,030✔
675
        m_nonzero_reconnect_delay = false;
4,678✔
676
    }
4,678✔
677
    else {
352✔
678
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
352✔
679
        m_nonzero_reconnect_delay = true;
352✔
680
    }
352✔
681

682
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
683
    // we need it to be cancelable in case the connection is terminated before the timer
684
    // callback is run.
685
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
5,032✔
686
        // If the operation is aborted, the connection object may have been
687
        // destroyed.
688
        if (status != ErrorCodes::OperationAborted)
5,032✔
689
            handle_reconnect_wait(status); // Throws
3,786✔
690
    });                                    // Throws
5,032✔
691
}
5,030✔
692

693

694
void Connection::handle_reconnect_wait(Status status)
695
{
3,784✔
696
    if (!status.is_ok()) {
3,784✔
697
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
698
        throw Exception(status);
×
699
    }
×
700

701
    REALM_ASSERT(m_reconnect_delay_in_progress);
3,784✔
702
    m_reconnect_delay_in_progress = false;
3,784✔
703

704
    if (m_num_active_unsuspended_sessions > 0)
3,784✔
705
        initiate_reconnect(); // Throws
3,772✔
706
}
3,784✔
707

708
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
709
    explicit WebSocketObserverShim(Connection* conn)
710
        : conn(conn)
1,792✔
711
        , sentinel(conn->m_websocket_sentinel)
1,792✔
712
    {
3,786✔
713
    }
3,786✔
714

715
    Connection* conn;
716
    util::bind_ptr<LifecycleSentinel> sentinel;
717

718
    void websocket_connected_handler(const std::string& protocol) override
719
    {
3,584✔
720
        if (sentinel->destroyed) {
3,584✔
721
            return;
×
722
        }
×
723

724
        return conn->websocket_connected_handler(protocol);
3,584✔
725
    }
3,584✔
726

727
    void websocket_error_handler() override
728
    {
678✔
729
        if (sentinel->destroyed) {
678✔
730
            return;
×
731
        }
×
732

733
        conn->websocket_error_handler();
678✔
734
    }
678✔
735

736
    bool websocket_binary_message_received(util::Span<const char> data) override
737
    {
80,412✔
738
        if (sentinel->destroyed) {
80,412✔
739
            return false;
×
740
        }
×
741

742
        return conn->websocket_binary_message_received(data);
80,412✔
743
    }
80,412✔
744

745
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
746
    {
796✔
747
        if (sentinel->destroyed) {
796✔
748
            return true;
×
749
        }
×
750

751
        return conn->websocket_closed_handler(was_clean, error_code, msg);
796✔
752
    }
796✔
753
};
754

755
void Connection::initiate_reconnect()
756
{
3,786✔
757
    REALM_ASSERT(m_activated);
3,786✔
758

759
    m_state = ConnectionState::connecting;
3,786✔
760
    report_connection_state_change(ConnectionState::connecting); // Throws
3,786✔
761
    if (m_websocket_sentinel) {
3,786✔
762
        m_websocket_sentinel->destroyed = true;
×
763
    }
×
764
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,786✔
765
    m_websocket.reset();
3,786✔
766

767
    // Watchdog
768
    initiate_connect_wait(); // Throws
3,786✔
769

770
    std::vector<std::string> sec_websocket_protocol;
3,786✔
771
    {
3,786✔
772
        auto protocol_prefix =
3,786✔
773
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,786✔
774
        int min = get_oldest_supported_protocol_version();
3,786✔
775
        int max = get_current_protocol_version();
3,786✔
776
        REALM_ASSERT_3(min, <=, max);
3,786✔
777
        // List protocol version in descending order to ensure that the server
778
        // selects the highest possible version.
779
        for (int version = max; version >= min; --version) {
52,994✔
780
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
49,208✔
781
        }
49,208✔
782
    }
3,786✔
783

784
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,786✔
785
                m_server_endpoint.port, m_http_request_path_prefix);
3,786✔
786

787
    m_websocket_error_received = false;
3,786✔
788
    m_websocket =
3,786✔
789
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,786✔
790
                                            WebSocketEndpoint{
3,786✔
791
                                                m_server_endpoint.address,
3,786✔
792
                                                m_server_endpoint.port,
3,786✔
793
                                                get_http_request_path(),
3,786✔
794
                                                std::move(sec_websocket_protocol),
3,786✔
795
                                                is_ssl(m_server_endpoint.envelope),
3,786✔
796
                                                /// DEPRECATED - The following will be removed in a future release
797
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,786✔
798
                                                m_verify_servers_ssl_certificate,
3,786✔
799
                                                m_ssl_trust_certificate_path,
3,786✔
800
                                                m_ssl_verify_callback,
3,786✔
801
                                                m_proxy_config,
3,786✔
802
                                            });
3,786✔
803
}
3,786✔
804

805

806
void Connection::initiate_connect_wait()
807
{
3,786✔
808
    // Deploy a watchdog to enforce an upper bound on the time it can take to
809
    // fully establish the connection (including SSL and WebSocket
810
    // handshakes). Without such a watchdog, connect operations could take very
811
    // long, or even indefinite time.
812
    milliseconds_type time = m_client.m_connect_timeout;
3,786✔
813

814
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,786✔
815
        // If the operation is aborted, the connection object may have been
816
        // destroyed.
817
        if (status != ErrorCodes::OperationAborted)
3,786✔
818
            handle_connect_wait(status); // Throws
×
819
    });                                  // Throws
3,786✔
820
}
3,786✔
821

822

823
void Connection::handle_connect_wait(Status status)
824
{
×
825
    if (!status.is_ok()) {
×
826
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
827
        throw Exception(status);
×
828
    }
×
829

830
    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
×
831
    logger.info("Connect timeout"); // Throws
×
832
    SessionErrorInfo error_info({ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
×
833
                                IsFatal{false});
×
834
    // If the connection fails/times out and the server has not been contacted yet, refresh the location
835
    // to make sure the websocket URL is correct
836
    if (!m_server_endpoint.is_verified) {
×
837
        error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
838
    }
×
839
    involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::sync_connect_timeout); // Throws
×
840
}
×
841

842

843
void Connection::handle_connection_established()
844
{
3,584✔
845
    // Cancel connect timeout watchdog
846
    m_connect_timer.reset();
3,584✔
847

848
    m_state = ConnectionState::connected;
3,584✔
849
    m_server_endpoint.is_verified = true; // sync route is valid since connection is successful
3,584✔
850

851
    milliseconds_type now = monotonic_clock_now();
3,584✔
852
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,584✔
853
    initiate_ping_delay(now);     // Throws
3,584✔
854

855
    bool fast_reconnect = false;
3,584✔
856
    if (m_disconnect_has_occurred) {
3,584✔
857
        milliseconds_type time = now - m_disconnect_time;
1,008✔
858
        if (time <= m_client.m_fast_reconnect_limit)
1,008✔
859
            fast_reconnect = true;
1,008✔
860
    }
1,008✔
861

862
    for (auto& p : m_sessions) {
4,668✔
863
        Session& sess = *p.second;
4,668✔
864
        sess.connection_established(fast_reconnect); // Throws
4,668✔
865
    }
4,668✔
866

867
    report_connection_state_change(ConnectionState::connected); // Throws
3,584✔
868
}
3,584✔
869

870

871
void Connection::schedule_urgent_ping()
872
{
236✔
873
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
236✔
874
    if (m_ping_delay_in_progress) {
236✔
875
        m_heartbeat_timer.reset();
112✔
876
        m_ping_delay_in_progress = false;
112✔
877
        m_minimize_next_ping_delay = true;
112✔
878
        milliseconds_type now = monotonic_clock_now();
112✔
879
        initiate_ping_delay(now); // Throws
112✔
880
        return;
112✔
881
    }
112✔
882
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
124✔
883
    if (!m_send_ping)
124✔
884
        m_minimize_next_ping_delay = true;
124✔
885
}
124✔
886

887

888
void Connection::initiate_ping_delay(milliseconds_type now)
889
{
3,842✔
890
    REALM_ASSERT(!m_ping_delay_in_progress);
3,842✔
891
    REALM_ASSERT(!m_waiting_for_pong);
3,842✔
892
    REALM_ASSERT(!m_send_ping);
3,842✔
893

894
    milliseconds_type delay = 0;
3,842✔
895
    if (!m_minimize_next_ping_delay) {
3,842✔
896
        delay = m_client.m_ping_keepalive_period;
3,712✔
897
        // Make a randomized deduction of up to 10%, or up to 100% if this is
898
        // the first PING message to be sent since the connection was
899
        // established. The purpose of this randomized deduction is to reduce
900
        // the risk of many connections sending PING messages simultaneously to
901
        // the server.
902
        milliseconds_type max_deduction = (m_ping_sent ? delay / 10 : delay);
3,712✔
903
        auto distr = std::uniform_int_distribution<milliseconds_type>(0, max_deduction);
3,712✔
904
        milliseconds_type randomized_deduction = distr(m_client.get_random());
3,712✔
905
        delay -= randomized_deduction;
3,712✔
906
        // Deduct the time spent waiting for PONG
907
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
3,712✔
908
        milliseconds_type spent_time = now - m_pong_wait_started_at;
3,712✔
909
        if (spent_time < delay) {
3,712✔
910
            delay -= spent_time;
3,704✔
911
        }
3,704✔
912
        else {
8✔
913
            delay = 0;
8✔
914
        }
8✔
915
    }
3,712✔
916
    else {
130✔
917
        m_minimize_next_ping_delay = false;
130✔
918
    }
130✔
919

920

921
    m_ping_delay_in_progress = true;
3,842✔
922

923
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
3,842✔
924
        if (status == ErrorCodes::OperationAborted)
3,840✔
925
            return;
3,672✔
926
        else if (!status.is_ok())
168✔
927
            throw Exception(status);
×
928

929
        handle_ping_delay();                                    // Throws
168✔
930
    });                                                         // Throws
168✔
931
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
3,842✔
932
}
3,842✔
933

934

935
void Connection::handle_ping_delay()
936
{
168✔
937
    REALM_ASSERT(m_ping_delay_in_progress);
168✔
938
    m_ping_delay_in_progress = false;
168✔
939
    m_send_ping = true;
168✔
940

941
    initiate_pong_timeout(); // Throws
168✔
942

943
    if (m_state == ConnectionState::connected && !m_sending)
168✔
944
        send_next_message(); // Throws
110✔
945
}
168✔
946

947

948
void Connection::initiate_pong_timeout()
949
{
168✔
950
    REALM_ASSERT(!m_ping_delay_in_progress);
168✔
951
    REALM_ASSERT(!m_waiting_for_pong);
168✔
952
    REALM_ASSERT(m_send_ping);
168✔
953

954
    m_waiting_for_pong = true;
168✔
955
    m_pong_wait_started_at = monotonic_clock_now();
168✔
956

957
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
168✔
958
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
168✔
959
        if (status == ErrorCodes::OperationAborted)
168✔
960
            return;
156✔
961
        else if (!status.is_ok())
12✔
962
            throw Exception(status);
×
963

964
        handle_pong_timeout(); // Throws
12✔
965
    });                        // Throws
12✔
966
}
168✔
967

968

969
void Connection::handle_pong_timeout()
970
{
12✔
971
    REALM_ASSERT(m_waiting_for_pong);
12✔
972
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
973
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
974
                                 ConnectionTerminationReason::pong_timeout);
12✔
975
}
12✔
976

977

978
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
979
{
99,306✔
980
    // Stop sending messages if an websocket error was received.
981
    if (m_websocket_error_received)
99,306✔
982
        return;
×
983

984
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
99,306✔
985
        if (sentinel->destroyed) {
99,252✔
986
            return;
1,424✔
987
        }
1,424✔
988
        if (!status.is_ok()) {
97,828✔
989
            if (status != ErrorCodes::Error::OperationAborted) {
×
990
                // Write errors will be handled by the websocket_write_error_handler() callback
991
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
992
            }
×
993
            return;
×
994
        }
×
995
        handle_write_message(); // Throws
97,828✔
996
    });                         // Throws
97,828✔
997
    m_sending_session = sess;
99,306✔
998
    m_sending = true;
99,306✔
999
}
99,306✔
1000

1001

1002
void Connection::handle_write_message()
1003
{
97,826✔
1004
    m_sending_session->message_sent(); // Throws
97,826✔
1005
    if (m_sending_session->m_state == Session::Deactivated) {
97,826✔
1006
        finish_session_deactivation(m_sending_session);
128✔
1007
    }
128✔
1008
    m_sending_session = nullptr;
97,826✔
1009
    m_sending = false;
97,826✔
1010
    send_next_message(); // Throws
97,826✔
1011
}
97,826✔
1012

1013

1014
void Connection::send_next_message()
1015
{
177,490✔
1016
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
177,490✔
1017
    REALM_ASSERT(!m_sending_session);
177,490✔
1018
    REALM_ASSERT(!m_sending);
177,490✔
1019
    if (m_send_ping) {
177,490✔
1020
        send_ping(); // Throws
156✔
1021
        return;
156✔
1022
    }
156✔
1023
    while (!m_sessions_enlisted_to_send.empty()) {
257,680✔
1024
        // The state of being connected is not supposed to be able to change
1025
        // across this loop thanks to the "no callback reentrance" guarantee
1026
        // provided by Websocket::async_write_text(), and friends.
1027
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
180,064✔
1028

1029
        Session& sess = *m_sessions_enlisted_to_send.front();
180,064✔
1030
        m_sessions_enlisted_to_send.pop_front();
180,064✔
1031
        sess.send_message(); // Throws
180,064✔
1032

1033
        if (sess.m_state == Session::Deactivated) {
180,064✔
1034
            finish_session_deactivation(&sess);
2,862✔
1035
        }
2,862✔
1036

1037
        // An enlisted session may choose to not send a message. In that case,
1038
        // we should pass the opportunity to the next enlisted session.
1039
        if (m_sending)
180,064✔
1040
            break;
99,718✔
1041
    }
180,064✔
1042
}
177,334✔
1043

1044

1045
void Connection::send_ping()
1046
{
156✔
1047
    REALM_ASSERT(!m_ping_delay_in_progress);
156✔
1048
    REALM_ASSERT(m_waiting_for_pong);
156✔
1049
    REALM_ASSERT(m_send_ping);
156✔
1050

1051
    m_send_ping = false;
156✔
1052
    if (m_reconnect_info.scheduled_reset)
156✔
1053
        m_ping_after_scheduled_reset_of_reconnect_info = true;
126✔
1054

1055
    m_last_ping_sent_at = monotonic_clock_now();
156✔
1056
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
156✔
1057
                 m_previous_ping_rtt); // Throws
156✔
1058

1059
    ClientProtocol& protocol = get_client_protocol();
156✔
1060
    OutputBuffer& out = get_output_buffer();
156✔
1061
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
156✔
1062
    initiate_write_ping(out);                                          // Throws
156✔
1063
    m_ping_sent = true;
156✔
1064
}
156✔
1065

1066

1067
void Connection::initiate_write_ping(const OutputBuffer& out)
1068
{
156✔
1069
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
156✔
1070
        if (sentinel->destroyed) {
156✔
1071
            return;
2✔
1072
        }
2✔
1073
        if (!status.is_ok()) {
154✔
1074
            if (status != ErrorCodes::Error::OperationAborted) {
×
1075
                // Write errors will be handled by the websocket_write_error_handler() callback
1076
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1077
            }
×
1078
            return;
×
1079
        }
×
1080
        handle_write_ping(); // Throws
154✔
1081
    });                      // Throws
154✔
1082
    m_sending = true;
156✔
1083
}
156✔
1084

1085

1086
void Connection::handle_write_ping()
1087
{
154✔
1088
    REALM_ASSERT(m_sending);
154✔
1089
    REALM_ASSERT(!m_sending_session);
154✔
1090
    m_sending = false;
154✔
1091
    send_next_message(); // Throws
154✔
1092
}
154✔
1093

1094

1095
void Connection::handle_message_received(util::Span<const char> data)
1096
{
79,956✔
1097
    // parse_message_received() parses the message and calls the proper handler
1098
    // on the Connection object (this).
1099
    get_client_protocol().parse_message_received<Connection>(*this, std::string_view(data.data(), data.size()));
79,956✔
1100
}
79,956✔
1101

1102

1103
void Connection::initiate_disconnect_wait()
1104
{
4,776✔
1105
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,776✔
1106

1107
    if (m_disconnect_delay_in_progress) {
4,776✔
1108
        m_reconnect_disconnect_timer.reset();
2,216✔
1109
        m_disconnect_delay_in_progress = false;
2,216✔
1110
    }
2,216✔
1111

1112
    milliseconds_type time = m_client.m_connection_linger_time;
4,776✔
1113

1114
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,778✔
1115
        // If the operation is aborted, the connection object may have been
1116
        // destroyed.
1117
        if (status != ErrorCodes::OperationAborted)
4,778✔
1118
            handle_disconnect_wait(status); // Throws
12✔
1119
    });                                     // Throws
4,778✔
1120
    m_disconnect_delay_in_progress = true;
4,776✔
1121
}
4,776✔
1122

1123

1124
void Connection::handle_disconnect_wait(Status status)
1125
{
12✔
1126
    if (!status.is_ok()) {
12✔
1127
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
1128
        throw Exception(status);
×
1129
    }
×
1130

1131
    m_disconnect_delay_in_progress = false;
12✔
1132

1133
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
12✔
1134
    if (m_num_active_unsuspended_sessions == 0) {
12✔
1135
        if (m_client.m_connection_linger_time > 0)
12✔
1136
            logger.detail("Linger time expired"); // Throws
×
1137
        voluntary_disconnect();                   // Throws
12✔
1138
        logger.info("Disconnected");              // Throws
12✔
1139
    }
12✔
1140
}
12✔
1141

1142

1143
void Connection::close_due_to_protocol_error(Status status)
1144
{
16✔
1145
    SessionErrorInfo error_info(std::move(status), IsFatal{true});
16✔
1146
    error_info.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;
16✔
1147
    involuntary_disconnect(std::move(error_info),
16✔
1148
                           ConnectionTerminationReason::sync_protocol_violation); // Throws
16✔
1149
}
16✔
1150

1151

1152
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
1153
{
464✔
1154
    logger.info("Connection closed due to error: %1", status); // Throws
464✔
1155

1156
    involuntary_disconnect(SessionErrorInfo{std::move(status), is_fatal}, reason); // Throw
464✔
1157
}
464✔
1158

1159

1160
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
1161
{
556✔
1162
    logger.info("Connection closed due to transient error: %1", status); // Throws
556✔
1163
    SessionErrorInfo error_info{std::move(status), IsFatal{false}};
556✔
1164
    error_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
556✔
1165

1166
    involuntary_disconnect(std::move(error_info), reason); // Throw
556✔
1167
}
556✔
1168

1169

1170
// Close connection due to error discovered on the server-side, and then
1171
// reported to the client by way of a connection-level ERROR message.
1172
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1173
{
66✔
1174
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
66✔
1175
                int(error_code)); // Throws
66✔
1176

1177
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
66✔
1178
                                      : ConnectionTerminationReason::server_said_try_again_later;
66✔
1179
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
66✔
1180
                           reason); // Throws
66✔
1181
}
66✔
1182

1183

1184
void Connection::disconnect(const SessionErrorInfo& info)
1185
{
3,786✔
1186
    // Cancel connect timeout watchdog
1187
    m_connect_timer.reset();
3,786✔
1188

1189
    if (m_state == ConnectionState::connected) {
3,786✔
1190
        m_disconnect_time = monotonic_clock_now();
3,584✔
1191
        m_disconnect_has_occurred = true;
3,584✔
1192

1193
        // Sessions that are in the Deactivating state at this time can be
1194
        // immediately discarded, in part because they are no longer enlisted to
1195
        // send. Such sessions will be taken to the Deactivated state by
1196
        // Session::connection_lost(), and then they will be removed from
1197
        // `m_sessions`.
1198
        auto i = m_sessions.begin(), end = m_sessions.end();
3,584✔
1199
        while (i != end) {
7,968✔
1200
            // Prevent invalidation of the main iterator when erasing elements
1201
            auto j = i++;
4,384✔
1202
            Session& sess = *j->second;
4,384✔
1203
            sess.connection_lost(); // Throws
4,384✔
1204
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
4,386✔
1205
                m_sessions.erase(j);
2,280✔
1206
        }
4,384✔
1207
    }
3,584✔
1208

1209
    change_state_to_disconnected();
3,786✔
1210

1211
    m_ping_delay_in_progress = false;
3,786✔
1212
    m_waiting_for_pong = false;
3,786✔
1213
    m_send_ping = false;
3,786✔
1214
    m_minimize_next_ping_delay = false;
3,786✔
1215
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,786✔
1216
    m_ping_sent = false;
3,786✔
1217
    m_heartbeat_timer.reset();
3,786✔
1218
    m_previous_ping_rtt = 0;
3,786✔
1219

1220
    m_websocket_sentinel->destroyed = true;
3,786✔
1221
    m_websocket_sentinel.reset();
3,786✔
1222
    m_websocket.reset();
3,786✔
1223
    m_input_body_buffer.reset();
3,786✔
1224
    m_sending_session = nullptr;
3,786✔
1225
    m_sessions_enlisted_to_send.clear();
3,786✔
1226
    m_sending = false;
3,786✔
1227

1228
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,786✔
1229
    initiate_reconnect_wait();                                           // Throws
3,786✔
1230
}
3,786✔
1231

1232
bool Connection::is_flx_sync_connection() const noexcept
1233
{
113,610✔
1234
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
113,610✔
1235
}
113,610✔
1236

1237
void Connection::receive_pong(milliseconds_type timestamp)
1238
{
146✔
1239
    logger.debug("Received: PONG(timestamp=%1)", timestamp);
146✔
1240

1241
    bool legal_at_this_time = (m_waiting_for_pong && !m_send_ping);
146✔
1242
    if (REALM_UNLIKELY(!legal_at_this_time)) {
146✔
1243
        close_due_to_protocol_error(
×
1244
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
×
1245
        return;
×
1246
    }
×
1247

1248
    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
146✔
1249
        close_due_to_protocol_error(
×
1250
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1251
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
×
1252
                          m_last_ping_sent_at, timestamp)}); // Throws
×
1253
        return;
×
1254
    }
×
1255

1256
    milliseconds_type now = monotonic_clock_now();
146✔
1257
    milliseconds_type round_trip_time = now - timestamp;
146✔
1258
    logger.debug("Round trip time was %1 milliseconds", round_trip_time);
146✔
1259
    m_previous_ping_rtt = round_trip_time;
146✔
1260

1261
    // If this PONG message is a response to a PING mesage that was sent after
1262
    // the last invocation of cancel_reconnect_delay(), then the connection is
1263
    // still good, and we do not have to skip the next reconnect delay.
1264
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
146✔
1265
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
108✔
1266
        m_ping_after_scheduled_reset_of_reconnect_info = false;
108✔
1267
        m_reconnect_info.scheduled_reset = false;
108✔
1268
    }
108✔
1269

1270
    m_heartbeat_timer.reset();
146✔
1271
    m_waiting_for_pong = false;
146✔
1272

1273
    initiate_ping_delay(now); // Throws
146✔
1274

1275
    if (m_client.m_roundtrip_time_handler)
146✔
1276
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
×
1277
}
146✔
1278

1279
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
1280
{
73,912✔
1281
    if (session_ident == 0) {
73,912✔
1282
        return nullptr;
×
1283
    }
×
1284

1285
    auto* sess = get_session(session_ident);
73,912✔
1286
    if (REALM_LIKELY(sess)) {
73,914✔
1287
        return sess;
73,914✔
1288
    }
73,914✔
1289
    // Check the history to see if the message received was for a previous session
1290
    if (auto it = m_session_history.find(session_ident); it == m_session_history.end()) {
2,147,483,647✔
1291
        logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
×
1292
        close_due_to_protocol_error(
×
1293
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1294
             util::format("Received message %1 for session iden %2 when that session never existed", message,
×
1295
                          session_ident)});
×
1296
    }
×
1297
    else {
2,147,483,647✔
1298
        logger.error("Received %1 message for closed session, session_ident = %2", message,
2,147,483,647✔
1299
                     session_ident); // Throws
2,147,483,647✔
1300
    }
2,147,483,647✔
1301
    return nullptr;
2,147,483,647✔
1302
}
73,912✔
1303

1304
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1305
{
942✔
1306
    Session* sess = nullptr;
942✔
1307
    if (session_ident != 0) {
942✔
1308
        sess = find_and_validate_session(session_ident, "ERROR");
872✔
1309
        if (REALM_UNLIKELY(!sess)) {
872✔
1310
            return;
×
1311
        }
×
1312
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
872✔
1313
            close_due_to_protocol_error(std::move(status)); // Throws
×
1314
            return;
×
1315
        }
×
1316

1317
        if (sess->m_state == Session::Deactivated) {
872✔
1318
            finish_session_deactivation(sess);
10✔
1319
        }
10✔
1320
        return;
872✔
1321
    }
872✔
1322

1323
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
70✔
1324
                info.message, info.raw_error_code, info.is_fatal, session_ident,
70✔
1325
                info.server_requests_action); // Throws
70✔
1326

1327
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
70✔
1328
    if (REALM_LIKELY(known_error_code)) {
70✔
1329
        ProtocolError error_code = ProtocolError(info.raw_error_code);
66✔
1330
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
66✔
1331
            close_due_to_server_side_error(error_code, info); // Throws
66✔
1332
            return;
66✔
1333
        }
66✔
1334
        close_due_to_protocol_error(
×
1335
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1336
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1337
                          info.raw_error_code)});
×
1338
    }
×
1339
    else {
4✔
1340
        close_due_to_protocol_error(
4✔
1341
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1342
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1343
    }
4✔
1344
}
70✔
1345

1346

1347
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1348
                                             session_ident_type session_ident)
1349
{
20✔
1350
    if (session_ident == 0) {
20✔
1351
        return close_due_to_protocol_error(
×
1352
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1353
    }
×
1354

1355
    if (!is_flx_sync_connection()) {
20✔
1356
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1357
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1358
    }
×
1359

1360
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
20✔
1361
    if (REALM_UNLIKELY(!sess)) {
20✔
1362
        return;
×
1363
    }
×
1364

1365
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
20✔
1366
        close_due_to_protocol_error(std::move(status));
×
1367
    }
×
1368
}
20✔
1369

1370

1371
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
1372
{
3,604✔
1373
    Session* sess = find_and_validate_session(session_ident, "IDENT");
3,604✔
1374
    if (REALM_UNLIKELY(!sess)) {
3,604✔
1375
        return;
×
1376
    }
×
1377

1378
    if (auto status = sess->receive_ident_message(client_file_ident); !status.is_ok())
3,604✔
1379
        close_due_to_protocol_error(std::move(status)); // Throws
×
1380
}
3,604✔
1381

1382
void Connection::receive_download_message(session_ident_type session_ident, const DownloadMessage& message)
1383
{
48,170✔
1384
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
48,170✔
1385
    if (REALM_UNLIKELY(!sess)) {
48,170✔
1386
        return;
×
1387
    }
×
1388

1389
    if (auto status = sess->receive_download_message(message); !status.is_ok()) {
48,170✔
1390
        close_due_to_protocol_error(std::move(status));
2✔
1391
    }
2✔
1392
}
48,170✔
1393

1394
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
1395
{
17,016✔
1396
    Session* sess = find_and_validate_session(session_ident, "MARK");
17,016✔
1397
    if (REALM_UNLIKELY(!sess)) {
17,016✔
1398
        return;
×
1399
    }
×
1400

1401
    if (auto status = sess->receive_mark_message(request_ident); !status.is_ok())
17,016✔
1402
        close_due_to_protocol_error(std::move(status)); // Throws
10✔
1403
}
17,016✔
1404

1405

1406
void Connection::receive_unbound_message(session_ident_type session_ident)
1407
{
4,168✔
1408
    Session* sess = find_and_validate_session(session_ident, "UNBOUND");
4,168✔
1409
    if (REALM_UNLIKELY(!sess)) {
4,168✔
1410
        return;
×
1411
    }
×
1412

1413
    if (auto status = sess->receive_unbound_message(); !status.is_ok()) {
4,168✔
1414
        close_due_to_protocol_error(std::move(status)); // Throws
×
1415
        return;
×
1416
    }
×
1417

1418
    if (sess->m_state == Session::Deactivated) {
4,168✔
1419
        finish_session_deactivation(sess);
4,168✔
1420
    }
4,168✔
1421
}
4,168✔
1422

1423

1424
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1425
                                               std::string_view body)
1426
{
60✔
1427
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
60✔
1428
    if (REALM_UNLIKELY(!sess)) {
60✔
1429
        return;
×
1430
    }
×
1431

1432
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
60✔
1433
        close_due_to_protocol_error(std::move(status));
×
1434
    }
×
1435
}
60✔
1436

1437

1438
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1439
                                            std::string_view message)
1440
{
5,826✔
1441
    std::string prefix;
5,826✔
1442
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
5,826✔
1443
        prefix = util::format("Server[%1]", m_appservices_coid);
5,826✔
1444
    }
5,826✔
1445
    else {
×
1446
        prefix = "Server";
×
1447
    }
×
1448

1449
    if (session_ident != 0) {
5,826✔
1450
        if (auto sess = get_session(session_ident)) {
3,852✔
1451
            sess->logger.log(LogCategory::session, level, "%1 log: %2", prefix, message);
3,818✔
1452
            return;
3,818✔
1453
        }
3,818✔
1454

1455
        logger.log(util::LogCategory::session, level, "%1 log for unknown session %2: %3", prefix, session_ident,
34✔
1456
                   message);
34✔
1457
        return;
34✔
1458
    }
3,852✔
1459

1460
    logger.log(level, "%1 log: %2", prefix, message);
1,974✔
1461
}
1,974✔
1462

1463

1464
void Connection::receive_appservices_request_id(std::string_view coid)
1465
{
5,558✔
1466
    // Only set once per connection
1467
    if (!coid.empty() && m_appservices_coid.empty()) {
5,558✔
1468
        m_appservices_coid = coid;
2,560✔
1469
        logger.log(util::LogCategory::session, util::LogCategory::Level::info,
2,560✔
1470
                   "Connected to app services with request id: \"%1\"", m_appservices_coid);
2,560✔
1471
    }
2,560✔
1472
}
5,558✔
1473

1474

1475
void Connection::handle_protocol_error(Status status)
1476
{
×
1477
    close_due_to_protocol_error(std::move(status));
×
1478
}
×
1479

1480

1481
// Sessions are guaranteed to be granted the opportunity to send a message in
1482
// the order that they enlist. Note that this is important to ensure
1483
// nonoverlapping communication with the server for consecutive sessions
1484
// associated with the same Realm file.
1485
//
1486
// CAUTION: The specified session may get destroyed before this function
1487
// returns, but only if its Session::send_message() puts it into the Deactivated
1488
// state.
1489
void Connection::enlist_to_send(Session* sess)
1490
{
181,654✔
1491
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
181,654✔
1492
    m_sessions_enlisted_to_send.push_back(sess); // Throws
181,654✔
1493
    if (!m_sending)
181,654✔
1494
        send_next_message(); // Throws
79,398✔
1495
}
181,654✔
1496

1497

1498
std::string Connection::get_active_appservices_connection_id()
1499
{
72✔
1500
    return m_appservices_coid;
72✔
1501
}
72✔
1502

1503
void Session::cancel_resumption_delay()
1504
{
4,072✔
1505
    REALM_ASSERT_EX(m_state == Active, m_state);
4,072✔
1506

1507
    if (!m_suspended)
4,072✔
1508
        return;
3,904✔
1509

1510
    m_suspended = false;
168✔
1511

1512
    logger.debug("Resumed"); // Throws
168✔
1513

1514
    if (unbind_process_complete())
168✔
1515
        initiate_rebind(); // Throws
126✔
1516

1517
    try {
168✔
1518
        process_pending_flx_bootstrap(); // throws
168✔
1519
    }
168✔
1520
    catch (const IntegrationException& error) {
168✔
1521
        on_integration_failure(error);
×
1522
    }
×
1523
    catch (...) {
168✔
1524
        on_integration_failure(IntegrationException(exception_to_status()));
×
1525
    }
×
1526

1527
    m_conn.one_more_active_unsuspended_session(); // Throws
168✔
1528
    if (m_try_again_activation_timer) {
168✔
1529
        m_try_again_activation_timer.reset();
8✔
1530
    }
8✔
1531

1532
    on_resumed(); // Throws
168✔
1533
}
168✔
1534

1535

1536
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1537
                                                 std::vector<ProtocolErrorInfo>* out)
1538
{
22,040✔
1539
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
22,040✔
1540
        return;
21,992✔
1541
    }
21,992✔
1542

1543
#ifdef REALM_DEBUG
48✔
1544
    REALM_ASSERT_DEBUG(
48✔
1545
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
48✔
1546
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
48✔
1547
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
48✔
1548
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
48✔
1549
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
48✔
1550
                       }));
48✔
1551
#endif
48✔
1552

1553
    while (!m_pending_compensating_write_errors.empty() &&
96✔
1554
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
96✔
1555
               changesets.back().version) {
48✔
1556
        auto& cur_error = m_pending_compensating_write_errors.front();
48✔
1557
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
48✔
1558
        out->push_back(std::move(cur_error));
48✔
1559
        m_pending_compensating_write_errors.pop_front();
48✔
1560
    }
48✔
1561
}
48✔
1562

1563

1564
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1565
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1566
                                   DownloadBatchState download_batch_state)
1567
{
43,678✔
1568
    auto& history = get_history();
43,678✔
1569
    if (received_changesets.empty()) {
43,678✔
1570
        if (download_batch_state == DownloadBatchState::MoreToCome) {
21,614✔
1571
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1572
                                       "received empty download message that was not the last in batch",
×
1573
                                       ProtocolError::bad_progress);
×
1574
        }
×
1575
        history.set_sync_progress(progress, downloadable_bytes, version_info); // Throws
21,614✔
1576
        return;
21,614✔
1577
    }
21,614✔
1578

1579
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
22,064✔
1580
    auto transact = get_db()->start_read();
22,064✔
1581
    history.integrate_server_changesets(
22,064✔
1582
        progress, downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
22,064✔
1583
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
22,064✔
1584
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
22,042✔
1585
        }); // Throws
22,042✔
1586
    if (received_changesets.size() == 1) {
22,064✔
1587
        logger.debug("1 remote changeset integrated, producing client version %1",
15,464✔
1588
                     version_info.sync_version.version); // Throws
15,464✔
1589
    }
15,464✔
1590
    else {
6,600✔
1591
        logger.debug("%2 remote changesets integrated, producing client version %1",
6,600✔
1592
                     version_info.sync_version.version, received_changesets.size()); // Throws
6,600✔
1593
    }
6,600✔
1594

1595
    for (const auto& pending_error : pending_compensating_write_errors) {
22,064✔
1596
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
48✔
1597
                    pending_error.compensating_write_rejected_client_version,
48✔
1598
                    *pending_error.compensating_write_server_version, pending_error.message);
48✔
1599
        try {
48✔
1600
            on_connection_state_changed(
48✔
1601
                m_conn.get_state(),
48✔
1602
                SessionErrorInfo{pending_error,
48✔
1603
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
48✔
1604
                                                          pending_error.message)});
48✔
1605
        }
48✔
1606
        catch (...) {
48✔
1607
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1608
        }
×
1609
    }
48✔
1610
}
22,064✔
1611

1612

1613
void Session::on_integration_failure(const IntegrationException& error)
1614
{
40✔
1615
    REALM_ASSERT_EX(m_state == Active, m_state);
40✔
1616
    REALM_ASSERT(!m_client_error && !m_error_to_send);
40✔
1617
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
40✔
1618

1619
    m_client_error = util::make_optional<IntegrationException>(error);
40✔
1620
    m_error_to_send = true;
40✔
1621
    SessionErrorInfo error_info{error.to_status(), IsFatal{false}};
40✔
1622
    error_info.server_requests_action = ProtocolErrorInfo::Action::Warning;
40✔
1623
    // Surface the error to the user otherwise is lost.
1624
    on_connection_state_changed(m_conn.get_state(), std::move(error_info));
40✔
1625

1626
    // Since the deactivation process has not been initiated, the UNBIND
1627
    // message cannot have been sent unless an ERROR message was received.
1628
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
40✔
1629
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
40✔
1630
        ensure_enlisted_to_send(); // Throws
36✔
1631
    }
36✔
1632
}
40✔
1633

1634
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
1635
{
46,038✔
1636
    REALM_ASSERT_EX(m_state == Active, m_state);
46,038✔
1637
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
46,038✔
1638

1639
    m_download_progress = progress.download;
46,038✔
1640
    m_progress = progress;
46,038✔
1641

1642
    if (progress.upload.client_version > m_upload_progress.client_version)
46,038✔
1643
        m_upload_progress = progress.upload;
608✔
1644

1645
    do_recognize_sync_version(client_version); // Allows upload process to resume
46,038✔
1646

1647
    check_for_download_completion(); // Throws
46,038✔
1648

1649
    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
1650
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
46,038✔
1651
        auto& flx_subscription_store = *get_flx_subscription_store();
3,886✔
1652
        get_migration_store()->create_subscriptions(flx_subscription_store);
3,886✔
1653
    }
3,886✔
1654

1655
    // Since the deactivation process has not been initiated, the UNBIND
1656
    // message cannot have been sent unless an ERROR message was received.
1657
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
46,038✔
1658
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
46,038✔
1659
        ensure_enlisted_to_send(); // Throws
46,030✔
1660
    }
46,030✔
1661
}
46,038✔
1662

1663

1664
Session::~Session()
1665
{
10,404✔
1666
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
1667
}
10,404✔
1668

1669

1670
std::string Session::make_logger_prefix(session_ident_type ident)
1671
{
10,404✔
1672
    std::ostringstream out;
10,404✔
1673
    out.imbue(std::locale::classic());
10,404✔
1674
    out << "Session[" << ident << "]: "; // Throws
10,404✔
1675
    return out.str();                    // Throws
10,404✔
1676
}
10,404✔
1677

1678

1679
void Session::activate()
1680
{
10,404✔
1681
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
10,404✔
1682

1683
    logger.debug("Activating"); // Throws
10,404✔
1684

1685
    if (REALM_LIKELY(!get_client().is_dry_run())) {
10,404✔
1686
        bool file_exists = util::File::exists(get_realm_path());
10,404✔
1687

1688
        logger.info("client_reset_config = %1, Realm exists = %2, upload messages allowed = %3",
10,404✔
1689
                    get_client_reset_config().has_value(), file_exists, upload_messages_allowed() ? "yes" : "no");
10,404✔
1690
        get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
10,404✔
1691
    }
10,404✔
1692
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
10,404✔
1693
                 m_client_file_ident.salt); // Throws
10,404✔
1694
    m_upload_progress = m_progress.upload;
10,404✔
1695
    m_download_progress = m_progress.download;
10,404✔
1696
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
10,404✔
1697
    init_progress_handler();
10,404✔
1698

1699
    logger.debug("last_version_available = %1", m_last_version_available);                     // Throws
10,404✔
1700
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
10,404✔
1701
    logger.debug("progress_download_client_version = %1",
10,404✔
1702
                 m_progress.download.last_integrated_client_version);                                      // Throws
10,404✔
1703
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
10,404✔
1704
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
10,404✔
1705

1706
    reset_protocol_state();
10,404✔
1707
    m_state = Active;
10,404✔
1708

1709
    call_debug_hook(SyncClientHookEvent::SessionActivating);
10,404✔
1710

1711
    REALM_ASSERT(!m_suspended);
10,404✔
1712
    m_conn.one_more_active_unsuspended_session(); // Throws
10,404✔
1713

1714
    try {
10,404✔
1715
        process_pending_flx_bootstrap(); // throws
10,404✔
1716
    }
10,404✔
1717
    catch (const IntegrationException& error) {
10,404✔
1718
        on_integration_failure(error);
×
1719
    }
×
1720
    catch (...) {
10,404✔
1721
        on_integration_failure(IntegrationException(exception_to_status()));
4✔
1722
    }
4✔
1723

1724
    // Checks if there is a pending client reset
1725
    handle_pending_client_reset_acknowledgement();
10,404✔
1726
}
10,404✔
1727

1728

1729
// The caller (Connection) must discard the session if the session has become
1730
// deactivated upon return.
1731
void Session::initiate_deactivation()
1732
{
10,404✔
1733
    REALM_ASSERT_EX(m_state == Active, m_state);
10,404✔
1734

1735
    logger.debug("Initiating deactivation"); // Throws
10,404✔
1736

1737
    m_state = Deactivating;
10,404✔
1738

1739
    if (!m_suspended)
10,404✔
1740
        m_conn.one_less_active_unsuspended_session(); // Throws
9,744✔
1741

1742
    if (m_enlisted_to_send) {
10,404✔
1743
        REALM_ASSERT(!unbind_process_complete());
5,352✔
1744
        return;
5,352✔
1745
    }
5,352✔
1746

1747
    // Deactivate immediately if the BIND message has not yet been sent and the
1748
    // session is not enlisted to send, or if the unbinding process has already
1749
    // completed.
1750
    if (!m_bind_message_sent || unbind_process_complete()) {
5,052✔
1751
        complete_deactivation(); // Throws
956✔
1752
        // Life cycle state is now Deactivated
1753
        return;
956✔
1754
    }
956✔
1755

1756
    // Ready to send the UNBIND message, if it has not already been sent
1757
    if (!m_unbind_message_sent) {
4,096✔
1758
        enlist_to_send(); // Throws
3,884✔
1759
        return;
3,884✔
1760
    }
3,884✔
1761
}
4,096✔
1762

1763

1764
void Session::complete_deactivation()
1765
{
10,404✔
1766
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
10,404✔
1767
    m_state = Deactivated;
10,404✔
1768

1769
    logger.debug("Deactivation completed"); // Throws
10,404✔
1770
}
10,404✔
1771

1772

1773
// Called by the associated Connection object when this session is granted an
1774
// opportunity to send a message.
1775
//
1776
// The caller (Connection) must discard the session if the session has become
1777
// deactivated upon return.
1778
void Session::send_message()
1779
{
180,066✔
1780
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
180,066✔
1781
    REALM_ASSERT(m_enlisted_to_send);
180,066✔
1782
    m_enlisted_to_send = false;
180,066✔
1783
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
180,066✔
1784
        // Deactivation has been initiated. If the UNBIND message has not been
1785
        // sent yet, there is no point in sending it. Instead, we can let the
1786
        // deactivation process complete.
1787
        if (!m_bind_message_sent) {
9,590✔
1788
            return complete_deactivation(); // Throws
2,862✔
1789
            // Life cycle state is now Deactivated
1790
        }
2,862✔
1791

1792
        // Session life cycle state is Deactivating or the unbinding process has
1793
        // been initiated by a session specific ERROR message
1794
        if (!m_unbind_message_sent)
6,728✔
1795
            send_unbind_message(); // Throws
6,730✔
1796
        return;
6,728✔
1797
    }
9,590✔
1798

1799
    // Session life cycle state is Active and the unbinding process has
1800
    // not been initiated
1801
    REALM_ASSERT(!m_unbind_message_sent);
170,476✔
1802

1803
    if (!m_bind_message_sent)
170,476✔
1804
        return send_bind_message(); // Throws
9,110✔
1805

1806
    // Pending test commands can be sent any time after the BIND message is sent
1807
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
161,366✔
1808
                                                      [](const PendingTestCommand& command) {
161,366✔
1809
                                                          return command.pending;
148✔
1810
                                                      });
148✔
1811
    if (has_pending_test_command) {
161,366✔
1812
        return send_test_command_message();
60✔
1813
    }
60✔
1814

1815
    if (!m_ident_message_sent) {
161,306✔
1816
        if (have_client_file_ident())
7,714✔
1817
            send_ident_message(); // Throws
7,714✔
1818
        return;
7,714✔
1819
    }
7,714✔
1820

1821
    if (m_error_to_send)
153,592✔
1822
        return send_json_error_message(); // Throws
28✔
1823

1824
    // Stop sending upload, mark and query messages when the client detects an error.
1825
    if (m_client_error) {
153,564✔
1826
        return;
12✔
1827
    }
12✔
1828

1829
    if (m_target_download_mark > m_last_download_mark_sent)
153,552✔
1830
        return send_mark_message(); // Throws
17,848✔
1831

1832
    auto is_upload_allowed = [&]() -> bool {
135,716✔
1833
        if (!m_is_flx_sync_session) {
135,716✔
1834
            return true;
112,604✔
1835
        }
112,604✔
1836

1837
        auto migration_store = get_migration_store();
23,112✔
1838
        if (!migration_store) {
23,112✔
1839
            return true;
×
1840
        }
×
1841

1842
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
23,112✔
1843
        if (!sentinel_query_version) {
23,112✔
1844
            return true;
23,084✔
1845
        }
23,084✔
1846

1847
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
1848
        return m_last_sent_flx_query_version != *sentinel_query_version;
28✔
1849
    };
23,112✔
1850

1851
    if (!is_upload_allowed()) {
135,704✔
1852
        return;
16✔
1853
    }
16✔
1854

1855
    auto check_pending_flx_version = [&]() -> bool {
135,700✔
1856
        if (!m_is_flx_sync_session) {
135,700✔
1857
            return false;
112,604✔
1858
        }
112,604✔
1859

1860
        if (m_delay_uploads) {
23,096✔
1861
            return false;
3,780✔
1862
        }
3,780✔
1863

1864
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
19,316✔
1865

1866
        if (!m_pending_flx_sub_set) {
19,316✔
1867
            return false;
16,840✔
1868
        }
16,840✔
1869

1870
        // Send QUERY messages when the upload progress client version reaches the snapshot version
1871
        // of a pending subscription
1872
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
2,476✔
1873
    };
19,316✔
1874

1875
    if (check_pending_flx_version()) {
135,688✔
1876
        return send_query_change_message(); // throws
1,372✔
1877
    }
1,372✔
1878

1879
    if (!m_delay_uploads && (m_last_version_available > m_upload_progress.client_version)) {
134,316✔
1880
        return send_upload_message(); // Throws
63,612✔
1881
    }
63,612✔
1882
}
134,316✔
1883

1884

1885
void Session::send_bind_message()
1886
{
9,110✔
1887
    REALM_ASSERT_EX(m_state == Active, m_state);
9,110✔
1888

1889
    session_ident_type session_ident = m_ident;
9,110✔
1890
    // Request an ident if we don't already have one and there isn't a pending client reset diff
1891
    // The file ident can be 0 when a client reset is being performed if a brand new local realm
1892
    // has been opened (or using Async open) and a FLX/PBS migration occurs when first connecting
1893
    // to the server.
1894
    bool need_client_file_ident = !have_client_file_ident() && !get_client_reset_config();
9,110✔
1895
    const bool is_subserver = false;
9,110✔
1896

1897
    ClientProtocol& protocol = m_conn.get_client_protocol();
9,110✔
1898
    int protocol_version = m_conn.get_negotiated_protocol_version();
9,110✔
1899
    OutputBuffer& out = m_conn.get_output_buffer();
9,110✔
1900
    // Discard the token since it's ignored by the server.
1901
    std::string empty_access_token;
9,110✔
1902
    if (m_is_flx_sync_session) {
9,110✔
1903
        nlohmann::json bind_json_data;
1,878✔
1904
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,878✔
1905
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1906
        }
60✔
1907
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,878✔
1908
        auto schema_version = get_schema_version();
1,878✔
1909
        // Send 0 if schema is not versioned.
1910
        bind_json_data["schemaVersion"] = schema_version != uint64_t(-1) ? schema_version : 0;
1,878✔
1911
        if (logger.would_log(util::Logger::Level::debug)) {
1,878✔
1912
            std::string json_data_dump;
1,878✔
1913
            if (!bind_json_data.empty()) {
1,878✔
1914
                json_data_dump = bind_json_data.dump();
1,878✔
1915
            }
1,878✔
1916
            logger.debug(
1,878✔
1917
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,878✔
1918
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,878✔
1919
        }
1,878✔
1920
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,878✔
1921
                                       need_client_file_ident, is_subserver); // Throws
1,878✔
1922
    }
1,878✔
1923
    else {
7,232✔
1924
        std::string server_path = get_virt_path();
7,232✔
1925
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
7,232✔
1926
                     session_ident, need_client_file_ident, is_subserver, server_path);
7,232✔
1927
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
7,232✔
1928
                                       need_client_file_ident, is_subserver); // Throws
7,232✔
1929
    }
7,232✔
1930
    m_conn.initiate_write_message(out, this); // Throws
9,110✔
1931

1932
    m_bind_message_sent = true;
9,110✔
1933
    call_debug_hook(SyncClientHookEvent::BindMessageSent);
9,110✔
1934

1935
    // If there is a pending client reset diff, process that when the BIND message has
1936
    // been sent successfully and wait before sending the IDENT message. Otherwise,
1937
    // ready to send the IDENT message if the file identifier pair is already available.
1938
    if (!need_client_file_ident)
9,110✔
1939
        enlist_to_send(); // Throws
5,306✔
1940
}
9,110✔
1941

1942

1943
void Session::send_ident_message()
1944
{
7,714✔
1945
    REALM_ASSERT_EX(m_state == Active, m_state);
7,714✔
1946
    REALM_ASSERT(m_bind_message_sent);
7,714✔
1947
    REALM_ASSERT(!m_unbind_message_sent);
7,714✔
1948
    REALM_ASSERT(have_client_file_ident());
7,714✔
1949

1950
    ClientProtocol& protocol = m_conn.get_client_protocol();
7,714✔
1951
    OutputBuffer& out = m_conn.get_output_buffer();
7,714✔
1952
    session_ident_type session_ident = m_ident;
7,714✔
1953

1954
    if (m_is_flx_sync_session) {
7,714✔
1955
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,786✔
1956
        const auto active_query_body = active_query_set.to_ext_json();
1,786✔
1957
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,786✔
1958
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,786✔
1959
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,786✔
1960
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,786✔
1961
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,786✔
1962
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,786✔
1963
                     active_query_body); // Throws
1,786✔
1964
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,786✔
1965
                                        active_query_set.version(), active_query_body); // Throws
1,786✔
1966
        m_last_sent_flx_query_version = active_query_set.version();
1,786✔
1967
    }
1,786✔
1968
    else {
5,928✔
1969
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
5,928✔
1970
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
5,928✔
1971
                     "latest_server_version_salt=%6)",
5,928✔
1972
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
5,928✔
1973
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
5,928✔
1974
                     m_progress.latest_server_version.salt);                                  // Throws
5,928✔
1975
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
5,928✔
1976
    }
5,928✔
1977
    m_conn.initiate_write_message(out, this); // Throws
7,714✔
1978

1979
    m_ident_message_sent = true;
7,714✔
1980
    call_debug_hook(SyncClientHookEvent::IdentMessageSent);
7,714✔
1981

1982
    // Other messages may be waiting to be sent
1983
    enlist_to_send(); // Throws
7,714✔
1984
}
7,714✔
1985

1986
void Session::send_query_change_message()
1987
{
1,372✔
1988
    REALM_ASSERT_EX(m_state == Active, m_state);
1,372✔
1989
    REALM_ASSERT(m_ident_message_sent);
1,372✔
1990
    REALM_ASSERT(!m_unbind_message_sent);
1,372✔
1991
    REALM_ASSERT(m_pending_flx_sub_set);
1,372✔
1992
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
1,372✔
1993

1994
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
1,372✔
1995
        return;
×
1996
    }
×
1997

1998
    auto sub_store = get_flx_subscription_store();
1,372✔
1999
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
1,372✔
2000
    auto latest_queries = latest_sub_set.to_ext_json();
1,372✔
2001
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
1,372✔
2002
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
1,372✔
2003

2004
    OutputBuffer& out = m_conn.get_output_buffer();
1,372✔
2005
    session_ident_type session_ident = get_ident();
1,372✔
2006
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,372✔
2007
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
1,372✔
2008
    m_conn.initiate_write_message(out, this);
1,372✔
2009

2010
    m_last_sent_flx_query_version = latest_sub_set.version();
1,372✔
2011

2012
    request_download_completion_notification();
1,372✔
2013
}
1,372✔
2014

2015
void Session::send_upload_message()
2016
{
63,614✔
2017
    REALM_ASSERT_EX(m_state == Active, m_state);
63,614✔
2018
    REALM_ASSERT(m_ident_message_sent);
63,614✔
2019
    REALM_ASSERT(!m_unbind_message_sent);
63,614✔
2020

2021
    if (REALM_UNLIKELY(get_client().is_dry_run()))
63,614✔
2022
        return;
×
2023

2024
    version_type target_upload_version = m_last_version_available;
63,614✔
2025
    if (m_pending_flx_sub_set) {
63,614✔
2026
        REALM_ASSERT(m_is_flx_sync_session);
1,104✔
2027
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
1,104✔
2028
    }
1,104✔
2029

2030
    bool server_version_to_ack =
63,614✔
2031
        m_upload_progress.last_integrated_server_version < m_download_progress.server_version;
63,614✔
2032

2033
    std::vector<UploadChangeset> uploadable_changesets;
63,614✔
2034
    version_type locked_server_version = 0;
63,614✔
2035
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
63,614✔
2036
                                             locked_server_version); // Throws
63,614✔
2037

2038
    if (uploadable_changesets.empty()) {
63,614✔
2039
        // Nothing more to upload right now if:
2040
        //  1. We need to limit upload up to some version other than the last client version
2041
        //     available and there are no changes to upload
2042
        //  2. There are no changes to upload and no server version(s) to acknowledge
2043
        if (m_pending_flx_sub_set || !server_version_to_ack) {
33,166✔
2044
            logger.trace("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
6,600✔
2045
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
6,600✔
2046
            // Other messages may be waiting to be sent
2047
            return enlist_to_send(); // Throws
6,600✔
2048
        }
6,600✔
2049
    }
33,166✔
2050

2051
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
57,014✔
2052
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
744✔
2053
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
744✔
2054
    }
744✔
2055

2056
    version_type progress_client_version = m_upload_progress.client_version;
57,014✔
2057
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
57,014✔
2058

2059
    if (!upload_messages_allowed()) {
57,014✔
2060
        logger.trace("UPLOAD not allowed (progress_client_version=%1, progress_server_version=%2, "
572✔
2061
                     "locked_server_version=%3, num_changesets=%4)",
572✔
2062
                     progress_client_version, progress_server_version, locked_server_version,
572✔
2063
                     uploadable_changesets.size()); // Throws
572✔
2064
        // Other messages may be waiting to be sent
2065
        return enlist_to_send(); // Throws
572✔
2066
    }
572✔
2067

2068
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
56,442✔
2069
                 "locked_server_version=%3, num_changesets=%4)",
56,442✔
2070
                 progress_client_version, progress_server_version, locked_server_version,
56,442✔
2071
                 uploadable_changesets.size()); // Throws
56,442✔
2072

2073
    ClientProtocol& protocol = m_conn.get_client_protocol();
56,442✔
2074
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
56,442✔
2075

2076
    for (const UploadChangeset& uc : uploadable_changesets) {
56,442✔
2077
        logger.debug(util::LogCategory::changeset,
43,054✔
2078
                     "Fetching changeset for upload (client_version=%1, server_version=%2, "
43,054✔
2079
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
43,054✔
2080
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
43,054✔
2081
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
43,054✔
2082
        if (logger.would_log(util::Logger::Level::trace)) {
43,054✔
2083
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2084
            if (changeset_data.size() < 1024) {
×
2085
                logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
2086
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2087
            }
×
2088
            else {
×
2089
                logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
2090
                             protocol.compressed_hex_dump(changeset_data));
×
2091
            }
×
2092

2093
#if REALM_DEBUG
×
2094
            ChunkedBinaryInputStream in{changeset_data};
×
2095
            Changeset log;
×
2096
            try {
×
2097
                parse_changeset(in, log);
×
2098
                std::stringstream ss;
×
2099
                log.print(ss);
×
2100
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
2101
            }
×
2102
            catch (const BadChangesetError& err) {
×
2103
                logger.error(util::LogCategory::changeset, "Unable to parse changeset: %1", err.what());
×
2104
            }
×
2105
#endif
×
2106
        }
×
2107

2108
        {
43,054✔
2109
            upload_message_builder.add_changeset(uc.progress.client_version,
43,054✔
2110
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
43,054✔
2111
                                                 uc.origin_file_ident,
43,054✔
2112
                                                 uc.changeset); // Throws
43,054✔
2113
        }
43,054✔
2114
    }
43,054✔
2115

2116
    int protocol_version = m_conn.get_negotiated_protocol_version();
56,442✔
2117
    OutputBuffer& out = m_conn.get_output_buffer();
56,442✔
2118
    session_ident_type session_ident = get_ident();
56,442✔
2119
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
56,442✔
2120
                                               progress_server_version,
56,442✔
2121
                                               locked_server_version); // Throws
56,442✔
2122
    m_conn.initiate_write_message(out, this);                          // Throws
56,442✔
2123

2124
    call_debug_hook(SyncClientHookEvent::UploadMessageSent);
56,442✔
2125

2126
    // Other messages may be waiting to be sent
2127
    enlist_to_send(); // Throws
56,442✔
2128
}
56,442✔
2129

2130

2131
void Session::send_mark_message()
2132
{
17,848✔
2133
    REALM_ASSERT_EX(m_state == Active, m_state);
17,848✔
2134
    REALM_ASSERT(m_ident_message_sent);
17,848✔
2135
    REALM_ASSERT(!m_unbind_message_sent);
17,848✔
2136
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
17,848✔
2137

2138
    request_ident_type request_ident = m_target_download_mark;
17,848✔
2139
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
17,848✔
2140

2141
    ClientProtocol& protocol = m_conn.get_client_protocol();
17,848✔
2142
    OutputBuffer& out = m_conn.get_output_buffer();
17,848✔
2143
    session_ident_type session_ident = get_ident();
17,848✔
2144
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
17,848✔
2145
    m_conn.initiate_write_message(out, this);                      // Throws
17,848✔
2146

2147
    m_last_download_mark_sent = request_ident;
17,848✔
2148

2149
    // Other messages may be waiting to be sent
2150
    enlist_to_send(); // Throws
17,848✔
2151
}
17,848✔
2152

2153

2154
void Session::send_unbind_message()
2155
{
6,730✔
2156
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,730✔
2157
    REALM_ASSERT(m_bind_message_sent);
6,730✔
2158
    REALM_ASSERT(!m_unbind_message_sent);
6,730✔
2159

2160
    logger.debug("Sending: UNBIND"); // Throws
6,730✔
2161

2162
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,730✔
2163
    OutputBuffer& out = m_conn.get_output_buffer();
6,730✔
2164
    session_ident_type session_ident = get_ident();
6,730✔
2165
    protocol.make_unbind_message(out, session_ident); // Throws
6,730✔
2166
    m_conn.initiate_write_message(out, this);         // Throws
6,730✔
2167

2168
    m_unbind_message_sent = true;
6,730✔
2169
}
6,730✔
2170

2171

2172
void Session::send_json_error_message()
2173
{
28✔
2174
    REALM_ASSERT_EX(m_state == Active, m_state);
28✔
2175
    REALM_ASSERT(m_ident_message_sent);
28✔
2176
    REALM_ASSERT(!m_unbind_message_sent);
28✔
2177
    REALM_ASSERT(m_error_to_send);
28✔
2178
    REALM_ASSERT(m_client_error);
28✔
2179

2180
    ClientProtocol& protocol = m_conn.get_client_protocol();
28✔
2181
    OutputBuffer& out = m_conn.get_output_buffer();
28✔
2182
    session_ident_type session_ident = get_ident();
28✔
2183
    auto protocol_error = m_client_error->error_for_server;
28✔
2184

2185
    auto message = util::format("%1", m_client_error->to_status());
28✔
2186
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
28✔
2187
                session_ident); // Throws
28✔
2188

2189
    nlohmann::json error_body_json;
28✔
2190
    error_body_json["message"] = std::move(message);
28✔
2191
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
28✔
2192
                                     error_body_json.dump()); // Throws
28✔
2193
    m_conn.initiate_write_message(out, this);                 // Throws
28✔
2194

2195
    m_error_to_send = false;
28✔
2196
    enlist_to_send(); // Throws
28✔
2197
}
28✔
2198

2199

2200
void Session::send_test_command_message()
2201
{
60✔
2202
    REALM_ASSERT_EX(m_state == Active, m_state);
60✔
2203

2204
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
60✔
2205
                           [](const PendingTestCommand& command) {
64✔
2206
                               return command.pending;
64✔
2207
                           });
64✔
2208
    REALM_ASSERT(it != m_pending_test_commands.end());
60✔
2209

2210
    ClientProtocol& protocol = m_conn.get_client_protocol();
60✔
2211
    OutputBuffer& out = m_conn.get_output_buffer();
60✔
2212
    auto session_ident = get_ident();
60✔
2213

2214
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
60✔
2215
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
60✔
2216

2217
    m_conn.initiate_write_message(out, this); // Throws;
60✔
2218
    it->pending = false;
60✔
2219

2220
    enlist_to_send();
60✔
2221
}
60✔
2222

2223
bool Session::client_reset_if_needed()
2224
{
424✔
2225
    // Even if we end up not actually performing a client reset, consume the
2226
    // config to ensure that the resources it holds are released
2227
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
424✔
2228
    if (!client_reset_config) {
424✔
2229
        return false;
×
2230
    }
×
2231

2232
    // Save a copy of the status and action in case an error/exception occurs
2233
    Status cr_status = client_reset_config->error;
424✔
2234
    ProtocolErrorInfo::Action cr_action = client_reset_config->action;
424✔
2235

2236
    auto on_flx_version_complete = [this](int64_t version) {
424✔
2237
        this->on_flx_sync_version_complete(version);
348✔
2238
    };
348✔
2239
    try {
424✔
2240
        // The file ident from the fresh realm will be copied over to the local realm
2241
        bool did_reset = client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config),
424✔
2242
                                                            get_flx_subscription_store(), on_flx_version_complete);
424✔
2243

2244
        call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
424✔
2245
        if (!did_reset) {
424✔
2246
            return false;
×
2247
        }
×
2248
    }
424✔
2249
    catch (const std::exception& e) {
424✔
2250
        auto err_msg = util::format("A fatal error occurred during '%1' client reset diff for %2: '%3'", cr_action,
80✔
2251
                                    cr_status, e.what());
80✔
2252
        logger.error(err_msg.c_str());
80✔
2253
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
80✔
2254
        suspend(err_info);
80✔
2255
        return false;
80✔
2256
    }
80✔
2257

2258
    // The fresh Realm has been used to reset the state
2259
    logger.debug("Client reset is completed, path = %1", get_realm_path()); // Throws
344✔
2260

2261
    // Update the version, file ident and progress info after the client reset diff is done
2262
    get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
344✔
2263
    // Print the version/progress information before performing the asserts
2264
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
344✔
2265
                 m_client_file_ident.salt);                                // Throws
344✔
2266
    logger.debug("last_version_available = %1", m_last_version_available); // Throws
344✔
2267
    logger.debug("upload_progress_client_version = %1, upload_progress_server_version = %2",
344✔
2268
                 m_progress.upload.client_version,
344✔
2269
                 m_progress.upload.last_integrated_server_version); // Throws
344✔
2270
    logger.debug("download_progress_client_version = %1, download_progress_server_version = %2",
344✔
2271
                 m_progress.download.last_integrated_client_version,
344✔
2272
                 m_progress.download.server_version); // Throws
344✔
2273

2274
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
344✔
2275
                    m_progress.download.last_integrated_client_version);
344✔
2276
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
344✔
2277

2278
    m_upload_progress = m_progress.upload;
344✔
2279
    m_download_progress = m_progress.download;
344✔
2280
    init_progress_handler();
344✔
2281
    // In recovery mode, there may be new changesets to upload and nothing left to download.
2282
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
2283
    // For both, we want to allow uploads again without needing external changes to download first.
2284
    m_delay_uploads = false;
344✔
2285

2286
    // Checks if there is a pending client reset
2287
    handle_pending_client_reset_acknowledgement();
344✔
2288

2289
    update_subscription_version_info();
344✔
2290

2291
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
2292
    if (auto migration_store = get_migration_store()) {
344✔
2293
        migration_store->complete_migration_or_rollback();
316✔
2294
    }
316✔
2295

2296
    return true;
344✔
2297
}
424✔
2298

2299
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
2300
{
3,604✔
2301
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
3,604✔
2302
                 client_file_ident.salt); // Throws
3,604✔
2303

2304
    // Ignore the message if the deactivation process has been initiated,
2305
    // because in that case, the associated Realm and SessionWrapper must
2306
    // not be accessed any longer.
2307
    if (m_state != Active)
3,604✔
2308
        return Status::OK(); // Success
68✔
2309

2310
    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
3,536✔
2311
                               !m_unbound_message_received);
3,536✔
2312
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,536✔
2313
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
×
2314
    }
×
2315
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
3,536✔
2316
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
×
2317
    }
×
2318
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
3,536✔
2319
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
×
2320
    }
×
2321

2322
    m_client_file_ident = client_file_ident;
3,536✔
2323

2324
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
3,536✔
2325
        // Ready to send the IDENT message
2326
        ensure_enlisted_to_send(); // Throws
×
2327
        return Status::OK();       // Success
×
2328
    }
×
2329

2330
    get_history().set_client_file_ident(client_file_ident,
3,536✔
2331
                                        m_fix_up_object_ids); // Throws
3,536✔
2332
    m_progress.download.last_integrated_client_version = 0;
3,536✔
2333
    m_progress.upload.client_version = 0;
3,536✔
2334

2335
    // Ready to send the IDENT message
2336
    ensure_enlisted_to_send(); // Throws
3,536✔
2337
    return Status::OK();       // Success
3,536✔
2338
}
3,536✔
2339

2340
Status Session::receive_download_message(const DownloadMessage& message)
2341
{
48,176✔
2342
    // Ignore the message if the deactivation process has been initiated,
2343
    // because in that case, the associated Realm and SessionWrapper must
2344
    // not be accessed any longer.
2345
    if (m_state != Active)
48,176✔
2346
        return Status::OK();
524✔
2347

2348
    bool is_flx = m_conn.is_flx_sync_connection();
47,652✔
2349
    int64_t query_version = is_flx ? *message.query_version : 0;
47,652✔
2350

2351
    if (!is_flx || query_version > 0)
47,652✔
2352
        enable_progress_notifications();
45,714✔
2353

2354
    auto&& progress = message.progress;
47,652✔
2355
    if (is_flx) {
47,652✔
2356
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
5,470✔
2357
                     "latest_server_version=%3, latest_server_version_salt=%4, "
5,470✔
2358
                     "upload_client_version=%5, upload_server_version=%6, progress_estimate=%7, "
5,470✔
2359
                     "batch_state=%8, query_version=%9, num_changesets=%10, ...)",
5,470✔
2360
                     progress.download.server_version, progress.download.last_integrated_client_version,
5,470✔
2361
                     progress.latest_server_version.version, progress.latest_server_version.salt,
5,470✔
2362
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
5,470✔
2363
                     message.downloadable.as_estimate(), message.batch_state, query_version,
5,470✔
2364
                     message.changesets.size()); // Throws
5,470✔
2365
    }
5,470✔
2366
    else {
42,182✔
2367
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
42,182✔
2368
                     "latest_server_version=%3, latest_server_version_salt=%4, "
42,182✔
2369
                     "upload_client_version=%5, upload_server_version=%6, "
42,182✔
2370
                     "downloadable_bytes=%7, num_changesets=%8, ...)",
42,182✔
2371
                     progress.download.server_version, progress.download.last_integrated_client_version,
42,182✔
2372
                     progress.latest_server_version.version, progress.latest_server_version.salt,
42,182✔
2373
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
42,182✔
2374
                     message.downloadable.as_bytes(), message.changesets.size()); // Throws
42,182✔
2375
    }
42,182✔
2376

2377
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
2378
    // changeset over and over again.
2379
    if (m_client_error) {
47,652✔
2380
        logger.debug("Ignoring download message because the client detected an integration error");
×
2381
        return Status::OK();
×
2382
    }
×
2383

2384
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
47,652✔
2385
    if (REALM_UNLIKELY(!legal_at_this_time)) {
47,652✔
2386
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
2✔
2387
    }
2✔
2388
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
47,650✔
2389
        logger.error("Bad sync progress received (%1)", status);
×
2390
        return status;
×
2391
    }
×
2392

2393
    version_type server_version = m_progress.download.server_version;
47,650✔
2394
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
47,650✔
2395
    for (const RemoteChangeset& changeset : message.changesets) {
49,412✔
2396
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
2397
        // version must be increasing, but can stay the same during bootstraps.
2398
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
46,530✔
2399
                                                         : (changeset.remote_version > server_version);
46,530✔
2400
        // Each server version cannot be greater than the one in the header of the download message.
2401
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
46,530✔
2402
        if (!good_server_version) {
46,530✔
2403
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2404
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2405
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2406
        }
×
2407
        server_version = changeset.remote_version;
46,530✔
2408

2409
        // Check that per-changeset last integrated client version is "weakly"
2410
        // increasing.
2411
        bool good_client_version =
46,530✔
2412
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
46,530✔
2413
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
46,534✔
2414
        if (!good_client_version) {
46,530✔
2415
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2416
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2417
                                 "(%1, %2, %3)",
×
2418
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2419
                                 progress.download.last_integrated_client_version)};
×
2420
        }
×
2421
        last_integrated_client_version = changeset.last_integrated_local_version;
46,530✔
2422
        // Server shouldn't send our own changes, and zero is not a valid client
2423
        // file identifier.
2424
        bool good_file_ident =
46,530✔
2425
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
46,530✔
2426
        if (!good_file_ident) {
46,530✔
2427
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2428
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2429
                                 changeset.origin_file_ident)};
×
2430
        }
×
2431
    }
46,530✔
2432

2433
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
47,650✔
2434
                                       message.batch_state, message.changesets.size());
47,650✔
2435
    if (hook_action == SyncClientHookAction::EarlyReturn) {
47,650✔
2436
        return Status::OK();
16✔
2437
    }
16✔
2438
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
47,634✔
2439

2440
    if (process_flx_bootstrap_message(message)) {
47,634✔
2441
        clear_resumption_delay_state();
3,954✔
2442
        return Status::OK();
3,954✔
2443
    }
3,954✔
2444

2445
    initiate_integrate_changesets(message.downloadable.as_bytes(), message.batch_state, progress,
43,680✔
2446
                                  message.changesets); // Throws
43,680✔
2447

2448
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
43,680✔
2449
                                  message.batch_state, message.changesets.size());
43,680✔
2450
    if (hook_action == SyncClientHookAction::EarlyReturn) {
43,680✔
2451
        return Status::OK();
×
2452
    }
×
2453
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
43,680✔
2454

2455
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
2456
    // after a retryable session error.
2457
    clear_resumption_delay_state();
43,680✔
2458
    return Status::OK();
43,680✔
2459
}
43,680✔
2460

2461
Status Session::receive_mark_message(request_ident_type request_ident)
2462
{
17,016✔
2463
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws
17,016✔
2464

2465
    // Ignore the message if the deactivation process has been initiated,
2466
    // because in that case, the associated Realm and SessionWrapper must
2467
    // not be accessed any longer.
2468
    if (m_state != Active)
17,016✔
2469
        return Status::OK(); // Success
68✔
2470

2471
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
16,948✔
2472
    if (REALM_UNLIKELY(!legal_at_this_time)) {
16,948✔
2473
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
10✔
2474
    }
10✔
2475
    bool good_request_ident =
16,938✔
2476
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
16,938✔
2477
    if (REALM_UNLIKELY(!good_request_ident)) {
16,938✔
2478
        return {
×
2479
            ErrorCodes::SyncProtocolInvariantFailed,
×
2480
            util::format(
×
2481
                "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
×
2482
                m_last_download_mark_sent, m_last_download_mark_received)};
×
2483
    }
×
2484

2485
    m_server_version_at_last_download_mark = m_progress.download.server_version;
16,938✔
2486
    m_last_download_mark_received = request_ident;
16,938✔
2487
    check_for_download_completion(); // Throws
16,938✔
2488

2489
    return Status::OK(); // Success
16,938✔
2490
}
16,938✔
2491

2492

2493
// The caller (Connection) must discard the session if the session has become
2494
// deactivated upon return.
2495
Status Session::receive_unbound_message()
2496
{
4,168✔
2497
    logger.debug("Received: UNBOUND");
4,168✔
2498

2499
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
4,168✔
2500
    if (REALM_UNLIKELY(!legal_at_this_time)) {
4,168✔
2501
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2502
    }
×
2503

2504
    // The fact that the UNBIND message has been sent, but an ERROR message has
2505
    // not been received, implies that the deactivation process must have been
2506
    // initiated, so this session must be in the Deactivating state or the session
2507
    // has been suspended because of a client side error.
2508
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
4,168!
2509

2510
    m_unbound_message_received = true;
4,168✔
2511

2512
    // Detect completion of the unbinding process
2513
    if (m_unbind_message_send_complete && m_state == Deactivating) {
4,168✔
2514
        // The deactivation process completes when the unbinding process
2515
        // completes.
2516
        complete_deactivation(); // Throws
4,168✔
2517
        // Life cycle state is now Deactivated
2518
    }
4,168✔
2519

2520
    return Status::OK(); // Success
4,168✔
2521
}
4,168✔
2522

2523

2524
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
2525
{
20✔
2526
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);
20✔
2527
    // Ignore the message if the deactivation process has been initiated,
2528
    // because in that case, the associated Realm and SessionWrapper must
2529
    // not be accessed any longer.
2530
    if (m_state == Active) {
20✔
2531
        on_flx_sync_error(query_version, message); // throws
20✔
2532
    }
20✔
2533
    return Status::OK();
20✔
2534
}
20✔
2535

2536
// The caller (Connection) must discard the session if the session has become
2537
// deactivated upon return.
2538
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2539
{
884✔
2540
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
884✔
2541
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
884✔
2542

2543
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
884✔
2544
    if (REALM_UNLIKELY(!legal_at_this_time)) {
884✔
2545
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2546
    }
×
2547

2548
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
884✔
2549
    auto status = protocol_error_to_status(protocol_error, info.message);
884✔
2550
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
884✔
2551
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2552
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2553
                             info.raw_error_code)};
×
2554
    }
×
2555

2556
    // Can't process debug hook actions once the Session is undergoing deactivation, since
2557
    // the SessionWrapper may not be available
2558
    if (m_state == Active) {
884✔
2559
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, &info);
874✔
2560
        if (debug_action == SyncClientHookAction::EarlyReturn) {
874✔
2561
            return Status::OK();
8✔
2562
        }
8✔
2563
    }
874✔
2564

2565
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
2566
    // containing the compensating write has appeared in a download message.
2567
    if (status == ErrorCodes::SyncCompensatingWrite) {
876✔
2568
        // If the client is not active, the compensating writes will not be processed now, but will be
2569
        // sent again the next time the client connects
2570
        if (m_state == Active) {
48✔
2571
            REALM_ASSERT(info.compensating_write_server_version.has_value());
48✔
2572
            m_pending_compensating_write_errors.push_back(info);
48✔
2573
        }
48✔
2574
        return Status::OK();
48✔
2575
    }
48✔
2576

2577
    if (protocol_error == ProtocolError::schema_version_changed) {
828✔
2578
        // Enable upload immediately if the session is still active.
2579
        if (m_state == Active) {
70✔
2580
            auto wt = get_db()->start_write();
70✔
2581
            _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
70✔
2582
            wt->commit();
70✔
2583
            // Notify SyncSession a schema migration is required.
2584
            on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
70✔
2585
        }
70✔
2586
        // Keep the session active to upload any unsynced changes.
2587
        return Status::OK();
70✔
2588
    }
70✔
2589

2590
    m_error_message_received = true;
758✔
2591
    suspend(SessionErrorInfo{info, std::move(status)});
758✔
2592
    return Status::OK();
758✔
2593
}
828✔
2594

2595
void Session::suspend(const SessionErrorInfo& info)
2596
{
838✔
2597
    REALM_ASSERT(!m_suspended);
838✔
2598
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
838✔
2599
    logger.debug("Suspended"); // Throws
838✔
2600

2601
    m_suspended = true;
838✔
2602

2603
    // Detect completion of the unbinding process
2604
    if (m_unbind_message_send_complete && m_error_message_received) {
838✔
2605
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2606
        // we received an ERROR message implies that the deactivation process must
2607
        // have been initiated, so this session must be in the Deactivating state.
2608
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
10✔
2609

2610
        // The deactivation process completes when the unbinding process
2611
        // completes.
2612
        complete_deactivation(); // Throws
10✔
2613
        // Life cycle state is now Deactivated
2614
    }
10✔
2615

2616
    // Notify the application of the suspension of the session if the session is
2617
    // still in the Active state
2618
    if (m_state == Active) {
838✔
2619
        call_debug_hook(SyncClientHookEvent::SessionSuspended, &info);
828✔
2620
        m_conn.one_less_active_unsuspended_session(); // Throws
828✔
2621
        on_suspended(info);                           // Throws
828✔
2622
    }
828✔
2623

2624
    if (!info.is_fatal) {
838✔
2625
        begin_resumption_delay(info);
174✔
2626
    }
174✔
2627

2628
    // Ready to send the UNBIND message, if it has not been sent already
2629
    if (!m_unbind_message_sent)
838✔
2630
        ensure_enlisted_to_send(); // Throws
828✔
2631
}
838✔
2632

2633
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
2634
{
60✔
2635
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);
60✔
2636
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
60✔
2637
                           [&](const PendingTestCommand& command) {
60✔
2638
                               return command.id == ident;
60✔
2639
                           });
60✔
2640
    if (it == m_pending_test_commands.end()) {
60✔
2641
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2642
                util::format("Received test command response for a non-existent ident %1", ident)};
×
2643
    }
×
2644

2645
    it->promise.emplace_value(std::string{body});
60✔
2646
    m_pending_test_commands.erase(it);
60✔
2647

2648
    return Status::OK();
60✔
2649
}
60✔
2650

2651
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2652
{
174✔
2653
    REALM_ASSERT(!m_try_again_activation_timer);
174✔
2654

2655
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
174✔
2656
                                  error_info.resumption_delay_interval);
174✔
2657
    auto try_again_interval = m_try_again_delay_info.delay_interval();
174✔
2658
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
174✔
2659
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
2660
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
2661
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
2662
        try_again_interval = std::chrono::milliseconds{1000};
144✔
2663
    }
144✔
2664
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
174✔
2665
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
174✔
2666
        if (status == ErrorCodes::OperationAborted)
174✔
2667
            return;
26✔
2668
        else if (!status.is_ok())
148✔
2669
            throw Exception(status);
×
2670

2671
        m_try_again_activation_timer.reset();
148✔
2672
        cancel_resumption_delay();
148✔
2673
    });
148✔
2674
}
174✔
2675

2676
void Session::clear_resumption_delay_state()
2677
{
47,630✔
2678
    if (m_try_again_activation_timer) {
47,630✔
2679
        logger.debug("Clearing resumption delay state after successful download");
×
2680
        m_try_again_delay_info.reset();
×
2681
    }
×
2682
}
47,630✔
2683

2684
Status Session::check_received_sync_progress(const SyncProgress& progress) noexcept
2685
{
47,650✔
2686
    const SyncProgress& a = m_progress;
47,650✔
2687
    const SyncProgress& b = progress;
47,650✔
2688
    std::string message;
47,650✔
2689
    if (b.latest_server_version.version < a.latest_server_version.version) {
47,650✔
2690
        message = util::format("Latest server version in download messages must be weakly increasing throughout a "
×
2691
                               "session (current: %1, received: %2)",
×
2692
                               a.latest_server_version.version, b.latest_server_version.version);
×
2693
    }
×
2694
    if (b.upload.client_version < a.upload.client_version) {
47,650✔
2695
        message = util::format("Last integrated client version in download messages must be weakly increasing "
×
2696
                               "throughout a session (current: %1, received: %2)",
×
2697
                               a.upload.client_version, b.upload.client_version);
×
2698
    }
×
2699
    if (b.upload.client_version > m_last_version_available) {
47,650✔
2700
        message = util::format("Last integrated client version on server cannot be greater than the latest client "
×
2701
                               "version in existence (current: %1, received: %2)",
×
2702
                               m_last_version_available, b.upload.client_version);
×
2703
    }
×
2704
    if (b.download.server_version < a.download.server_version) {
47,650✔
2705
        message =
×
2706
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2707
                         a.download.server_version, b.download.server_version);
×
2708
    }
×
2709
    if (b.download.server_version > b.latest_server_version.version) {
47,650✔
2710
        message = util::format(
×
2711
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
×
2712
            b.download.server_version, b.latest_server_version.version);
×
2713
    }
×
2714
    if (b.download.last_integrated_client_version < a.download.last_integrated_client_version) {
47,650✔
2715
        message = util::format(
×
2716
            "Last integrated client version on the server at the position in the server's history of the download "
×
2717
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2718
            a.download.last_integrated_client_version, b.download.last_integrated_client_version);
×
2719
    }
×
2720
    if (b.download.last_integrated_client_version > b.upload.client_version) {
47,650✔
2721
        message = util::format("Last integrated client version on the server in the position at the server's history "
×
2722
                               "of the download cursor cannot be greater than the latest client version integrated "
×
2723
                               "on the server (download: %1, upload: %2)",
×
2724
                               b.download.last_integrated_client_version, b.upload.client_version);
×
2725
    }
×
2726
    if (b.download.server_version < b.upload.last_integrated_server_version) {
47,650✔
2727
        message = util::format(
×
2728
            "The server version of the download cursor cannot be less than the server version integrated in the "
×
2729
            "latest client version acknowledged by the server (download: %1, upload: %2)",
×
2730
            b.download.server_version, b.upload.last_integrated_server_version);
×
2731
    }
×
2732

2733
    if (message.empty()) {
47,650✔
2734
        return Status::OK();
47,646✔
2735
    }
47,646✔
2736
    return {ErrorCodes::SyncProtocolInvariantFailed, std::move(message)};
4✔
2737
}
47,650✔
2738

2739

2740
void Session::check_for_download_completion()
2741
{
62,978✔
2742
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
62,978✔
2743
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
62,978✔
2744
    if (m_last_download_mark_received == m_last_triggering_download_mark)
62,978✔
2745
        return;
45,792✔
2746
    if (m_last_download_mark_received < m_target_download_mark)
17,186✔
2747
        return;
366✔
2748
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
16,820✔
2749
        return;
×
2750
    m_last_triggering_download_mark = m_target_download_mark;
16,820✔
2751
    if (REALM_UNLIKELY(m_delay_uploads)) {
16,820✔
2752
        // Activate the upload process now, and enable immediate reactivation
2753
        // after a subsequent fast reconnect.
2754
        m_delay_uploads = false;
4,750✔
2755
        ensure_enlisted_to_send(); // Throws
4,750✔
2756
    }
4,750✔
2757
    on_download_completion(); // Throws
16,820✔
2758
}
16,820✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc