• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_484

05 Aug 2024 04:20PM UTC coverage: 91.097% (-0.01%) from 91.108%
thomas.goyne_484

Pull #7912

Evergreen

tgoyne
Extract some duplicated code for sync triggers and timers
Pull Request #7912: Extract some duplicated code for sync triggers and timers

102644 of 181486 branches covered (56.56%)

62 of 71 new or added lines in 6 files covered. (87.32%)

87 existing lines in 14 files now uncovered.

216695 of 237872 relevant lines covered (91.1%)

5798402.26 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.18
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/client_reset_operation.hpp>
10
#include <realm/sync/noinst/sync_schema_migration.hpp>
11
#include <realm/sync/protocol.hpp>
12
#include <realm/util/assert.hpp>
13
#include <realm/util/basic_system_errors.hpp>
14
#include <realm/util/memory_stream.hpp>
15
#include <realm/util/platform_info.hpp>
16
#include <realm/util/random.hpp>
17
#include <realm/util/safe_int_ops.hpp>
18
#include <realm/util/scope_exit.hpp>
19
#include <realm/util/to_string.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/version.hpp>
22

23
#include <system_error>
24
#include <sstream>
25

26
// NOTE: The protocol specification is in `/doc/protocol.md`
27

28
using namespace realm;
29
using namespace _impl;
30
using namespace realm::util;
31
using namespace realm::sync;
32
using namespace realm::sync::websocket;
33

34
// clang-format off
35
using Connection      = ClientImpl::Connection;
36
using Session         = ClientImpl::Session;
37
using UploadChangeset = ClientHistory::UploadChangeset;
38

39
// These are a work-around for a bug in MSVC. It cannot find in-class types
40
// mentioned in signature of out-of-line member function definitions.
41
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
42
using OutputBuffer                = ClientImpl::OutputBuffer;
43
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
44
// clang-format on
45

46
void ClientImpl::ReconnectInfo::reset() noexcept
47
{
1,974✔
48
    m_backoff_state.reset();
1,974✔
49
    scheduled_reset = false;
1,974✔
50
}
1,974✔
51

52

53
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
54
                                       std::optional<ResumptionDelayInfo> new_delay_info)
55
{
3,854✔
56
    m_backoff_state.update(new_reason, new_delay_info);
3,854✔
57
}
3,854✔
58

59

60
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
61
{
6,178✔
62
    if (scheduled_reset) {
6,178✔
63
        reset();
8✔
64
    }
8✔
65

66
    if (!m_backoff_state.triggering_error) {
6,178✔
67
        return std::chrono::milliseconds::zero();
4,712✔
68
    }
4,712✔
69

70
    switch (*m_backoff_state.triggering_error) {
1,466✔
71
        case ConnectionTerminationReason::closed_voluntarily:
80✔
72
            return std::chrono::milliseconds::zero();
80✔
73
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
74
            return std::chrono::milliseconds::max();
18✔
75
        default:
1,364✔
76
            if (m_reconnect_mode == ReconnectMode::testing) {
1,364✔
77
                return std::chrono::milliseconds::max();
1,008✔
78
            }
1,008✔
79

80
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
356✔
81
            return m_backoff_state.delay_interval();
356✔
82
    }
1,466✔
83
}
1,466✔
84

85

86
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
87
                                      port_type& port, std::string& path) const
88
{
4,382✔
89
    util::Uri uri(url); // Throws
4,382✔
90
    uri.canonicalize(); // Throws
4,382✔
91
    std::string userinfo, address_2, port_2;
4,382✔
92
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
4,382✔
93
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
4,382✔
94
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
4,382✔
95
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
4,382✔
96
    if (REALM_UNLIKELY(!good))
4,382✔
97
        return false;
×
98
    ProtocolEnvelope protocol_2;
4,382✔
99
    port_type port_3;
4,382✔
100
    if (realm_scheme) {
4,382✔
101
        if (uri.get_scheme() == "realm:") {
×
102
            protocol_2 = ProtocolEnvelope::realm;
×
103
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
104
        }
×
105
        else {
×
106
            protocol_2 = ProtocolEnvelope::realms;
×
107
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
108
        }
×
109
    }
×
110
    else {
4,382✔
111
        REALM_ASSERT(ws_scheme);
4,382✔
112
        if (uri.get_scheme() == "ws:") {
4,382✔
113
            protocol_2 = ProtocolEnvelope::ws;
4,302✔
114
            port_3 = 80;
4,302✔
115
        }
4,302✔
116
        else {
80✔
117
            protocol_2 = ProtocolEnvelope::wss;
80✔
118
            port_3 = 443;
80✔
119
        }
80✔
120
    }
4,382✔
121
    if (!port_2.empty()) {
4,382✔
122
        std::istringstream in(port_2);    // Throws
4,278✔
123
        in.imbue(std::locale::classic()); // Throws
4,278✔
124
        in >> port_3;
4,278✔
125
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
4,278✔
126
            return false;
×
127
    }
4,278✔
128
    std::string path_2 = uri.get_path(); // Throws (copy)
4,382✔
129

130
    protocol = protocol_2;
4,382✔
131
    address = std::move(address_2);
4,382✔
132
    port = port_3;
4,382✔
133
    path = std::move(path_2);
4,382✔
134
    return true;
4,382✔
135
}
4,382✔
136

137
ClientImpl::ClientImpl(ClientConfig config)
138
    : logger(std::make_shared<util::CategoryLogger>(util::LogCategory::session, std::move(config.logger)))
4,914✔
139
    , m_reconnect_mode{config.reconnect_mode}
4,914✔
140
    , m_connect_timeout{config.connect_timeout}
4,914✔
141
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
4,914✔
142
    , m_ping_keepalive_period{config.ping_keepalive_period}
4,914✔
143
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
4,914✔
144
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
4,914✔
145
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
4,914✔
146
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
4,914✔
147
    , m_dry_run{config.dry_run}
4,914✔
148
    , m_enable_default_port_hack{config.enable_default_port_hack}
4,914✔
149
    , m_fix_up_object_ids{config.fix_up_object_ids}
4,914✔
150
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
4,914✔
151
    , m_socket_provider{std::move(config.socket_provider)}
4,914✔
152
    , m_one_connection_per_session{config.one_connection_per_session}
4,914✔
153
    , m_actualize_and_finalize{*this, &ClientImpl::actualize_and_finalize_session_wrappers, this}
4,914✔
154
{
9,966✔
155
    // FIXME: Would be better if seeding was up to the application.
156
    util::seed_prng_nondeterministically(m_random); // Throws
9,966✔
157

158
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,966✔
159
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,966✔
160
                 get_current_protocol_version()); // Throws
9,966✔
161
    logger.info("Platform: %1", util::get_platform_info());
9,966✔
162
    const char* build_mode;
9,966✔
163
#if REALM_DEBUG
9,966✔
164
    build_mode = "Debug";
9,966✔
165
#else
166
    build_mode = "Release";
167
#endif
168
    logger.debug("Build mode: %1", build_mode);
9,966✔
169
    logger.debug("Config param: one_connection_per_session = %1",
9,966✔
170
                 config.one_connection_per_session); // Throws
9,966✔
171
    logger.debug("Config param: connect_timeout = %1 ms",
9,966✔
172
                 config.connect_timeout); // Throws
9,966✔
173
    logger.debug("Config param: connection_linger_time = %1 ms",
9,966✔
174
                 config.connection_linger_time); // Throws
9,966✔
175
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,966✔
176
                 config.ping_keepalive_period); // Throws
9,966✔
177
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,966✔
178
                 config.pong_keepalive_timeout); // Throws
9,966✔
179
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,966✔
180
                 config.fast_reconnect_limit); // Throws
9,966✔
181
    logger.debug("Config param: disable_sync_to_disk = %1",
9,966✔
182
                 config.disable_sync_to_disk); // Throws
9,966✔
183
    logger.debug(
9,966✔
184
        "Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3, jitter: 1/%4",
9,966✔
185
        m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,966✔
186
        m_reconnect_backoff_info.resumption_delay_interval.count(),
9,966✔
187
        m_reconnect_backoff_info.resumption_delay_backoff_multiplier, m_reconnect_backoff_info.delay_jitter_divisor);
9,966✔
188

189
    if (config.reconnect_mode != ReconnectMode::normal) {
9,966✔
190
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
776✔
191
                    "never do this in production!");
776✔
192
    }
776✔
193

194
    if (config.dry_run) {
9,966✔
195
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
196
                    "never do this in production!");
×
197
    }
×
198

199
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,966✔
200

201
    if (m_one_connection_per_session) {
9,966✔
202
        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
4✔
203
                    "never do this in production");
4✔
204
    }
4✔
205

206
    if (config.disable_upload_activation_delay) {
9,966✔
207
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
208
                    "never do this in production");
×
209
    }
×
210

211
    if (config.disable_sync_to_disk) {
9,966✔
212
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
213
                    "never do this in production");
×
214
    }
×
215
}
9,966✔
216

217
void ClientImpl::incr_outstanding_posts()
218
{
206,310✔
219
    util::CheckedLockGuard lock(m_drain_mutex);
206,310✔
220
    ++m_outstanding_posts;
206,310✔
221
    m_drained = false;
206,310✔
222
}
206,310✔
223

224
void ClientImpl::decr_outstanding_posts()
225
{
206,310✔
226
    util::CheckedLockGuard lock(m_drain_mutex);
206,310✔
227
    REALM_ASSERT(m_outstanding_posts);
206,310✔
228
    if (--m_outstanding_posts <= 0) {
206,310✔
229
        // Notify must happen with lock held or another thread could destroy
230
        // ClientImpl between when we release the lock and when we call notify
231
        m_drain_cv.notify_all();
18,234✔
232
    }
18,234✔
233
}
206,310✔
234

235
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
236
{
57,088✔
237
    REALM_ASSERT(m_socket_provider);
57,088✔
238
    incr_outstanding_posts();
57,088✔
239
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
57,088✔
240
        auto decr_guard = util::make_scope_exit([&]() noexcept {
57,088✔
241
            decr_outstanding_posts();
57,084✔
242
        });
57,084✔
243
        handler(status);
57,088✔
244
    });
57,088✔
245
}
57,088✔
246

247
void ClientImpl::post(util::UniqueFunction<void()>&& handler)
248
{
130,876✔
249
    REALM_ASSERT(m_socket_provider);
130,876✔
250
    incr_outstanding_posts();
130,876✔
251
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
130,876✔
252
        auto decr_guard = util::make_scope_exit([&]() noexcept {
130,872✔
253
            decr_outstanding_posts();
130,870✔
254
        });
130,870✔
255
        if (status == ErrorCodes::OperationAborted)
130,870✔
256
            return;
×
257
        if (!status.is_ok())
130,870✔
258
            throw Exception(status);
×
259
        handler();
130,870✔
260
    });
130,870✔
261
}
130,876✔
262

263

264
void ClientImpl::drain_connections()
265
{
9,966✔
266
    logger.debug("Draining connections during sync client shutdown");
9,966✔
267
    for (auto& server_slot_pair : m_server_slots) {
9,966✔
268
        auto& server_slot = server_slot_pair.second;
2,728✔
269

270
        if (server_slot.connection) {
2,728✔
271
            auto& conn = server_slot.connection;
2,504✔
272
            conn->force_close();
2,504✔
273
        }
2,504✔
274
        else {
224✔
275
            for (auto& conn_pair : server_slot.alt_connections) {
224✔
276
                conn_pair.second->force_close();
×
277
            }
×
278
        }
224✔
279
    }
2,728✔
280
}
9,966✔
281

282

283
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
284
                                                       util::UniqueFunction<void()>&& handler)
285
{
18,354✔
286
    REALM_ASSERT(m_socket_provider);
18,354✔
287
    incr_outstanding_posts();
18,354✔
288
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
18,360✔
289
        ScopeExit decr_guard([&]() noexcept {
18,360✔
290
            decr_outstanding_posts();
18,356✔
291
        });
18,356✔
292
        if (status == ErrorCodes::OperationAborted)
18,360✔
293
            return;
14,056✔
294
        if (!status.is_ok())
4,304✔
NEW
295
            throw Exception(status);
×
296
        handler();
4,304✔
297
    });
4,304✔
298
}
18,354✔
299

300

301
Connection::~Connection()
302
{
2,834✔
303
    if (m_websocket_sentinel) {
2,834✔
304
        m_websocket_sentinel->destroyed = true;
×
305
        m_websocket_sentinel.reset();
×
306
    }
×
307
}
2,834✔
308

309
void Connection::activate()
310
{
2,836✔
311
    m_activated = true;
2,836✔
312
    if (m_num_active_sessions == 0)
2,836✔
NEW
313
        m_on_idle.trigger();
×
314
    // We cannot in general connect immediately, because a prior failure to
315
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
316
    initiate_reconnect_wait(); // Throws
2,836✔
317
}
2,836✔
318

319

320
void Connection::activate_session(std::unique_ptr<Session> sess)
321
{
10,408✔
322
    REALM_ASSERT(sess);
10,408✔
323
    REALM_ASSERT(&sess->m_conn == this);
10,408✔
324
    REALM_ASSERT(!m_force_closed);
10,408✔
325
    Session& sess_2 = *sess;
10,408✔
326
    session_ident_type ident = sess->m_ident;
10,408✔
327
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
10,408✔
328
    bool was_inserted = p.second;
10,408✔
329
    REALM_ASSERT(was_inserted);
10,408✔
330
    // Save the session ident to the historical list of session idents
331
    m_session_history.insert(ident);
10,408✔
332
    sess_2.activate(); // Throws
10,408✔
333
    if (m_state == ConnectionState::connected) {
10,408✔
334
        bool fast_reconnect = false;
7,288✔
335
        sess_2.connection_established(fast_reconnect); // Throws
7,288✔
336
    }
7,288✔
337
    ++m_num_active_sessions;
10,408✔
338
}
10,408✔
339

340

341
void Connection::initiate_session_deactivation(Session* sess)
342
{
10,410✔
343
    REALM_ASSERT(sess);
10,410✔
344
    REALM_ASSERT(&sess->m_conn == this);
10,410✔
345
    REALM_ASSERT(m_num_active_sessions);
10,410✔
346
    // Since the client may be waiting for m_num_active_sessions to reach 0
347
    // in stop_and_wait() (on a separate thread), deactivate Session before
348
    // decrementing the num active sessions value.
349
    sess->initiate_deactivation(); // Throws
10,410✔
350
    if (sess->m_state == Session::Deactivated) {
10,410✔
351
        finish_session_deactivation(sess);
952✔
352
    }
952✔
353
    if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
10,410✔
354
        if (m_activated && m_state == ConnectionState::disconnected)
4,642✔
355
            m_on_idle.trigger();
374✔
356
    }
4,642✔
357
}
10,410✔
358

359

360
void Connection::cancel_reconnect_delay()
361
{
2,198✔
362
    REALM_ASSERT(m_activated);
2,198✔
363

364
    if (m_reconnect_delay_in_progress) {
2,198✔
365
        if (m_nonzero_reconnect_delay)
1,962✔
366
            logger.detail("Canceling reconnect delay"); // Throws
984✔
367

368
        // Cancel the in-progress wait operation by destroying the timer
369
        // object. Destruction is needed in this case, because a new wait
370
        // operation might have to be initiated before the previous one
371
        // completes (its completion handler starts to execute), so the new wait
372
        // operation must be done on a new timer object.
373
        m_reconnect_disconnect_timer.reset();
1,962✔
374
        m_reconnect_delay_in_progress = false;
1,962✔
375
        m_reconnect_info.reset();
1,962✔
376
        initiate_reconnect_wait(); // Throws
1,962✔
377
        return;
1,962✔
378
    }
1,962✔
379

380
    // If we are not disconnected, then we need to make sure the next time we get disconnected
381
    // that we are allowed to re-connect as quickly as possible.
382
    //
383
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
384
    // backoff/delay state before calculating the next delay, unless a PONG message is received
385
    // for the urgent PING message we send below.
386
    //
387
    // If we get a PONG message for the urgent PING message sent below, then the connection is
388
    // healthy and we can calculate the next delay normally.
389
    if (m_state != ConnectionState::disconnected) {
236✔
390
        m_reconnect_info.scheduled_reset = true;
236✔
391
        m_ping_after_scheduled_reset_of_reconnect_info = false;
236✔
392

393
        schedule_urgent_ping(); // Throws
236✔
394
        return;
236✔
395
    }
236✔
396
    // Nothing to do in this case. The next reconnect attemp will be made as
397
    // soon as there are any sessions that are both active and unsuspended.
398
}
236✔
399

400
void Connection::finish_session_deactivation(Session* sess)
401
{
8,056✔
402
    REALM_ASSERT(sess->m_state == Session::Deactivated);
8,056✔
403
    auto ident = sess->m_ident;
8,056✔
404
    m_sessions.erase(ident);
8,056✔
405
    m_session_history.erase(ident);
8,056✔
406
}
8,056✔
407

408
void Connection::force_close()
409
{
2,506✔
410
    if (m_force_closed) {
2,506✔
411
        return;
×
412
    }
×
413

414
    m_force_closed = true;
2,506✔
415

416
    if (m_state != ConnectionState::disconnected) {
2,506✔
417
        voluntary_disconnect();
2,474✔
418
    }
2,474✔
419

420
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,506✔
421
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,506✔
422
        m_reconnect_disconnect_timer.reset();
32✔
423
        m_reconnect_delay_in_progress = false;
32✔
424
        m_disconnect_delay_in_progress = false;
32✔
425
    }
32✔
426

427
    // We must copy any session pointers we want to close to a vector because force_closing
428
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
429
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
430
    std::vector<Session*> to_close;
2,506✔
431
    for (auto& session_pair : m_sessions) {
2,506✔
432
        if (session_pair.second->m_state == Session::State::Active) {
102✔
433
            to_close.push_back(session_pair.second.get());
102✔
434
        }
102✔
435
    }
102✔
436

437
    for (auto& sess : to_close) {
2,506✔
438
        sess->force_close();
102✔
439
    }
102✔
440

441
    logger.debug("Force closed idle connection");
2,506✔
442
}
2,506✔
443

444

445
void Connection::websocket_connected_handler(const std::string& protocol)
446
{
3,650✔
447
    if (!protocol.empty()) {
3,650✔
448
        std::string_view expected_prefix =
3,650✔
449
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,650✔
450
        // FIXME: Use std::string_view::begins_with() in C++20.
451
        auto prefix_matches = [&](std::string_view other) {
3,650✔
452
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,650✔
453
        };
3,650✔
454
        if (prefix_matches(expected_prefix)) {
3,650✔
455
            util::MemoryInputStream in;
3,650✔
456
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,650✔
457
            in.imbue(std::locale::classic());
3,650✔
458
            in.unsetf(std::ios_base::skipws);
3,650✔
459
            int value_2 = 0;
3,650✔
460
            in >> value_2;
3,650✔
461
            if (in && in.eof() && value_2 >= 0) {
3,650✔
462
                bool good_version =
3,650✔
463
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,650✔
464
                if (good_version) {
3,650✔
465
                    logger.detail("Negotiated protocol version: %1", value_2);
3,650✔
466
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
467
                    // will provide the appservices connection ID via a log message.
468
                    // TODO: Remove once the server starts sending the connection ID
469
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,650✔
470
                    m_negotiated_protocol_version = value_2;
3,650✔
471
                    handle_connection_established(); // Throws
3,650✔
472
                    return;
3,650✔
473
                }
3,650✔
474
            }
3,650✔
475
        }
3,650✔
476
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
477
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
478
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
479
    }
×
480
    else {
×
481
        close_due_to_client_side_error(
×
482
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
483
            ConnectionTerminationReason::bad_headers_in_http_response);
×
484
    }
×
485
}
3,650✔
486

487

488
bool Connection::websocket_binary_message_received(util::Span<const char> data)
489
{
82,442✔
490
    if (m_force_closed) {
82,442✔
491
        logger.debug("Received binary message after connection was force closed");
×
492
        return false;
×
493
    }
×
494

495
    using sf = SimulatedFailure;
82,442✔
496
    if (sf::check_trigger(sf::sync_client__read_head)) {
82,442✔
497
        close_due_to_client_side_error(
438✔
498
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
438✔
499
            ConnectionTerminationReason::read_or_write_error);
438✔
500
        return bool(m_websocket);
438✔
501
    }
438✔
502

503
    handle_message_received(data);
82,004✔
504
    return bool(m_websocket);
82,004✔
505
}
82,442✔
506

507

508
void Connection::websocket_error_handler()
509
{
748✔
510
    m_websocket_error_received = true;
748✔
511
}
748✔
512

513
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
514
{
868✔
515
    if (m_force_closed) {
868✔
516
        logger.debug("Received websocket close message after connection was force closed");
×
517
        return false;
×
518
    }
×
519
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
868✔
520

521
    switch (error_code) {
868✔
522
        case WebSocketError::websocket_ok:
70✔
523
            break;
70✔
524
        case WebSocketError::websocket_resolve_failed:
4✔
525
            [[fallthrough]];
4✔
526
        case WebSocketError::websocket_connection_failed: {
112✔
527
            SessionErrorInfo error_info(
112✔
528
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
112✔
529
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
530
            // to make sure the websocket URL is correct
531
            if (!m_server_endpoint.is_verified) {
112✔
532
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
84✔
533
            }
84✔
534
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
112✔
535
            break;
112✔
536
        }
4✔
537
        case WebSocketError::websocket_read_error:
614✔
538
            [[fallthrough]];
614✔
539
        case WebSocketError::websocket_write_error: {
614✔
540
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
614✔
541
                                         ConnectionTerminationReason::read_or_write_error);
614✔
542
            break;
614✔
543
        }
614✔
544
        case WebSocketError::websocket_going_away:
✔
545
            [[fallthrough]];
×
546
        case WebSocketError::websocket_protocol_error:
✔
547
            [[fallthrough]];
×
548
        case WebSocketError::websocket_unsupported_data:
✔
549
            [[fallthrough]];
×
550
        case WebSocketError::websocket_invalid_payload_data:
✔
551
            [[fallthrough]];
×
552
        case WebSocketError::websocket_policy_violation:
✔
553
            [[fallthrough]];
×
554
        case WebSocketError::websocket_reserved:
✔
555
            [[fallthrough]];
×
556
        case WebSocketError::websocket_no_status_received:
✔
557
            [[fallthrough]];
×
558
        case WebSocketError::websocket_invalid_extension: {
✔
559
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
560
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
561
            break;
×
562
        }
×
563
        case WebSocketError::websocket_message_too_big: {
4✔
564
            auto message = util::format(
4✔
565
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
566
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
567
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
568
            involuntary_disconnect(std::move(error_info),
4✔
569
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
570
            break;
4✔
571
        }
×
572
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
573
            close_due_to_client_side_error(
10✔
574
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
575
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
576
            break;
10✔
577
        }
×
578
        case WebSocketError::websocket_fatal_error: {
✔
579
            // Error is fatal if the sync_route has already been verified - if the sync_route has not
580
            // been verified, then use a non-fatal error and try to perform a location update.
581
            SessionErrorInfo error_info(
×
582
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)},
×
583
                IsFatal{m_server_endpoint.is_verified});
×
584
            ConnectionTerminationReason reason = ConnectionTerminationReason::http_response_says_fatal_error;
×
585
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
586
            // to make sure the websocket URL is correct
587
            if (!m_server_endpoint.is_verified) {
×
588
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
589
                reason = ConnectionTerminationReason::connect_operation_failed;
×
590
            }
×
591
            involuntary_disconnect(std::move(error_info), reason);
×
592
            break;
×
593
        }
×
594
        case WebSocketError::websocket_forbidden: {
✔
595
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
596
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
597
            involuntary_disconnect(std::move(error_info),
×
598
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
599
            break;
×
600
        }
×
601
        case WebSocketError::websocket_unauthorized: {
44✔
602
            SessionErrorInfo error_info(
44✔
603
                {ErrorCodes::AuthError,
44✔
604
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
44✔
605
                IsFatal{false});
44✔
606
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
44✔
607
            involuntary_disconnect(std::move(error_info),
44✔
608
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
44✔
609
            break;
44✔
610
        }
×
611
        case WebSocketError::websocket_moved_permanently: {
14✔
612
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
14✔
613
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
14✔
614
            involuntary_disconnect(std::move(error_info),
14✔
615
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
14✔
616
            break;
14✔
617
        }
×
618
        case WebSocketError::websocket_abnormal_closure: {
✔
619
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
620
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
621
            involuntary_disconnect(std::move(error_info),
×
622
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
623
            break;
×
624
        }
×
625
        case WebSocketError::websocket_internal_server_error:
✔
626
            [[fallthrough]];
×
627
        case WebSocketError::websocket_retry_error: {
✔
628
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
629
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
630
            break;
×
631
        }
×
632
    }
868✔
633

634
    return bool(m_websocket);
868✔
635
}
868✔
636

637
// Guarantees that handle_reconnect_wait() is never called from within the
638
// execution of initiate_reconnect_wait() (no callback reentrance).
639
void Connection::initiate_reconnect_wait()
640
{
8,652✔
641
    REALM_ASSERT(m_activated);
8,652✔
642
    REALM_ASSERT(!m_reconnect_delay_in_progress);
8,652✔
643
    REALM_ASSERT(!m_disconnect_delay_in_progress);
8,652✔
644

645
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
646
    if (m_force_closed) {
8,652✔
647
        return;
2,474✔
648
    }
2,474✔
649

650
    m_reconnect_delay_in_progress = true;
6,178✔
651
    auto delay = m_reconnect_info.delay_interval();
6,178✔
652
    if (delay == std::chrono::milliseconds::max()) {
6,178✔
653
        logger.detail("Reconnection delayed indefinitely"); // Throws
1,026✔
654
        // Not actually starting a timer corresponds to an infinite wait
655
        m_nonzero_reconnect_delay = true;
1,026✔
656
        return;
1,026✔
657
    }
1,026✔
658

659
    if (delay == std::chrono::milliseconds::zero()) {
5,152✔
660
        m_nonzero_reconnect_delay = false;
4,796✔
661
    }
4,796✔
662
    else {
356✔
663
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
356✔
664
        m_nonzero_reconnect_delay = true;
356✔
665
    }
356✔
666

667
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
668
    // we need it to be cancelable in case the connection is terminated before the timer
669
    // callback is run.
670
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this] {
5,152✔
671
        handle_reconnect_wait(); // Throws
3,852✔
672
    });                          // Throws
3,852✔
673
}
5,152✔
674

675

676
void Connection::handle_reconnect_wait()
677
{
3,852✔
678
    REALM_ASSERT(m_reconnect_delay_in_progress);
3,852✔
679
    m_reconnect_delay_in_progress = false;
3,852✔
680

681
    if (m_num_active_unsuspended_sessions > 0)
3,852✔
682
        initiate_reconnect(); // Throws
3,842✔
683
}
3,852✔
684

685
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
686
    explicit WebSocketObserverShim(Connection* conn)
687
        : conn(conn)
1,754✔
688
        , sentinel(conn->m_websocket_sentinel)
1,754✔
689
    {
3,854✔
690
    }
3,854✔
691

692
    Connection* conn;
693
    util::bind_ptr<LifecycleSentinel> sentinel;
694

695
    void websocket_connected_handler(const std::string& protocol) override
696
    {
3,650✔
697
        if (sentinel->destroyed) {
3,650✔
698
            return;
×
699
        }
×
700

701
        return conn->websocket_connected_handler(protocol);
3,650✔
702
    }
3,650✔
703

704
    void websocket_error_handler() override
705
    {
748✔
706
        if (sentinel->destroyed) {
748✔
707
            return;
×
708
        }
×
709

710
        conn->websocket_error_handler();
748✔
711
    }
748✔
712

713
    bool websocket_binary_message_received(util::Span<const char> data) override
714
    {
82,440✔
715
        if (sentinel->destroyed) {
82,440✔
716
            return false;
×
717
        }
×
718

719
        return conn->websocket_binary_message_received(data);
82,440✔
720
    }
82,440✔
721

722
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
723
    {
868✔
724
        if (sentinel->destroyed) {
868✔
725
            return true;
×
726
        }
×
727

728
        return conn->websocket_closed_handler(was_clean, error_code, msg);
868✔
729
    }
868✔
730
};
731

732
void Connection::initiate_reconnect()
733
{
3,852✔
734
    REALM_ASSERT(m_activated);
3,852✔
735

736
    m_state = ConnectionState::connecting;
3,852✔
737
    report_connection_state_change(ConnectionState::connecting); // Throws
3,852✔
738
    if (m_websocket_sentinel) {
3,852✔
739
        m_websocket_sentinel->destroyed = true;
×
740
    }
×
741
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,852✔
742
    m_websocket.reset();
3,852✔
743

744
    // Watchdog
745
    initiate_connect_wait(); // Throws
3,852✔
746

747
    std::vector<std::string> sec_websocket_protocol;
3,852✔
748
    {
3,852✔
749
        auto protocol_prefix =
3,852✔
750
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,852✔
751
        int min = get_oldest_supported_protocol_version();
3,852✔
752
        int max = get_current_protocol_version();
3,852✔
753
        REALM_ASSERT_3(min, <=, max);
3,852✔
754
        // List protocol version in descending order to ensure that the server
755
        // selects the highest possible version.
756
        for (int version = max; version >= min; --version) {
53,928✔
757
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
50,076✔
758
        }
50,076✔
759
    }
3,852✔
760

761
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,852✔
762
                m_server_endpoint.port, m_http_request_path_prefix);
3,852✔
763

764
    m_websocket_error_received = false;
3,852✔
765
    m_websocket =
3,852✔
766
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,852✔
767
                                            WebSocketEndpoint{
3,852✔
768
                                                m_server_endpoint.address,
3,852✔
769
                                                m_server_endpoint.port,
3,852✔
770
                                                get_http_request_path(),
3,852✔
771
                                                std::move(sec_websocket_protocol),
3,852✔
772
                                                is_ssl(m_server_endpoint.envelope),
3,852✔
773
                                                /// DEPRECATED - The following will be removed in a future release
774
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,852✔
775
                                                m_verify_servers_ssl_certificate,
3,852✔
776
                                                m_ssl_trust_certificate_path,
3,852✔
777
                                                m_ssl_verify_callback,
3,852✔
778
                                                m_proxy_config,
3,852✔
779
                                            });
3,852✔
780
}
3,852✔
781

782

783
void Connection::initiate_connect_wait()
784
{
3,852✔
785
    // Deploy a watchdog to enforce an upper bound on the time it can take to
786
    // fully establish the connection (including SSL and WebSocket
787
    // handshakes). Without such a watchdog, connect operations could take very
788
    // long, or even indefinite time.
789
    std::chrono::milliseconds time(m_client.m_connect_timeout);
3,852✔
790
    m_connect_timer = m_client.create_timer(time, [this] {
3,852✔
NEW
791
        handle_connect_wait(); // Throws
×
NEW
792
    });                        // Throws
×
793
}
3,852✔
794

795

796
void Connection::handle_connect_wait()
797
{
×
798
    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
×
799
    logger.info("Connect timeout"); // Throws
×
800
    SessionErrorInfo error_info({ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
×
801
                                IsFatal{false});
×
802
    // If the connection fails/times out and the server has not been contacted yet, refresh the location
803
    // to make sure the websocket URL is correct
804
    if (!m_server_endpoint.is_verified) {
×
805
        error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
806
    }
×
807
    involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::sync_connect_timeout); // Throws
×
808
}
×
809

810

811
void Connection::handle_connection_established()
812
{
3,650✔
813
    // Cancel connect timeout watchdog
814
    m_connect_timer.reset();
3,650✔
815

816
    m_state = ConnectionState::connected;
3,650✔
817
    m_server_endpoint.is_verified = true; // sync route is valid since connection is successful
3,650✔
818

819
    milliseconds_type now = monotonic_clock_now();
3,650✔
820
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,650✔
821
    initiate_ping_delay(now);     // Throws
3,650✔
822

823
    bool fast_reconnect = false;
3,650✔
824
    if (m_disconnect_has_occurred) {
3,650✔
825
        milliseconds_type time = now - m_disconnect_time;
1,066✔
826
        if (time <= m_client.m_fast_reconnect_limit)
1,066✔
827
            fast_reconnect = true;
1,066✔
828
    }
1,066✔
829

830
    for (auto& p : m_sessions) {
4,806✔
831
        Session& sess = *p.second;
4,806✔
832
        sess.connection_established(fast_reconnect); // Throws
4,806✔
833
    }
4,806✔
834

835
    report_connection_state_change(ConnectionState::connected); // Throws
3,650✔
836
}
3,650✔
837

838

839
void Connection::schedule_urgent_ping()
840
{
236✔
841
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
236✔
842
    if (m_ping_delay_in_progress) {
236✔
843
        m_heartbeat_timer.reset();
208✔
844
        m_ping_delay_in_progress = false;
208✔
845
        m_minimize_next_ping_delay = true;
208✔
846
        milliseconds_type now = monotonic_clock_now();
208✔
847
        initiate_ping_delay(now); // Throws
208✔
848
        return;
208✔
849
    }
208✔
850
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
28✔
851
    if (!m_send_ping)
28✔
852
        m_minimize_next_ping_delay = true;
26✔
853
}
28✔
854

855

856
void Connection::initiate_ping_delay(milliseconds_type now)
857
{
4,108✔
858
    REALM_ASSERT(!m_ping_delay_in_progress);
4,108✔
859
    REALM_ASSERT(!m_waiting_for_pong);
4,108✔
860
    REALM_ASSERT(!m_send_ping);
4,108✔
861

862
    milliseconds_type delay = 0;
4,108✔
863
    if (!m_minimize_next_ping_delay) {
4,108✔
864
        delay = m_client.m_ping_keepalive_period;
3,878✔
865
        // Make a randomized deduction of up to 10%, or up to 100% if this is
866
        // the first PING message to be sent since the connection was
867
        // established. The purpose of this randomized deduction is to reduce
868
        // the risk of many connections sending PING messages simultaneously to
869
        // the server.
870
        milliseconds_type max_deduction = (m_ping_sent ? delay / 10 : delay);
3,878✔
871
        auto distr = std::uniform_int_distribution<milliseconds_type>(0, max_deduction);
3,878✔
872
        milliseconds_type randomized_deduction = distr(m_client.get_random());
3,878✔
873
        delay -= randomized_deduction;
3,878✔
874
        // Deduct the time spent waiting for PONG
875
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
3,878✔
876
        milliseconds_type spent_time = now - m_pong_wait_started_at;
3,878✔
877
        if (spent_time < delay) {
3,878✔
878
            delay -= spent_time;
3,870✔
879
        }
3,870✔
880
        else {
8✔
881
            delay = 0;
8✔
882
        }
8✔
883
    }
3,878✔
884
    else {
230✔
885
        m_minimize_next_ping_delay = false;
230✔
886
    }
230✔
887

888

889
    m_ping_delay_in_progress = true;
4,108✔
890

891
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this] {
4,108✔
892
        handle_ping_delay();                                    // Throws
270✔
893
    });                                                         // Throws
270✔
894
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
4,108✔
895
}
4,108✔
896

897

898
void Connection::handle_ping_delay()
899
{
270✔
900
    REALM_ASSERT(m_ping_delay_in_progress);
270✔
901
    m_ping_delay_in_progress = false;
270✔
902
    m_send_ping = true;
270✔
903

904
    initiate_pong_timeout(); // Throws
270✔
905

906
    if (m_state == ConnectionState::connected && !m_sending)
270✔
907
        send_next_message(); // Throws
212✔
908
}
270✔
909

910

911
void Connection::initiate_pong_timeout()
912
{
270✔
913
    REALM_ASSERT(!m_ping_delay_in_progress);
270✔
914
    REALM_ASSERT(!m_waiting_for_pong);
270✔
915
    REALM_ASSERT(m_send_ping);
270✔
916

917
    m_waiting_for_pong = true;
270✔
918
    m_pong_wait_started_at = monotonic_clock_now();
270✔
919

920
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
270✔
921
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this] {
270✔
922
        handle_pong_timeout(); // Throws
12✔
923
    });                        // Throws
12✔
924
}
270✔
925

926

927
void Connection::handle_pong_timeout()
928
{
12✔
929
    REALM_ASSERT(m_waiting_for_pong);
12✔
930
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
931
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
932
                                 ConnectionTerminationReason::pong_timeout);
12✔
933
}
12✔
934

935

936
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
937
{
98,738✔
938
    // Stop sending messages if an websocket error was received.
939
    if (m_websocket_error_received)
98,738✔
940
        return;
×
941

942
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
98,738✔
943
        if (sentinel->destroyed) {
98,658✔
944
            return;
1,422✔
945
        }
1,422✔
946
        if (!status.is_ok()) {
97,236✔
947
            if (status != ErrorCodes::Error::OperationAborted) {
×
948
                // Write errors will be handled by the websocket_write_error_handler() callback
949
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
950
            }
×
951
            return;
×
952
        }
×
953
        handle_write_message(); // Throws
97,236✔
954
    });                         // Throws
97,236✔
955
    m_sending_session = sess;
98,738✔
956
    m_sending = true;
98,738✔
957
}
98,738✔
958

959

960
void Connection::handle_write_message()
961
{
97,226✔
962
    m_sending_session->message_sent(); // Throws
97,226✔
963
    if (m_sending_session->m_state == Session::Deactivated) {
97,226✔
964
        finish_session_deactivation(m_sending_session);
124✔
965
    }
124✔
966
    m_sending_session = nullptr;
97,226✔
967
    m_sending = false;
97,226✔
968
    send_next_message(); // Throws
97,226✔
969
}
97,226✔
970

971

972
void Connection::send_next_message()
973
{
173,390✔
974
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
173,390✔
975
    REALM_ASSERT(!m_sending_session);
173,390✔
976
    REALM_ASSERT(!m_sending);
173,390✔
977
    if (m_send_ping) {
173,390✔
978
        send_ping(); // Throws
258✔
979
        return;
258✔
980
    }
258✔
981
    while (!m_sessions_enlisted_to_send.empty()) {
249,582✔
982
        // The state of being connected is not supposed to be able to change
983
        // across this loop thanks to the "no callback reentrance" guarantee
984
        // provided by Websocket::async_write_text(), and friends.
985
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
175,580✔
986

987
        Session& sess = *m_sessions_enlisted_to_send.front();
175,580✔
988
        m_sessions_enlisted_to_send.pop_front();
175,580✔
989
        sess.send_message(); // Throws
175,580✔
990

991
        if (sess.m_state == Session::Deactivated) {
175,580✔
992
            finish_session_deactivation(&sess);
2,896✔
993
        }
2,896✔
994

995
        // An enlisted session may choose to not send a message. In that case,
996
        // we should pass the opportunity to the next enlisted session.
997
        if (m_sending)
175,580✔
998
            break;
99,130✔
999
    }
175,580✔
1000
}
173,132✔
1001

1002

1003
void Connection::send_ping()
1004
{
258✔
1005
    REALM_ASSERT(!m_ping_delay_in_progress);
258✔
1006
    REALM_ASSERT(m_waiting_for_pong);
258✔
1007
    REALM_ASSERT(m_send_ping);
258✔
1008

1009
    m_send_ping = false;
258✔
1010
    if (m_reconnect_info.scheduled_reset)
258✔
1011
        m_ping_after_scheduled_reset_of_reconnect_info = true;
224✔
1012

1013
    m_last_ping_sent_at = monotonic_clock_now();
258✔
1014
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
258✔
1015
                 m_previous_ping_rtt); // Throws
258✔
1016

1017
    ClientProtocol& protocol = get_client_protocol();
258✔
1018
    OutputBuffer& out = get_output_buffer();
258✔
1019
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
258✔
1020
    initiate_write_ping(out);                                          // Throws
258✔
1021
    m_ping_sent = true;
258✔
1022
}
258✔
1023

1024

1025
void Connection::initiate_write_ping(const OutputBuffer& out)
1026
{
258✔
1027
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
258✔
1028
        if (sentinel->destroyed) {
258✔
1029
            return;
2✔
1030
        }
2✔
1031
        if (!status.is_ok()) {
256✔
1032
            if (status != ErrorCodes::Error::OperationAborted) {
×
1033
                // Write errors will be handled by the websocket_write_error_handler() callback
1034
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1035
            }
×
1036
            return;
×
1037
        }
×
1038
        handle_write_ping(); // Throws
256✔
1039
    });                      // Throws
256✔
1040
    m_sending = true;
258✔
1041
}
258✔
1042

1043

1044
void Connection::handle_write_ping()
1045
{
256✔
1046
    REALM_ASSERT(m_sending);
256✔
1047
    REALM_ASSERT(!m_sending_session);
256✔
1048
    m_sending = false;
256✔
1049
    send_next_message(); // Throws
256✔
1050
}
256✔
1051

1052

1053
void Connection::handle_message_received(util::Span<const char> data)
1054
{
81,998✔
1055
    // parse_message_received() parses the message and calls the proper handler
1056
    // on the Connection object (this).
1057
    get_client_protocol().parse_message_received<Connection>(*this, std::string_view(data.data(), data.size()));
81,998✔
1058
}
81,998✔
1059

1060

1061
void Connection::initiate_disconnect_wait()
1062
{
4,794✔
1063
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,794✔
1064

1065
    if (m_disconnect_delay_in_progress) {
4,794✔
1066
        m_reconnect_disconnect_timer.reset();
2,222✔
1067
        m_disconnect_delay_in_progress = false;
2,222✔
1068
    }
2,222✔
1069

1070
    milliseconds_type time = m_client.m_connection_linger_time;
4,794✔
1071

1072
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this] {
4,794✔
1073
        handle_disconnect_wait(); // Throws
12✔
1074
    });                           // Throws
12✔
1075
    m_disconnect_delay_in_progress = true;
4,794✔
1076
}
4,794✔
1077

1078

1079
void Connection::handle_disconnect_wait()
1080
{
12✔
1081
    m_disconnect_delay_in_progress = false;
12✔
1082

1083
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
12✔
1084
    if (m_num_active_unsuspended_sessions == 0) {
12✔
1085
        if (m_client.m_connection_linger_time > 0)
12✔
1086
            logger.detail("Linger time expired"); // Throws
×
1087
        voluntary_disconnect();                   // Throws
12✔
1088
        logger.info("Disconnected");              // Throws
12✔
1089
    }
12✔
1090
}
12✔
1091

1092

1093
void Connection::close_due_to_protocol_error(Status status)
1094
{
16✔
1095
    SessionErrorInfo error_info(std::move(status), IsFatal{true});
16✔
1096
    error_info.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;
16✔
1097
    involuntary_disconnect(std::move(error_info),
16✔
1098
                           ConnectionTerminationReason::sync_protocol_violation); // Throws
16✔
1099
}
16✔
1100

1101

1102
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
1103
{
448✔
1104
    logger.info("Connection closed due to error: %1", status); // Throws
448✔
1105

1106
    involuntary_disconnect(SessionErrorInfo{std::move(status), is_fatal}, reason); // Throw
448✔
1107
}
448✔
1108

1109

1110
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
1111
{
626✔
1112
    logger.info("Connection closed due to transient error: %1", status); // Throws
626✔
1113
    SessionErrorInfo error_info{std::move(status), IsFatal{false}};
626✔
1114
    error_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
626✔
1115

1116
    involuntary_disconnect(std::move(error_info), reason); // Throw
626✔
1117
}
626✔
1118

1119

1120
// Close connection due to error discovered on the server-side, and then
1121
// reported to the client by way of a connection-level ERROR message.
1122
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1123
{
68✔
1124
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
68✔
1125
                int(error_code)); // Throws
68✔
1126

1127
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
68✔
1128
                                      : ConnectionTerminationReason::server_said_try_again_later;
68✔
1129
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
68✔
1130
                           reason); // Throws
68✔
1131
}
68✔
1132

1133

1134
void Connection::disconnect(const SessionErrorInfo& info)
1135
{
3,854✔
1136
    // Cancel connect timeout watchdog
1137
    m_connect_timer.reset();
3,854✔
1138

1139
    if (m_state == ConnectionState::connected) {
3,854✔
1140
        m_disconnect_time = monotonic_clock_now();
3,646✔
1141
        m_disconnect_has_occurred = true;
3,646✔
1142

1143
        // Sessions that are in the Deactivating state at this time can be
1144
        // immediately discarded, in part because they are no longer enlisted to
1145
        // send. Such sessions will be taken to the Deactivated state by
1146
        // Session::connection_lost(), and then they will be removed from
1147
        // `m_sessions`.
1148
        auto i = m_sessions.begin(), end = m_sessions.end();
3,646✔
1149
        while (i != end) {
8,218✔
1150
            // Prevent invalidation of the main iterator when erasing elements
1151
            auto j = i++;
4,572✔
1152
            Session& sess = *j->second;
4,572✔
1153
            sess.connection_lost(); // Throws
4,572✔
1154
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
4,572✔
1155
                m_sessions.erase(j);
2,352✔
1156
        }
4,572✔
1157
    }
3,646✔
1158

1159
    change_state_to_disconnected();
3,854✔
1160

1161
    m_ping_delay_in_progress = false;
3,854✔
1162
    m_waiting_for_pong = false;
3,854✔
1163
    m_send_ping = false;
3,854✔
1164
    m_minimize_next_ping_delay = false;
3,854✔
1165
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,854✔
1166
    m_ping_sent = false;
3,854✔
1167
    m_heartbeat_timer.reset();
3,854✔
1168
    m_previous_ping_rtt = 0;
3,854✔
1169

1170
    m_websocket_sentinel->destroyed = true;
3,854✔
1171
    m_websocket_sentinel.reset();
3,854✔
1172
    m_websocket.reset();
3,854✔
1173
    m_input_body_buffer.reset();
3,854✔
1174
    m_sending_session = nullptr;
3,854✔
1175
    m_sessions_enlisted_to_send.clear();
3,854✔
1176
    m_sending = false;
3,854✔
1177

1178
    if (!m_appservices_coid.empty()) {
3,854✔
1179
        m_appservices_coid.clear();
3,612✔
1180
        logger.base_logger = make_logger(m_ident, std::nullopt, get_client().logger.base_logger);
3,612✔
1181
        for (auto& [ident, sess] : m_sessions) {
3,612✔
1182
            sess->logger.base_logger = Session::make_logger(ident, logger.base_logger);
2,170✔
1183
        }
2,170✔
1184
    }
3,612✔
1185

1186
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,854✔
1187
    initiate_reconnect_wait();                                           // Throws
3,854✔
1188
}
3,854✔
1189

1190
bool Connection::is_flx_sync_connection() const noexcept
1191
{
117,354✔
1192
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
117,354✔
1193
}
117,354✔
1194

1195
void Connection::receive_pong(milliseconds_type timestamp)
1196
{
250✔
1197
    logger.debug("Received: PONG(timestamp=%1)", timestamp);
250✔
1198

1199
    bool legal_at_this_time = (m_waiting_for_pong && !m_send_ping);
250✔
1200
    if (REALM_UNLIKELY(!legal_at_this_time)) {
250✔
1201
        close_due_to_protocol_error(
×
1202
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
×
1203
        return;
×
1204
    }
×
1205

1206
    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
250✔
1207
        close_due_to_protocol_error(
×
1208
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1209
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
×
1210
                          m_last_ping_sent_at, timestamp)}); // Throws
×
1211
        return;
×
1212
    }
×
1213

1214
    milliseconds_type now = monotonic_clock_now();
250✔
1215
    milliseconds_type round_trip_time = now - timestamp;
250✔
1216
    logger.debug("Round trip time was %1 milliseconds", round_trip_time);
250✔
1217
    m_previous_ping_rtt = round_trip_time;
250✔
1218

1219
    // If this PONG message is a response to a PING mesage that was sent after
1220
    // the last invocation of cancel_reconnect_delay(), then the connection is
1221
    // still good, and we do not have to skip the next reconnect delay.
1222
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
250✔
1223
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
208✔
1224
        m_ping_after_scheduled_reset_of_reconnect_info = false;
208✔
1225
        m_reconnect_info.scheduled_reset = false;
208✔
1226
    }
208✔
1227

1228
    m_heartbeat_timer.reset();
250✔
1229
    m_waiting_for_pong = false;
250✔
1230

1231
    initiate_ping_delay(now); // Throws
250✔
1232

1233
    if (m_client.m_roundtrip_time_handler)
250✔
1234
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
×
1235
}
250✔
1236

1237
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
1238
{
75,702✔
1239
    if (session_ident == 0) {
75,702✔
1240
        return nullptr;
×
1241
    }
×
1242

1243
    auto* sess = get_session(session_ident);
75,702✔
1244
    if (REALM_LIKELY(sess)) {
75,702✔
1245
        return sess;
75,698✔
1246
    }
75,698✔
1247
    // Check the history to see if the message received was for a previous session
1248
    if (auto it = m_session_history.find(session_ident); it == m_session_history.end()) {
4✔
1249
        logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
×
1250
        close_due_to_protocol_error(
×
1251
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1252
             util::format("Received message %1 for session iden %2 when that session never existed", message,
×
1253
                          session_ident)});
×
1254
    }
×
1255
    else {
4✔
1256
        logger.error("Received %1 message for closed session, session_ident = %2", message,
4✔
1257
                     session_ident); // Throws
4✔
1258
    }
4✔
1259
    return nullptr;
4✔
1260
}
75,702✔
1261

1262
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1263
{
964✔
1264
    Session* sess = nullptr;
964✔
1265
    if (session_ident != 0) {
964✔
1266
        sess = find_and_validate_session(session_ident, "ERROR");
892✔
1267
        if (REALM_UNLIKELY(!sess)) {
892✔
1268
            return;
×
1269
        }
×
1270
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
892✔
1271
            close_due_to_protocol_error(std::move(status)); // Throws
×
1272
            return;
×
1273
        }
×
1274

1275
        if (sess->m_state == Session::Deactivated) {
892✔
1276
            finish_session_deactivation(sess);
10✔
1277
        }
10✔
1278
        return;
892✔
1279
    }
892✔
1280

1281
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
72✔
1282
                info.message, info.raw_error_code, info.is_fatal, session_ident,
72✔
1283
                info.server_requests_action); // Throws
72✔
1284

1285
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
72✔
1286
    if (REALM_LIKELY(known_error_code)) {
72✔
1287
        ProtocolError error_code = ProtocolError(info.raw_error_code);
68✔
1288
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
68✔
1289
            close_due_to_server_side_error(error_code, info); // Throws
68✔
1290
            return;
68✔
1291
        }
68✔
1292
        close_due_to_protocol_error(
×
1293
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1294
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1295
                          info.raw_error_code)});
×
1296
    }
×
1297
    else {
4✔
1298
        close_due_to_protocol_error(
4✔
1299
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1300
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1301
    }
4✔
1302
}
72✔
1303

1304

1305
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1306
                                             session_ident_type session_ident)
1307
{
20✔
1308
    if (session_ident == 0) {
20✔
1309
        return close_due_to_protocol_error(
×
1310
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1311
    }
×
1312

1313
    if (!is_flx_sync_connection()) {
20✔
1314
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1315
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1316
    }
×
1317

1318
    if (Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR")) {
20✔
1319
        sess->receive_query_error_message(raw_error_code, message, query_version);
20✔
1320
    }
20✔
1321
}
20✔
1322

1323

1324
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
1325
{
3,624✔
1326
    Session* sess = find_and_validate_session(session_ident, "IDENT");
3,624✔
1327
    if (REALM_UNLIKELY(!sess)) {
3,624✔
1328
        return;
×
1329
    }
×
1330

1331
    if (auto status = sess->receive_ident_message(client_file_ident); !status.is_ok())
3,624✔
1332
        close_due_to_protocol_error(std::move(status)); // Throws
×
1333
}
3,624✔
1334

1335
void Connection::receive_download_message(session_ident_type session_ident, const DownloadMessage& message)
1336
{
49,998✔
1337
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
49,998✔
1338
    if (REALM_UNLIKELY(!sess)) {
49,998✔
1339
        return;
×
1340
    }
×
1341

1342
    if (auto status = sess->receive_download_message(message); !status.is_ok()) {
49,998✔
1343
        close_due_to_protocol_error(std::move(status));
2✔
1344
    }
2✔
1345
}
49,998✔
1346

1347
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
1348
{
17,030✔
1349
    Session* sess = find_and_validate_session(session_ident, "MARK");
17,030✔
1350
    if (REALM_UNLIKELY(!sess)) {
17,030✔
1351
        return;
×
1352
    }
×
1353

1354
    if (auto status = sess->receive_mark_message(request_ident); !status.is_ok())
17,030✔
1355
        close_due_to_protocol_error(std::move(status)); // Throws
10✔
1356
}
17,030✔
1357

1358

1359
void Connection::receive_unbound_message(session_ident_type session_ident)
1360
{
4,074✔
1361
    Session* sess = find_and_validate_session(session_ident, "UNBOUND");
4,074✔
1362
    if (REALM_UNLIKELY(!sess)) {
4,074✔
1363
        return;
×
1364
    }
×
1365

1366
    if (auto status = sess->receive_unbound_message(); !status.is_ok()) {
4,074✔
1367
        close_due_to_protocol_error(std::move(status)); // Throws
×
1368
        return;
×
1369
    }
×
1370

1371
    if (sess->m_state == Session::Deactivated) {
4,074✔
1372
        finish_session_deactivation(sess);
4,074✔
1373
    }
4,074✔
1374
}
4,074✔
1375

1376

1377
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1378
                                               std::string_view body)
1379
{
64✔
1380
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
64✔
1381
    if (REALM_UNLIKELY(!sess)) {
64✔
1382
        return;
×
1383
    }
×
1384

1385
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
64✔
1386
        close_due_to_protocol_error(std::move(status));
×
1387
    }
×
1388
}
64✔
1389

1390

1391
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1392
                                            std::string_view message)
1393
{
5,982✔
1394
    if (session_ident != 0) {
5,982✔
1395
        if (auto sess = get_session(session_ident)) {
3,958✔
1396
            sess->logger.log(LogCategory::session, level, "Server log: %1", message);
3,952✔
1397
            return;
3,952✔
1398
        }
3,952✔
1399

1400
        logger.log(util::LogCategory::session, level, "Server log for unknown session %1: %2", session_ident,
6✔
1401
                   message);
6✔
1402
        return;
6✔
1403
    }
3,958✔
1404

1405
    logger.log(level, "Server log: %1", message);
2,024✔
1406
}
2,024✔
1407

1408

1409
void Connection::receive_appservices_request_id(std::string_view coid)
1410
{
5,672✔
1411
    if (coid.empty() || !m_appservices_coid.empty()) {
5,672✔
1412
        return;
2,060✔
1413
    }
2,060✔
1414
    m_appservices_coid = coid;
3,612✔
1415
    logger.log(util::LogCategory::session, util::LogCategory::Level::info,
3,612✔
1416
               "Connected to app services with request id: \"%1\". Further log entries for this connection will be "
3,612✔
1417
               "prefixed with \"Connection[%2:%1]\" instead of \"Connection[%2]\"",
3,612✔
1418
               m_appservices_coid, m_ident);
3,612✔
1419
    logger.base_logger = make_logger(m_ident, m_appservices_coid, get_client().logger.base_logger);
3,612✔
1420

1421
    for (auto& [ident, sess] : m_sessions) {
4,752✔
1422
        sess->logger.base_logger = Session::make_logger(ident, logger.base_logger);
4,752✔
1423
    }
4,752✔
1424
}
3,612✔
1425

1426

1427
void Connection::handle_protocol_error(Status status)
1428
{
×
1429
    close_due_to_protocol_error(std::move(status));
×
1430
}
×
1431

1432

1433
// Sessions are guaranteed to be granted the opportunity to send a message in
1434
// the order that they enlist. Note that this is important to ensure
1435
// nonoverlapping communication with the server for consecutive sessions
1436
// associated with the same Realm file.
1437
//
1438
// CAUTION: The specified session may get destroyed before this function
1439
// returns, but only if its Session::send_message() puts it into the Deactivated
1440
// state.
1441
void Connection::enlist_to_send(Session* sess)
1442
{
177,128✔
1443
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
177,128✔
1444
    m_sessions_enlisted_to_send.push_back(sess); // Throws
177,128✔
1445
    if (!m_sending)
177,128✔
1446
        send_next_message(); // Throws
75,688✔
1447
}
177,128✔
1448

1449

1450
std::string Connection::get_active_appservices_connection_id()
1451
{
76✔
1452
    return m_appservices_coid;
76✔
1453
}
76✔
1454

1455
void Session::cancel_resumption_delay()
1456
{
4,296✔
1457
    REALM_ASSERT_EX(m_state == Active, m_state);
4,296✔
1458

1459
    if (!m_suspended)
4,296✔
1460
        return;
4,122✔
1461

1462
    m_suspended = false;
174✔
1463

1464
    logger.debug("Resumed"); // Throws
174✔
1465

1466
    if (unbind_process_complete())
174✔
1467
        initiate_rebind(); // Throws
138✔
1468

1469
    try {
174✔
1470
        process_pending_flx_bootstrap(); // throws
174✔
1471
    }
174✔
1472
    catch (const IntegrationException& error) {
174✔
1473
        on_integration_failure(error);
×
1474
    }
×
1475
    catch (...) {
174✔
1476
        on_integration_failure(IntegrationException(exception_to_status()));
×
1477
    }
×
1478

1479
    m_conn.one_more_active_unsuspended_session(); // Throws
174✔
1480
    if (m_try_again_activation_timer) {
174✔
1481
        m_try_again_activation_timer.reset();
8✔
1482
    }
8✔
1483

1484
    on_resumed(); // Throws
174✔
1485
}
174✔
1486

1487

1488
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1489
                                                 std::vector<ProtocolErrorInfo>* out)
1490
{
23,564✔
1491
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
23,564✔
1492
        return;
23,508✔
1493
    }
23,508✔
1494

1495
#ifdef REALM_DEBUG
56✔
1496
    REALM_ASSERT_DEBUG(
56✔
1497
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
56✔
1498
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
56✔
1499
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
56✔
1500
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
56✔
1501
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
56✔
1502
                       }));
56✔
1503
#endif
56✔
1504

1505
    while (!m_pending_compensating_write_errors.empty() &&
112✔
1506
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
112✔
1507
               changesets.back().version) {
56✔
1508
        auto& cur_error = m_pending_compensating_write_errors.front();
56✔
1509
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
56✔
1510
        out->push_back(std::move(cur_error));
56✔
1511
        m_pending_compensating_write_errors.pop_front();
56✔
1512
    }
56✔
1513
}
56✔
1514

1515

1516
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1517
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1518
                                   DownloadBatchState download_batch_state)
1519
{
45,446✔
1520
    auto& history = get_history();
45,446✔
1521
    if (received_changesets.empty()) {
45,446✔
1522
        if (download_batch_state == DownloadBatchState::MoreToCome) {
21,858✔
1523
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1524
                                       "received empty download message that was not the last in batch",
×
1525
                                       ProtocolError::bad_progress);
×
1526
        }
×
1527
        history.set_sync_progress(progress, downloadable_bytes, version_info); // Throws
21,858✔
1528
        return;
21,858✔
1529
    }
21,858✔
1530

1531
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
23,588✔
1532
    auto transact = get_db()->start_read();
23,588✔
1533
    history.integrate_server_changesets(
23,588✔
1534
        progress, downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
23,588✔
1535
        [&](const Transaction&, util::Span<Changeset> changesets) {
23,588✔
1536
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
23,564✔
1537
        }); // Throws
23,564✔
1538
    if (received_changesets.size() == 1) {
23,588✔
1539
        logger.debug("1 remote changeset integrated, producing client version %1",
15,648✔
1540
                     version_info.sync_version.version); // Throws
15,648✔
1541
    }
15,648✔
1542
    else {
7,940✔
1543
        logger.debug("%2 remote changesets integrated, producing client version %1",
7,940✔
1544
                     version_info.sync_version.version, received_changesets.size()); // Throws
7,940✔
1545
    }
7,940✔
1546

1547
    for (const auto& pending_error : pending_compensating_write_errors) {
23,588✔
1548
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
56✔
1549
                    pending_error.compensating_write_rejected_client_version,
56✔
1550
                    *pending_error.compensating_write_server_version, pending_error.message);
56✔
1551
        try {
56✔
1552
            on_connection_state_changed(
56✔
1553
                m_conn.get_state(),
56✔
1554
                SessionErrorInfo{pending_error,
56✔
1555
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
56✔
1556
                                                          pending_error.message)});
56✔
1557
        }
56✔
1558
        catch (...) {
56✔
1559
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1560
        }
×
1561
    }
56✔
1562
}
23,588✔
1563

1564

1565
void Session::on_integration_failure(const IntegrationException& error)
1566
{
40✔
1567
    REALM_ASSERT_EX(m_state == Active, m_state);
40✔
1568
    REALM_ASSERT(!m_client_error && !m_error_to_send);
40✔
1569
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
40✔
1570

1571
    m_client_error = util::make_optional<IntegrationException>(error);
40✔
1572
    m_error_to_send = true;
40✔
1573
    SessionErrorInfo error_info{error.to_status(), IsFatal{false}};
40✔
1574
    error_info.server_requests_action = ProtocolErrorInfo::Action::Warning;
40✔
1575
    // Surface the error to the user otherwise is lost.
1576
    on_connection_state_changed(m_conn.get_state(), std::move(error_info));
40✔
1577

1578
    // Since the deactivation process has not been initiated, the UNBIND
1579
    // message cannot have been sent unless an ERROR message was received.
1580
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
40✔
1581
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
40✔
1582
        ensure_enlisted_to_send(); // Throws
36✔
1583
    }
36✔
1584
}
40✔
1585

1586
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
1587
{
47,812✔
1588
    REALM_ASSERT_EX(m_state == Active, m_state);
47,812✔
1589
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
47,812✔
1590

1591
    m_download_progress = progress.download;
47,812✔
1592
    m_progress = progress;
47,812✔
1593

1594
    if (progress.upload.client_version > m_upload_progress.client_version)
47,812✔
1595
        m_upload_progress = progress.upload;
584✔
1596

1597
    do_recognize_sync_version(client_version); // Allows upload process to resume
47,812✔
1598

1599
    check_for_download_completion(); // Throws
47,812✔
1600

1601
    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
1602
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
47,812✔
1603
        auto& flx_subscription_store = *get_flx_subscription_store();
3,926✔
1604
        get_migration_store()->create_subscriptions(flx_subscription_store);
3,926✔
1605
    }
3,926✔
1606

1607
    // Since the deactivation process has not been initiated, the UNBIND
1608
    // message cannot have been sent unless an ERROR message was received.
1609
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
47,812✔
1610
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
47,812✔
1611
        ensure_enlisted_to_send(); // Throws
47,804✔
1612
    }
47,804✔
1613
}
47,812✔
1614

1615

1616
Session::~Session()
1617
{
10,408✔
1618
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
1619
}
10,408✔
1620

1621

1622
std::shared_ptr<util::Logger> Session::make_logger(session_ident_type ident,
1623
                                                   std::shared_ptr<util::Logger> base_logger)
1624
{
17,330✔
1625
    auto prefix = util::format("Session[%1]: ", ident);
17,330✔
1626
    return std::make_shared<util::PrefixLogger>(util::LogCategory::session, std::move(prefix),
17,330✔
1627
                                                std::move(base_logger));
17,330✔
1628
}
17,330✔
1629

1630
void Session::activate()
1631
{
10,410✔
1632
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
10,410✔
1633

1634
    logger.debug("Activating"); // Throws
10,410✔
1635

1636
    if (REALM_LIKELY(!get_client().is_dry_run())) {
10,410✔
1637
        bool file_exists = util::File::exists(get_realm_path());
10,408✔
1638

1639
        logger.info("client_reset_config = %1, Realm exists = %2, upload messages allowed = %3",
10,408✔
1640
                    get_client_reset_config().has_value(), file_exists, upload_messages_allowed() ? "yes" : "no");
10,408✔
1641
        get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
10,408✔
1642
    }
10,408✔
1643
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
10,410✔
1644
                 m_client_file_ident.salt); // Throws
10,410✔
1645
    m_upload_progress = m_progress.upload;
10,410✔
1646
    m_download_progress = m_progress.download;
10,410✔
1647
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
10,410✔
1648
    init_progress_handler();
10,410✔
1649

1650
    logger.debug("last_version_available = %1", m_last_version_available);                     // Throws
10,410✔
1651
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
10,410✔
1652
    logger.debug("progress_download_client_version = %1",
10,410✔
1653
                 m_progress.download.last_integrated_client_version);                                      // Throws
10,410✔
1654
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
10,410✔
1655
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
10,410✔
1656

1657
    reset_protocol_state();
10,410✔
1658
    m_state = Active;
10,410✔
1659

1660
    call_debug_hook(SyncClientHookEvent::SessionActivating);
10,410✔
1661

1662
    REALM_ASSERT(!m_suspended);
10,410✔
1663
    m_conn.one_more_active_unsuspended_session(); // Throws
10,410✔
1664

1665
    try {
10,410✔
1666
        process_pending_flx_bootstrap(); // throws
10,410✔
1667
    }
10,410✔
1668
    catch (const IntegrationException& error) {
10,410✔
1669
        on_integration_failure(error);
×
1670
    }
×
1671
    catch (...) {
10,410✔
1672
        on_integration_failure(IntegrationException(exception_to_status()));
4✔
1673
    }
4✔
1674

1675
    // Checks if there is a pending client reset
1676
    handle_pending_client_reset_acknowledgement();
10,410✔
1677
}
10,406✔
1678

1679

1680
// The caller (Connection) must discard the session if the session has become
1681
// deactivated upon return.
1682
void Session::initiate_deactivation()
1683
{
10,410✔
1684
    REALM_ASSERT_EX(m_state == Active, m_state);
10,410✔
1685

1686
    logger.debug("Initiating deactivation"); // Throws
10,410✔
1687

1688
    m_state = Deactivating;
10,410✔
1689

1690
    if (!m_suspended)
10,410✔
1691
        m_conn.one_less_active_unsuspended_session(); // Throws
9,748✔
1692

1693
    if (m_enlisted_to_send) {
10,410✔
1694
        REALM_ASSERT(!unbind_process_complete());
5,396✔
1695
        return;
5,396✔
1696
    }
5,396✔
1697

1698
    // Deactivate immediately if the BIND message has not yet been sent and the
1699
    // session is not enlisted to send, or if the unbinding process has already
1700
    // completed.
1701
    if (!m_bind_message_sent || unbind_process_complete()) {
5,014✔
1702
        complete_deactivation(); // Throws
952✔
1703
        // Life cycle state is now Deactivated
1704
        return;
952✔
1705
    }
952✔
1706

1707
    // Ready to send the UNBIND message, if it has not already been sent
1708
    if (!m_unbind_message_sent) {
4,062✔
1709
        enlist_to_send(); // Throws
3,852✔
1710
        return;
3,852✔
1711
    }
3,852✔
1712
}
4,062✔
1713

1714

1715
void Session::complete_deactivation()
1716
{
10,410✔
1717
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
10,410✔
1718
    m_state = Deactivated;
10,410✔
1719

1720
    logger.debug("Deactivation completed"); // Throws
10,410✔
1721
}
10,410✔
1722

1723

1724
// Called by the associated Connection object when this session is granted an
1725
// opportunity to send a message.
1726
//
1727
// The caller (Connection) must discard the session if the session has become
1728
// deactivated upon return.
1729
void Session::send_message()
1730
{
175,574✔
1731
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
175,574✔
1732
    REALM_ASSERT(m_enlisted_to_send);
175,574✔
1733
    m_enlisted_to_send = false;
175,574✔
1734
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
175,574✔
1735
        // Deactivation has been initiated. If the UNBIND message has not been
1736
        // sent yet, there is no point in sending it. Instead, we can let the
1737
        // deactivation process complete.
1738
        if (!m_bind_message_sent) {
9,716✔
1739
            return complete_deactivation(); // Throws
2,896✔
1740
            // Life cycle state is now Deactivated
1741
        }
2,896✔
1742

1743
        // Session life cycle state is Deactivating or the unbinding process has
1744
        // been initiated by a session specific ERROR message
1745
        if (!m_unbind_message_sent)
6,820✔
1746
            send_unbind_message(); // Throws
6,820✔
1747
        return;
6,820✔
1748
    }
9,716✔
1749

1750
    // Session life cycle state is Active and the unbinding process has
1751
    // not been initiated
1752
    REALM_ASSERT(!m_unbind_message_sent);
165,858✔
1753

1754
    if (!m_bind_message_sent)
165,858✔
1755
        return send_bind_message(); // Throws
9,332✔
1756

1757
    // Pending test commands can be sent any time after the BIND message is sent
1758
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
156,526✔
1759
                                                      [](const PendingTestCommand& command) {
156,526✔
1760
                                                          return command.pending;
158✔
1761
                                                      });
158✔
1762
    if (has_pending_test_command) {
156,526✔
1763
        return send_test_command_message();
64✔
1764
    }
64✔
1765

1766
    if (!m_ident_message_sent) {
156,462✔
1767
        if (have_client_file_ident())
7,832✔
1768
            send_ident_message(); // Throws
7,832✔
1769
        return;
7,832✔
1770
    }
7,832✔
1771

1772
    if (m_error_to_send)
148,630✔
1773
        return send_json_error_message(); // Throws
32✔
1774

1775
    // Stop sending upload, mark and query messages when the client detects an error.
1776
    if (m_client_error) {
148,598✔
1777
        return;
12✔
1778
    }
12✔
1779

1780
    if (m_target_download_mark > m_last_download_mark_sent)
148,586✔
1781
        return send_mark_message(); // Throws
17,902✔
1782

1783
    auto is_upload_allowed = [&]() -> bool {
130,692✔
1784
        if (!m_is_flx_sync_session) {
130,692✔
1785
            return true;
109,778✔
1786
        }
109,778✔
1787

1788
        auto migration_store = get_migration_store();
20,914✔
1789
        if (!migration_store) {
20,914✔
1790
            return true;
×
1791
        }
×
1792

1793
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
20,914✔
1794
        if (!sentinel_query_version) {
20,914✔
1795
            return true;
20,884✔
1796
        }
20,884✔
1797

1798
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
1799
        return m_last_sent_flx_query_version != *sentinel_query_version;
30✔
1800
    };
20,914✔
1801

1802
    if (!is_upload_allowed()) {
130,684✔
1803
        return;
16✔
1804
    }
16✔
1805

1806
    auto check_pending_flx_version = [&]() -> bool {
130,676✔
1807
        if (!m_is_flx_sync_session) {
130,676✔
1808
            return false;
109,778✔
1809
        }
109,778✔
1810

1811
        if (m_delay_uploads) {
20,898✔
1812
            return false;
3,062✔
1813
        }
3,062✔
1814

1815
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
17,836✔
1816

1817
        if (!m_pending_flx_sub_set) {
17,836✔
1818
            return false;
15,360✔
1819
        }
15,360✔
1820

1821
        // Send QUERY messages when the upload progress client version reaches the snapshot version
1822
        // of a pending subscription
1823
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
2,476✔
1824
    };
17,836✔
1825

1826
    if (check_pending_flx_version()) {
130,668✔
1827
        return send_query_change_message(); // throws
1,376✔
1828
    }
1,376✔
1829

1830
    if (!m_delay_uploads && (m_last_version_available > m_upload_progress.client_version)) {
129,292✔
1831
        return send_upload_message(); // Throws
61,342✔
1832
    }
61,342✔
1833
}
129,292✔
1834

1835

1836
void Session::send_bind_message()
1837
{
9,332✔
1838
    REALM_ASSERT_EX(m_state == Active, m_state);
9,332✔
1839

1840
    session_ident_type session_ident = m_ident;
9,332✔
1841
    // Request an ident if we don't already have one and there isn't a pending client reset diff
1842
    // The file ident can be 0 when a client reset is being performed if a brand new local realm
1843
    // has been opened (or using Async open) and a FLX/PBS migration occurs when first connecting
1844
    // to the server.
1845
    bool need_client_file_ident = !have_client_file_ident() && !get_client_reset_config();
9,332✔
1846
    const bool is_subserver = false;
9,332✔
1847

1848
    ClientProtocol& protocol = m_conn.get_client_protocol();
9,332✔
1849
    int protocol_version = m_conn.get_negotiated_protocol_version();
9,332✔
1850
    OutputBuffer& out = m_conn.get_output_buffer();
9,332✔
1851
    // Discard the token since it's ignored by the server.
1852
    std::string empty_access_token;
9,332✔
1853
    if (m_is_flx_sync_session) {
9,332✔
1854
        nlohmann::json bind_json_data;
1,894✔
1855
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,894✔
1856
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1857
        }
60✔
1858
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,894✔
1859
        auto schema_version = get_schema_version();
1,894✔
1860
        // Send 0 if schema is not versioned.
1861
        bind_json_data["schemaVersion"] = schema_version != uint64_t(-1) ? schema_version : 0;
1,894✔
1862
        if (logger.would_log(util::Logger::Level::debug)) {
1,894✔
1863
            std::string json_data_dump;
1,894✔
1864
            if (!bind_json_data.empty()) {
1,894✔
1865
                json_data_dump = bind_json_data.dump();
1,894✔
1866
            }
1,894✔
1867
            logger.debug(
1,894✔
1868
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,894✔
1869
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,894✔
1870
        }
1,894✔
1871
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,894✔
1872
                                       need_client_file_ident, is_subserver); // Throws
1,894✔
1873
    }
1,894✔
1874
    else {
7,438✔
1875
        std::string server_path = get_virt_path();
7,438✔
1876
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
7,438✔
1877
                     session_ident, need_client_file_ident, is_subserver, server_path);
7,438✔
1878
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
7,438✔
1879
                                       need_client_file_ident, is_subserver); // Throws
7,438✔
1880
    }
7,438✔
1881
    m_conn.initiate_write_message(out, this); // Throws
9,332✔
1882

1883
    m_bind_message_sent = true;
9,332✔
1884
    call_debug_hook(SyncClientHookEvent::BindMessageSent);
9,332✔
1885

1886
    // If there is a pending client reset diff, process that when the BIND message has
1887
    // been sent successfully and wait before sending the IDENT message. Otherwise,
1888
    // ready to send the IDENT message if the file identifier pair is already available.
1889
    if (!need_client_file_ident)
9,332✔
1890
        enlist_to_send(); // Throws
5,506✔
1891
}
9,332✔
1892

1893

1894
void Session::send_ident_message()
1895
{
7,832✔
1896
    REALM_ASSERT_EX(m_state == Active, m_state);
7,832✔
1897
    REALM_ASSERT(m_bind_message_sent);
7,832✔
1898
    REALM_ASSERT(!m_unbind_message_sent);
7,832✔
1899
    REALM_ASSERT(have_client_file_ident());
7,832✔
1900

1901
    ClientProtocol& protocol = m_conn.get_client_protocol();
7,832✔
1902
    OutputBuffer& out = m_conn.get_output_buffer();
7,832✔
1903
    session_ident_type session_ident = m_ident;
7,832✔
1904

1905
    if (m_is_flx_sync_session) {
7,832✔
1906
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,796✔
1907
        const auto active_query_body = active_query_set.to_ext_json();
1,796✔
1908
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,796✔
1909
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,796✔
1910
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,796✔
1911
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,796✔
1912
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,796✔
1913
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,796✔
1914
                     active_query_body); // Throws
1,796✔
1915
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,796✔
1916
                                        active_query_set.version(), active_query_body); // Throws
1,796✔
1917
        m_last_sent_flx_query_version = active_query_set.version();
1,796✔
1918
    }
1,796✔
1919
    else {
6,036✔
1920
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
6,036✔
1921
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
6,036✔
1922
                     "latest_server_version_salt=%6)",
6,036✔
1923
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
6,036✔
1924
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
6,036✔
1925
                     m_progress.latest_server_version.salt);                                  // Throws
6,036✔
1926
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
6,036✔
1927
    }
6,036✔
1928
    m_conn.initiate_write_message(out, this); // Throws
7,832✔
1929

1930
    m_ident_message_sent = true;
7,832✔
1931
    call_debug_hook(SyncClientHookEvent::IdentMessageSent);
7,832✔
1932

1933
    // Other messages may be waiting to be sent
1934
    enlist_to_send(); // Throws
7,832✔
1935
}
7,832✔
1936

1937
void Session::send_query_change_message()
1938
{
1,376✔
1939
    REALM_ASSERT_EX(m_state == Active, m_state);
1,376✔
1940
    REALM_ASSERT(m_ident_message_sent);
1,376✔
1941
    REALM_ASSERT(!m_unbind_message_sent);
1,376✔
1942
    REALM_ASSERT(m_pending_flx_sub_set);
1,376✔
1943
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
1,376✔
1944

1945
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
1,376✔
1946
        return;
×
1947
    }
×
1948

1949
    auto sub_store = get_flx_subscription_store();
1,376✔
1950
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
1,376✔
1951
    auto latest_queries = latest_sub_set.to_ext_json();
1,376✔
1952
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
1,376✔
1953
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
1,376✔
1954

1955
    OutputBuffer& out = m_conn.get_output_buffer();
1,376✔
1956
    session_ident_type session_ident = get_ident();
1,376✔
1957
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,376✔
1958
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
1,376✔
1959
    m_conn.initiate_write_message(out, this);
1,376✔
1960

1961
    m_last_sent_flx_query_version = latest_sub_set.version();
1,376✔
1962

1963
    request_download_completion_notification();
1,376✔
1964
}
1,376✔
1965

1966
void Session::send_upload_message()
1967
{
61,340✔
1968
    REALM_ASSERT_EX(m_state == Active, m_state);
61,340✔
1969
    REALM_ASSERT(m_ident_message_sent);
61,340✔
1970
    REALM_ASSERT(!m_unbind_message_sent);
61,340✔
1971

1972
    if (REALM_UNLIKELY(get_client().is_dry_run()))
61,340✔
1973
        return;
×
1974

1975
    version_type target_upload_version = m_last_version_available;
61,340✔
1976
    if (m_pending_flx_sub_set) {
61,340✔
1977
        REALM_ASSERT(m_is_flx_sync_session);
1,098✔
1978
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
1,098✔
1979
    }
1,098✔
1980

1981
    bool server_version_to_ack =
61,340✔
1982
        m_upload_progress.last_integrated_server_version < m_download_progress.server_version;
61,340✔
1983

1984
    std::vector<UploadChangeset> uploadable_changesets;
61,340✔
1985
    version_type locked_server_version = 0;
61,340✔
1986
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
61,340✔
1987
                                             locked_server_version); // Throws
61,340✔
1988

1989
    if (uploadable_changesets.empty()) {
61,340✔
1990
        // Nothing more to upload right now if:
1991
        //  1. We need to limit upload up to some version other than the last client version
1992
        //     available and there are no changes to upload
1993
        //  2. There are no changes to upload and no server version(s) to acknowledge
1994
        if (m_pending_flx_sub_set || !server_version_to_ack) {
31,758✔
1995
            logger.trace("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
5,360✔
1996
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
5,360✔
1997
            // Other messages may be waiting to be sent
1998
            return enlist_to_send(); // Throws
5,360✔
1999
        }
5,360✔
2000
    }
31,758✔
2001

2002
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
55,980✔
2003
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
748✔
2004
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
748✔
2005
    }
748✔
2006

2007
    version_type progress_client_version = m_upload_progress.client_version;
55,980✔
2008
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
55,980✔
2009

2010
    if (!upload_messages_allowed()) {
55,980✔
2011
        logger.trace("UPLOAD not allowed (progress_client_version=%1, progress_server_version=%2, "
604✔
2012
                     "locked_server_version=%3, num_changesets=%4)",
604✔
2013
                     progress_client_version, progress_server_version, locked_server_version,
604✔
2014
                     uploadable_changesets.size()); // Throws
604✔
2015
        // Other messages may be waiting to be sent
2016
        return enlist_to_send(); // Throws
604✔
2017
    }
604✔
2018

2019
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
55,376✔
2020
                 "locked_server_version=%3, num_changesets=%4)",
55,376✔
2021
                 progress_client_version, progress_server_version, locked_server_version,
55,376✔
2022
                 uploadable_changesets.size()); // Throws
55,376✔
2023

2024
    ClientProtocol& protocol = m_conn.get_client_protocol();
55,376✔
2025
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
55,376✔
2026

2027
    for (const UploadChangeset& uc : uploadable_changesets) {
55,376✔
2028
        logger.debug(util::LogCategory::changeset,
43,414✔
2029
                     "Fetching changeset for upload (client_version=%1, server_version=%2, "
43,414✔
2030
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
43,414✔
2031
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
43,414✔
2032
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
43,414✔
2033
        if (logger.would_log(util::Logger::Level::trace)) {
43,414✔
2034
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2035
            if (changeset_data.size() < 1024) {
×
2036
                logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
2037
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2038
            }
×
2039
            else {
×
2040
                logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
2041
                             protocol.compressed_hex_dump(changeset_data));
×
2042
            }
×
2043

2044
#if REALM_DEBUG
×
2045
            ChunkedBinaryInputStream in{changeset_data};
×
2046
            Changeset log;
×
2047
            try {
×
2048
                parse_changeset(in, log);
×
2049
                std::stringstream ss;
×
2050
                log.print(ss);
×
2051
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
2052
            }
×
2053
            catch (const BadChangesetError& err) {
×
2054
                logger.error(util::LogCategory::changeset, "Unable to parse changeset: %1", err.what());
×
2055
            }
×
2056
#endif
×
2057
        }
×
2058

2059
        {
43,414✔
2060
            upload_message_builder.add_changeset(uc.progress.client_version,
43,414✔
2061
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
43,414✔
2062
                                                 uc.origin_file_ident,
43,414✔
2063
                                                 uc.changeset); // Throws
43,414✔
2064
        }
43,414✔
2065
    }
43,414✔
2066

2067
    int protocol_version = m_conn.get_negotiated_protocol_version();
55,376✔
2068
    OutputBuffer& out = m_conn.get_output_buffer();
55,376✔
2069
    session_ident_type session_ident = get_ident();
55,376✔
2070
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
55,376✔
2071
                                               progress_server_version,
55,376✔
2072
                                               locked_server_version); // Throws
55,376✔
2073
    m_conn.initiate_write_message(out, this);                          // Throws
55,376✔
2074

2075
    call_debug_hook(SyncClientHookEvent::UploadMessageSent);
55,376✔
2076

2077
    // Other messages may be waiting to be sent
2078
    enlist_to_send(); // Throws
55,376✔
2079
}
55,376✔
2080

2081

2082
void Session::send_mark_message()
2083
{
17,902✔
2084
    REALM_ASSERT_EX(m_state == Active, m_state);
17,902✔
2085
    REALM_ASSERT(m_ident_message_sent);
17,902✔
2086
    REALM_ASSERT(!m_unbind_message_sent);
17,902✔
2087
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
17,902✔
2088

2089
    request_ident_type request_ident = m_target_download_mark;
17,902✔
2090
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
17,902✔
2091

2092
    ClientProtocol& protocol = m_conn.get_client_protocol();
17,902✔
2093
    OutputBuffer& out = m_conn.get_output_buffer();
17,902✔
2094
    session_ident_type session_ident = get_ident();
17,902✔
2095
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
17,902✔
2096
    m_conn.initiate_write_message(out, this);                      // Throws
17,902✔
2097

2098
    m_last_download_mark_sent = request_ident;
17,902✔
2099

2100
    // Other messages may be waiting to be sent
2101
    enlist_to_send(); // Throws
17,902✔
2102
}
17,902✔
2103

2104

2105
void Session::send_unbind_message()
2106
{
6,820✔
2107
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,820✔
2108
    REALM_ASSERT(m_bind_message_sent);
6,820✔
2109
    REALM_ASSERT(!m_unbind_message_sent);
6,820✔
2110

2111
    logger.debug("Sending: UNBIND"); // Throws
6,820✔
2112

2113
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,820✔
2114
    OutputBuffer& out = m_conn.get_output_buffer();
6,820✔
2115
    session_ident_type session_ident = get_ident();
6,820✔
2116
    protocol.make_unbind_message(out, session_ident); // Throws
6,820✔
2117
    m_conn.initiate_write_message(out, this);         // Throws
6,820✔
2118

2119
    m_unbind_message_sent = true;
6,820✔
2120
}
6,820✔
2121

2122

2123
void Session::send_json_error_message()
2124
{
32✔
2125
    REALM_ASSERT_EX(m_state == Active, m_state);
32✔
2126
    REALM_ASSERT(m_ident_message_sent);
32✔
2127
    REALM_ASSERT(!m_unbind_message_sent);
32✔
2128
    REALM_ASSERT(m_error_to_send);
32✔
2129
    REALM_ASSERT(m_client_error);
32✔
2130

2131
    ClientProtocol& protocol = m_conn.get_client_protocol();
32✔
2132
    OutputBuffer& out = m_conn.get_output_buffer();
32✔
2133
    session_ident_type session_ident = get_ident();
32✔
2134
    auto protocol_error = m_client_error->error_for_server;
32✔
2135

2136
    auto message = util::format("%1", m_client_error->to_status());
32✔
2137
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
32✔
2138
                session_ident); // Throws
32✔
2139

2140
    nlohmann::json error_body_json;
32✔
2141
    error_body_json["message"] = std::move(message);
32✔
2142
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
32✔
2143
                                     error_body_json.dump()); // Throws
32✔
2144
    m_conn.initiate_write_message(out, this);                 // Throws
32✔
2145

2146
    m_error_to_send = false;
32✔
2147
    enlist_to_send(); // Throws
32✔
2148
}
32✔
2149

2150

2151
void Session::send_test_command_message()
2152
{
64✔
2153
    REALM_ASSERT_EX(m_state == Active, m_state);
64✔
2154

2155
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
64✔
2156
                           [](const PendingTestCommand& command) {
68✔
2157
                               return command.pending;
68✔
2158
                           });
68✔
2159
    REALM_ASSERT(it != m_pending_test_commands.end());
64✔
2160

2161
    ClientProtocol& protocol = m_conn.get_client_protocol();
64✔
2162
    OutputBuffer& out = m_conn.get_output_buffer();
64✔
2163
    auto session_ident = get_ident();
64✔
2164

2165
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
64✔
2166
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
64✔
2167

2168
    m_conn.initiate_write_message(out, this); // Throws;
64✔
2169
    it->pending = false;
64✔
2170

2171
    enlist_to_send();
64✔
2172
}
64✔
2173

2174
bool Session::client_reset_if_needed()
2175
{
424✔
2176
    // Even if we end up not actually performing a client reset, consume the
2177
    // config to ensure that the resources it holds are released
2178
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
424✔
2179
    if (!client_reset_config) {
424✔
2180
        return false;
×
2181
    }
×
2182

2183
    // Save a copy of the status and action in case an error/exception occurs
2184
    Status cr_status = client_reset_config->error;
424✔
2185
    ProtocolErrorInfo::Action cr_action = client_reset_config->action;
424✔
2186

2187
    try {
424✔
2188
        // The file ident from the fresh realm will be copied over to the local realm
2189
        bool did_reset = client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config),
424✔
2190
                                                            get_flx_subscription_store());
424✔
2191

2192
        call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
424✔
2193
        if (!did_reset) {
424✔
2194
            return false;
×
2195
        }
×
2196
    }
424✔
2197
    catch (const std::exception& e) {
424✔
2198
        auto err_msg = util::format("A fatal error occurred during '%1' client reset diff for %2: '%3'", cr_action,
80✔
2199
                                    cr_status, e.what());
80✔
2200
        logger.error(err_msg.c_str());
80✔
2201
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
80✔
2202
        suspend(err_info);
80✔
2203
        return false;
80✔
2204
    }
80✔
2205

2206
    // The fresh Realm has been used to reset the state
2207
    logger.debug("Client reset is completed, path = %1", get_realm_path()); // Throws
344✔
2208

2209
    // Update the version, file ident and progress info after the client reset diff is done
2210
    get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
344✔
2211
    // Print the version/progress information before performing the asserts
2212
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
344✔
2213
                 m_client_file_ident.salt);                                // Throws
344✔
2214
    logger.debug("last_version_available = %1", m_last_version_available); // Throws
344✔
2215
    logger.debug("upload_progress_client_version = %1, upload_progress_server_version = %2",
344✔
2216
                 m_progress.upload.client_version,
344✔
2217
                 m_progress.upload.last_integrated_server_version); // Throws
344✔
2218
    logger.debug("download_progress_client_version = %1, download_progress_server_version = %2",
344✔
2219
                 m_progress.download.last_integrated_client_version,
344✔
2220
                 m_progress.download.server_version); // Throws
344✔
2221

2222
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
344✔
2223
                    m_progress.download.last_integrated_client_version);
344✔
2224
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
344✔
2225

2226
    m_upload_progress = m_progress.upload;
344✔
2227
    m_download_progress = m_progress.download;
344✔
2228
    init_progress_handler();
344✔
2229
    // In recovery mode, there may be new changesets to upload and nothing left to download.
2230
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
2231
    // For both, we want to allow uploads again without needing external changes to download first.
2232
    m_delay_uploads = false;
344✔
2233

2234
    // Checks if there is a pending client reset
2235
    handle_pending_client_reset_acknowledgement();
344✔
2236

2237
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
2238
    if (auto migration_store = get_migration_store()) {
344✔
2239
        migration_store->complete_migration_or_rollback();
316✔
2240
    }
316✔
2241

2242
    return true;
344✔
2243
}
424✔
2244

2245
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
2246
{
3,624✔
2247
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
3,624✔
2248
                 client_file_ident.salt); // Throws
3,624✔
2249

2250
    // Ignore the message if the deactivation process has been initiated,
2251
    // because in that case, the associated Realm and SessionWrapper must
2252
    // not be accessed any longer.
2253
    if (m_state != Active)
3,624✔
2254
        return Status::OK(); // Success
84✔
2255

2256
    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
3,540✔
2257
                               !m_unbound_message_received);
3,540✔
2258
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,540✔
2259
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
×
2260
    }
×
2261
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
3,540✔
2262
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
×
2263
    }
×
2264
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
3,540✔
2265
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
×
2266
    }
×
2267

2268
    m_client_file_ident = client_file_ident;
3,540✔
2269

2270
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
3,540✔
2271
        // Ready to send the IDENT message
2272
        ensure_enlisted_to_send(); // Throws
×
2273
        return Status::OK();       // Success
×
2274
    }
×
2275

2276
    get_history().set_client_file_ident(client_file_ident,
3,540✔
2277
                                        m_fix_up_object_ids); // Throws
3,540✔
2278
    m_progress.download.last_integrated_client_version = 0;
3,540✔
2279
    m_progress.upload.client_version = 0;
3,540✔
2280

2281
    // Ready to send the IDENT message
2282
    ensure_enlisted_to_send(); // Throws
3,540✔
2283
    return Status::OK();       // Success
3,540✔
2284
}
3,540✔
2285

2286
Status Session::receive_download_message(const DownloadMessage& message)
2287
{
49,996✔
2288
    // Ignore the message if the deactivation process has been initiated,
2289
    // because in that case, the associated Realm and SessionWrapper must
2290
    // not be accessed any longer.
2291
    if (m_state != Active)
49,996✔
2292
        return Status::OK();
570✔
2293

2294
    bool is_flx = m_conn.is_flx_sync_connection();
49,426✔
2295
    int64_t query_version = is_flx ? *message.query_version : 0;
49,426✔
2296

2297
    if (!is_flx || query_version > 0)
49,426✔
2298
        enable_progress_notifications();
47,472✔
2299

2300
    auto&& progress = message.progress;
49,426✔
2301
    if (is_flx) {
49,426✔
2302
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
5,512✔
2303
                     "latest_server_version=%3, latest_server_version_salt=%4, "
5,512✔
2304
                     "upload_client_version=%5, upload_server_version=%6, progress_estimate=%7, "
5,512✔
2305
                     "batch_state=%8, query_version=%9, num_changesets=%10, ...)",
5,512✔
2306
                     progress.download.server_version, progress.download.last_integrated_client_version,
5,512✔
2307
                     progress.latest_server_version.version, progress.latest_server_version.salt,
5,512✔
2308
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
5,512✔
2309
                     message.downloadable.as_estimate(), message.batch_state, query_version,
5,512✔
2310
                     message.changesets.size()); // Throws
5,512✔
2311
    }
5,512✔
2312
    else {
43,914✔
2313
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
43,914✔
2314
                     "latest_server_version=%3, latest_server_version_salt=%4, "
43,914✔
2315
                     "upload_client_version=%5, upload_server_version=%6, "
43,914✔
2316
                     "downloadable_bytes=%7, num_changesets=%8, ...)",
43,914✔
2317
                     progress.download.server_version, progress.download.last_integrated_client_version,
43,914✔
2318
                     progress.latest_server_version.version, progress.latest_server_version.salt,
43,914✔
2319
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
43,914✔
2320
                     message.downloadable.as_bytes(), message.changesets.size()); // Throws
43,914✔
2321
    }
43,914✔
2322

2323
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
2324
    // changeset over and over again.
2325
    if (m_client_error) {
49,426✔
2326
        logger.debug("Ignoring download message because the client detected an integration error");
×
2327
        return Status::OK();
×
2328
    }
×
2329

2330
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
49,426✔
2331
    if (REALM_UNLIKELY(!legal_at_this_time)) {
49,426✔
2332
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
2✔
2333
    }
2✔
2334
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
49,424✔
2335
        logger.error("Bad sync progress received (%1)", status);
×
2336
        return status;
×
2337
    }
×
2338

2339
    version_type server_version = m_progress.download.server_version;
49,424✔
2340
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
49,424✔
2341
    for (const RemoteChangeset& changeset : message.changesets) {
50,886✔
2342
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
2343
        // version must be increasing, but can stay the same during bootstraps.
2344
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
47,978✔
2345
                                                         : (changeset.remote_version > server_version);
47,978✔
2346
        // Each server version cannot be greater than the one in the header of the download message.
2347
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
47,978✔
2348
        if (!good_server_version) {
47,978✔
2349
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2350
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2351
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2352
        }
×
2353
        server_version = changeset.remote_version;
47,978✔
2354

2355
        // Check that per-changeset last integrated client version is "weakly"
2356
        // increasing.
2357
        bool good_client_version =
47,978✔
2358
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
47,978✔
2359
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
47,978✔
2360
        if (!good_client_version) {
47,978✔
2361
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2362
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2363
                                 "(%1, %2, %3)",
×
2364
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2365
                                 progress.download.last_integrated_client_version)};
×
2366
        }
×
2367
        last_integrated_client_version = changeset.last_integrated_local_version;
47,978✔
2368
        // Server shouldn't send our own changes, and zero is not a valid client
2369
        // file identifier.
2370
        bool good_file_ident =
47,978✔
2371
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
47,978✔
2372
        if (!good_file_ident) {
47,978✔
2373
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2374
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2375
                                 changeset.origin_file_ident)};
×
2376
        }
×
2377
    }
47,978✔
2378

2379
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
49,424✔
2380
                                       message.batch_state, message.changesets.size());
49,424✔
2381
    if (hook_action == SyncClientHookAction::EarlyReturn) {
49,424✔
2382
        return Status::OK();
24✔
2383
    }
24✔
2384
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
49,400✔
2385

2386
    if (process_flx_bootstrap_message(message)) {
49,400✔
2387
        clear_resumption_delay_state();
3,956✔
2388
        return Status::OK();
3,956✔
2389
    }
3,956✔
2390

2391
    initiate_integrate_changesets(message.downloadable.as_bytes(), message.batch_state, progress,
45,444✔
2392
                                  message.changesets); // Throws
45,444✔
2393

2394
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
45,444✔
2395
                                  message.batch_state, message.changesets.size());
45,444✔
2396
    if (hook_action == SyncClientHookAction::EarlyReturn) {
45,444✔
2397
        return Status::OK();
×
2398
    }
×
2399
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
45,444✔
2400

2401
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
2402
    // after a retryable session error.
2403
    clear_resumption_delay_state();
45,444✔
2404
    return Status::OK();
45,444✔
2405
}
45,444✔
2406

2407
Status Session::receive_mark_message(request_ident_type request_ident)
2408
{
17,030✔
2409
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws
17,030✔
2410

2411
    // Ignore the message if the deactivation process has been initiated,
2412
    // because in that case, the associated Realm and SessionWrapper must
2413
    // not be accessed any longer.
2414
    if (m_state != Active)
17,030✔
2415
        return Status::OK(); // Success
64✔
2416

2417
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
16,966✔
2418
    if (REALM_UNLIKELY(!legal_at_this_time)) {
16,966✔
2419
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
10✔
2420
    }
10✔
2421
    bool good_request_ident =
16,956✔
2422
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
16,956✔
2423
    if (REALM_UNLIKELY(!good_request_ident)) {
16,956✔
2424
        return {
×
2425
            ErrorCodes::SyncProtocolInvariantFailed,
×
2426
            util::format(
×
2427
                "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
×
2428
                m_last_download_mark_sent, m_last_download_mark_received)};
×
2429
    }
×
2430

2431
    m_server_version_at_last_download_mark = m_progress.download.server_version;
16,956✔
2432
    m_last_download_mark_received = request_ident;
16,956✔
2433
    check_for_download_completion(); // Throws
16,956✔
2434

2435
    return Status::OK(); // Success
16,956✔
2436
}
16,956✔
2437

2438

2439
// The caller (Connection) must discard the session if the session has become
2440
// deactivated upon return.
2441
Status Session::receive_unbound_message()
2442
{
4,074✔
2443
    logger.debug("Received: UNBOUND");
4,074✔
2444

2445
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
4,074✔
2446
    if (REALM_UNLIKELY(!legal_at_this_time)) {
4,074✔
2447
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2448
    }
×
2449

2450
    // The fact that the UNBIND message has been sent, but an ERROR message has
2451
    // not been received, implies that the deactivation process must have been
2452
    // initiated, so this session must be in the Deactivating state or the session
2453
    // has been suspended because of a client side error.
2454
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
4,074!
2455

2456
    m_unbound_message_received = true;
4,074✔
2457

2458
    // Detect completion of the unbinding process
2459
    if (m_unbind_message_send_complete && m_state == Deactivating) {
4,074✔
2460
        // The deactivation process completes when the unbinding process
2461
        // completes.
2462
        complete_deactivation(); // Throws
4,074✔
2463
        // Life cycle state is now Deactivated
2464
    }
4,074✔
2465

2466
    return Status::OK(); // Success
4,074✔
2467
}
4,074✔
2468

2469

2470
void Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
2471
{
20✔
2472
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);
20✔
2473
    on_flx_sync_error(query_version, message); // throws
20✔
2474
}
20✔
2475

2476
// The caller (Connection) must discard the session if the session has become
2477
// deactivated upon return.
2478
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2479
{
904✔
2480
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
904✔
2481
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
904✔
2482

2483
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
904✔
2484
    if (REALM_UNLIKELY(!legal_at_this_time)) {
904✔
2485
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2486
    }
×
2487

2488
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
904✔
2489
    auto status = protocol_error_to_status(protocol_error, info.message);
904✔
2490
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
904✔
2491
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2492
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2493
                             info.raw_error_code)};
×
2494
    }
×
2495

2496
    // Can't process debug hook actions once the Session is undergoing deactivation, since
2497
    // the SessionWrapper may not be available
2498
    if (m_state == Active) {
904✔
2499
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, &info);
894✔
2500
        if (debug_action == SyncClientHookAction::EarlyReturn) {
894✔
2501
            return Status::OK();
12✔
2502
        }
12✔
2503
    }
894✔
2504

2505
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
2506
    // containing the compensating write has appeared in a download message.
2507
    if (status == ErrorCodes::SyncCompensatingWrite) {
892✔
2508
        // If the client is not active, the compensating writes will not be processed now, but will be
2509
        // sent again the next time the client connects
2510
        if (m_state == Active) {
60✔
2511
            REALM_ASSERT(info.compensating_write_server_version.has_value());
60✔
2512
            m_pending_compensating_write_errors.push_back(info);
60✔
2513
        }
60✔
2514
        return Status::OK();
60✔
2515
    }
60✔
2516

2517
    if (protocol_error == ProtocolError::schema_version_changed) {
832✔
2518
        // Enable upload immediately if the session is still active.
2519
        if (m_state == Active) {
68✔
2520
            auto wt = get_db()->start_write();
68✔
2521
            _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
68✔
2522
            wt->commit();
68✔
2523
            // Notify SyncSession a schema migration is required.
2524
            on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
68✔
2525
        }
68✔
2526
        // Keep the session active to upload any unsynced changes.
2527
        return Status::OK();
68✔
2528
    }
68✔
2529

2530
    m_error_message_received = true;
764✔
2531
    suspend(SessionErrorInfo{info, std::move(status)});
764✔
2532
    return Status::OK();
764✔
2533
}
832✔
2534

2535
void Session::suspend(const SessionErrorInfo& info)
2536
{
844✔
2537
    REALM_ASSERT(!m_suspended);
844✔
2538
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
844✔
2539
    logger.debug("Suspended"); // Throws
844✔
2540

2541
    m_suspended = true;
844✔
2542

2543
    // Detect completion of the unbinding process
2544
    if (m_unbind_message_send_complete && m_error_message_received) {
844✔
2545
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2546
        // we received an ERROR message implies that the deactivation process must
2547
        // have been initiated, so this session must be in the Deactivating state.
2548
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
10✔
2549

2550
        // The deactivation process completes when the unbinding process
2551
        // completes.
2552
        complete_deactivation(); // Throws
10✔
2553
        // Life cycle state is now Deactivated
2554
    }
10✔
2555

2556
    // Notify the application of the suspension of the session if the session is
2557
    // still in the Active state
2558
    if (m_state == Active) {
844✔
2559
        call_debug_hook(SyncClientHookEvent::SessionSuspended, &info);
834✔
2560
        m_conn.one_less_active_unsuspended_session(); // Throws
834✔
2561
        on_suspended(info);                           // Throws
834✔
2562
    }
834✔
2563

2564
    if (!info.is_fatal) {
844✔
2565
        begin_resumption_delay(info);
180✔
2566
    }
180✔
2567

2568
    // Ready to send the UNBIND message, if it has not been sent already
2569
    if (!m_unbind_message_sent)
844✔
2570
        ensure_enlisted_to_send(); // Throws
834✔
2571
}
844✔
2572

2573
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
2574
{
64✔
2575
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);
64✔
2576
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
64✔
2577
                           [&](const PendingTestCommand& command) {
64✔
2578
                               return command.id == ident;
64✔
2579
                           });
64✔
2580
    if (it == m_pending_test_commands.end()) {
64✔
2581
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2582
                util::format("Received test command response for a non-existent ident %1", ident)};
×
2583
    }
×
2584

2585
    it->promise.emplace_value(std::string{body});
64✔
2586
    m_pending_test_commands.erase(it);
64✔
2587

2588
    return Status::OK();
64✔
2589
}
64✔
2590

2591
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2592
{
180✔
2593
    REALM_ASSERT(!m_try_again_activation_timer);
180✔
2594

2595
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
180✔
2596
                                  error_info.resumption_delay_interval);
180✔
2597
    auto try_again_interval = m_try_again_delay_info.delay_interval();
180✔
2598
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
180✔
2599
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
2600
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
2601
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
2602
        try_again_interval = std::chrono::milliseconds{1000};
142✔
2603
    }
142✔
2604
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
180✔
2605
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this] {
180✔
2606
        m_try_again_activation_timer.reset();
154✔
2607
        cancel_resumption_delay();
154✔
2608
    });
154✔
2609
}
180✔
2610

2611
void Session::clear_resumption_delay_state()
2612
{
49,398✔
2613
    if (m_try_again_activation_timer) {
49,398✔
2614
        logger.debug("Clearing resumption delay state after successful download");
×
2615
        m_try_again_delay_info.reset();
×
2616
    }
×
2617
}
49,398✔
2618

2619
Status Session::check_received_sync_progress(const SyncProgress& progress) noexcept
2620
{
49,426✔
2621
    const SyncProgress& a = m_progress;
49,426✔
2622
    const SyncProgress& b = progress;
49,426✔
2623
    std::string message;
49,426✔
2624
    if (b.latest_server_version.version < a.latest_server_version.version) {
49,426✔
2625
        message = util::format("Latest server version in download messages must be weakly increasing throughout a "
×
2626
                               "session (current: %1, received: %2)",
×
2627
                               a.latest_server_version.version, b.latest_server_version.version);
×
2628
    }
×
2629
    if (b.upload.client_version < a.upload.client_version) {
49,426✔
2630
        message = util::format("Last integrated client version in download messages must be weakly increasing "
×
2631
                               "throughout a session (current: %1, received: %2)",
×
2632
                               a.upload.client_version, b.upload.client_version);
×
2633
    }
×
2634
    if (b.upload.client_version > m_last_version_available) {
49,426✔
2635
        message = util::format("Last integrated client version on server cannot be greater than the latest client "
×
2636
                               "version in existence (current: %1, received: %2)",
×
2637
                               m_last_version_available, b.upload.client_version);
×
2638
    }
×
2639
    if (b.download.server_version < a.download.server_version) {
49,426✔
2640
        message =
×
2641
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2642
                         a.download.server_version, b.download.server_version);
×
2643
    }
×
2644
    if (b.download.server_version > b.latest_server_version.version) {
49,426✔
2645
        message = util::format(
×
2646
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
×
2647
            b.download.server_version, b.latest_server_version.version);
×
2648
    }
×
2649
    if (b.download.last_integrated_client_version < a.download.last_integrated_client_version) {
49,426✔
2650
        message = util::format(
×
2651
            "Last integrated client version on the server at the position in the server's history of the download "
×
2652
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2653
            a.download.last_integrated_client_version, b.download.last_integrated_client_version);
×
2654
    }
×
2655
    if (b.download.last_integrated_client_version > b.upload.client_version) {
49,426✔
2656
        message = util::format("Last integrated client version on the server in the position at the server's history "
×
2657
                               "of the download cursor cannot be greater than the latest client version integrated "
×
2658
                               "on the server (download: %1, upload: %2)",
×
2659
                               b.download.last_integrated_client_version, b.upload.client_version);
×
2660
    }
×
2661
    if (b.download.server_version < b.upload.last_integrated_server_version) {
49,426✔
2662
        message = util::format(
×
2663
            "The server version of the download cursor cannot be less than the server version integrated in the "
×
2664
            "latest client version acknowledged by the server (download: %1, upload: %2)",
×
2665
            b.download.server_version, b.upload.last_integrated_server_version);
×
2666
    }
×
2667

2668
    if (message.empty()) {
49,426✔
2669
        return Status::OK();
49,424✔
2670
    }
49,424✔
2671
    return {ErrorCodes::SyncProtocolInvariantFailed, std::move(message)};
2✔
2672
}
49,426✔
2673

2674

2675
void Session::check_for_download_completion()
2676
{
64,770✔
2677
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
64,770✔
2678
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
64,770✔
2679
    if (m_last_download_mark_received == m_last_triggering_download_mark)
64,770✔
2680
        return;
47,586✔
2681
    if (m_last_download_mark_received < m_target_download_mark)
17,184✔
2682
        return;
344✔
2683
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
16,840✔
2684
        return;
×
2685
    m_last_triggering_download_mark = m_target_download_mark;
16,840✔
2686
    if (REALM_UNLIKELY(m_delay_uploads)) {
16,840✔
2687
        // Activate the upload process now, and enable immediate reactivation
2688
        // after a subsequent fast reconnect.
2689
        m_delay_uploads = false;
4,766✔
2690
        ensure_enlisted_to_send(); // Throws
4,766✔
2691
    }
4,766✔
2692
    on_download_completion(); // Throws
16,840✔
2693
}
16,840✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc