• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2515

24 Jul 2024 01:19AM UTC coverage: 91.018% (+0.003%) from 91.015%
2515

push

Evergreen

web-flow
RCORE-2210 Remove unused websocket too new/old errors (#7917)

102670 of 181468 branches covered (56.58%)

1 of 3 new or added lines in 2 files covered. (33.33%)

71 existing lines in 10 files now uncovered.

216363 of 237714 relevant lines covered (91.02%)

5835141.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.5
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/compact_changesets.hpp>
10
#include <realm/sync/noinst/client_reset_operation.hpp>
11
#include <realm/sync/noinst/sync_schema_migration.hpp>
12
#include <realm/sync/protocol.hpp>
13
#include <realm/util/assert.hpp>
14
#include <realm/util/basic_system_errors.hpp>
15
#include <realm/util/memory_stream.hpp>
16
#include <realm/util/platform_info.hpp>
17
#include <realm/util/random.hpp>
18
#include <realm/util/safe_int_ops.hpp>
19
#include <realm/util/scope_exit.hpp>
20
#include <realm/util/to_string.hpp>
21
#include <realm/util/uri.hpp>
22
#include <realm/version.hpp>
23

24
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
25

26
#include <system_error>
27
#include <sstream>
28

29
// NOTE: The protocol specification is in `/doc/protocol.md`
30

31
using namespace realm;
32
using namespace _impl;
33
using namespace realm::util;
34
using namespace realm::sync;
35
using namespace realm::sync::websocket;
36

37
// clang-format off
38
using Connection      = ClientImpl::Connection;
39
using Session         = ClientImpl::Session;
40
using UploadChangeset = ClientHistory::UploadChangeset;
41

42
// These are a work-around for a bug in MSVC. It cannot find in-class types
43
// mentioned in signature of out-of-line member function definitions.
44
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
45
using OutputBuffer                = ClientImpl::OutputBuffer;
46
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
47
// clang-format on
48

49
void ClientImpl::ReconnectInfo::reset() noexcept
50
{
1,922✔
51
    m_backoff_state.reset();
1,922✔
52
    scheduled_reset = false;
1,922✔
53
}
1,922✔
54

55

56
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
57
                                       std::optional<ResumptionDelayInfo> new_delay_info)
58
{
3,816✔
59
    m_backoff_state.update(new_reason, new_delay_info);
3,816✔
60
}
3,816✔
61

62

63
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
64
{
6,088✔
65
    if (scheduled_reset) {
6,088✔
66
        reset();
8✔
67
    }
8✔
68

69
    if (!m_backoff_state.triggering_error) {
6,088✔
70
        return std::chrono::milliseconds::zero();
4,650✔
71
    }
4,650✔
72

73
    switch (*m_backoff_state.triggering_error) {
1,438✔
74
        case ConnectionTerminationReason::closed_voluntarily:
80✔
75
            return std::chrono::milliseconds::zero();
80✔
76
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
77
            return std::chrono::milliseconds::max();
18✔
78
        default:
1,334✔
79
            if (m_reconnect_mode == ReconnectMode::testing) {
1,334✔
80
                return std::chrono::milliseconds::max();
984✔
81
            }
984✔
82

83
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
350✔
84
            return m_backoff_state.delay_interval();
350✔
85
    }
1,438✔
86
}
1,438✔
87

88

89
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
90
                                      port_type& port, std::string& path) const
91
{
4,372✔
92
    util::Uri uri(url); // Throws
4,372✔
93
    uri.canonicalize(); // Throws
4,372✔
94
    std::string userinfo, address_2, port_2;
4,372✔
95
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
4,372✔
96
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
4,372✔
97
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
4,372✔
98
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
4,372✔
99
    if (REALM_UNLIKELY(!good))
4,372✔
100
        return false;
×
101
    ProtocolEnvelope protocol_2;
4,372✔
102
    port_type port_3;
4,372✔
103
    if (realm_scheme) {
4,372✔
104
        if (uri.get_scheme() == "realm:") {
×
105
            protocol_2 = ProtocolEnvelope::realm;
×
106
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
107
        }
×
108
        else {
×
109
            protocol_2 = ProtocolEnvelope::realms;
×
110
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
111
        }
×
112
    }
×
113
    else {
4,372✔
114
        REALM_ASSERT(ws_scheme);
4,372✔
115
        if (uri.get_scheme() == "ws:") {
4,372✔
116
            protocol_2 = ProtocolEnvelope::ws;
4,292✔
117
            port_3 = 80;
4,292✔
118
        }
4,292✔
119
        else {
80✔
120
            protocol_2 = ProtocolEnvelope::wss;
80✔
121
            port_3 = 443;
80✔
122
        }
80✔
123
    }
4,372✔
124
    if (!port_2.empty()) {
4,372✔
125
        std::istringstream in(port_2);    // Throws
4,268✔
126
        in.imbue(std::locale::classic()); // Throws
4,268✔
127
        in >> port_3;
4,268✔
128
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
4,268✔
129
            return false;
×
130
    }
4,268✔
131
    std::string path_2 = uri.get_path(); // Throws (copy)
4,372✔
132

133
    protocol = protocol_2;
4,372✔
134
    address = std::move(address_2);
4,372✔
135
    port = port_3;
4,372✔
136
    path = std::move(path_2);
4,372✔
137
    return true;
4,372✔
138
}
4,372✔
139

140
ClientImpl::ClientImpl(ClientConfig config)
141
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::session, std::move(config.logger))}
4,910✔
142
    , logger{*logger_ptr}
4,910✔
143
    , m_reconnect_mode{config.reconnect_mode}
4,910✔
144
    , m_connect_timeout{config.connect_timeout}
4,910✔
145
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
4,910✔
146
    , m_ping_keepalive_period{config.ping_keepalive_period}
4,910✔
147
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
4,910✔
148
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
4,910✔
149
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
4,910✔
150
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
4,910✔
151
    , m_dry_run{config.dry_run}
4,910✔
152
    , m_enable_default_port_hack{config.enable_default_port_hack}
4,910✔
153
    , m_disable_upload_compaction{config.disable_upload_compaction}
4,910✔
154
    , m_fix_up_object_ids{config.fix_up_object_ids}
4,910✔
155
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
4,910✔
156
    , m_socket_provider{std::move(config.socket_provider)}
4,910✔
157
    , m_client_protocol{} // Throws
4,910✔
158
    , m_one_connection_per_session{config.one_connection_per_session}
4,910✔
159
    , m_random{}
4,910✔
160
{
9,958✔
161
    // FIXME: Would be better if seeding was up to the application.
162
    util::seed_prng_nondeterministically(m_random); // Throws
9,958✔
163

164
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,958✔
165
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,958✔
166
                 get_current_protocol_version()); // Throws
9,958✔
167
    logger.info("Platform: %1", util::get_platform_info());
9,958✔
168
    const char* build_mode;
9,958✔
169
#if REALM_DEBUG
9,958✔
170
    build_mode = "Debug";
9,958✔
171
#else
172
    build_mode = "Release";
173
#endif
174
    logger.debug("Build mode: %1", build_mode);
9,958✔
175
    logger.debug("Config param: one_connection_per_session = %1",
9,958✔
176
                 config.one_connection_per_session); // Throws
9,958✔
177
    logger.debug("Config param: connect_timeout = %1 ms",
9,958✔
178
                 config.connect_timeout); // Throws
9,958✔
179
    logger.debug("Config param: connection_linger_time = %1 ms",
9,958✔
180
                 config.connection_linger_time); // Throws
9,958✔
181
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,958✔
182
                 config.ping_keepalive_period); // Throws
9,958✔
183
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,958✔
184
                 config.pong_keepalive_timeout); // Throws
9,958✔
185
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,958✔
186
                 config.fast_reconnect_limit); // Throws
9,958✔
187
    logger.debug("Config param: disable_upload_compaction = %1",
9,958✔
188
                 config.disable_upload_compaction); // Throws
9,958✔
189
    logger.debug("Config param: disable_sync_to_disk = %1",
9,958✔
190
                 config.disable_sync_to_disk); // Throws
9,958✔
191
    logger.debug(
9,958✔
192
        "Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3, jitter: 1/%4",
9,958✔
193
        m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,958✔
194
        m_reconnect_backoff_info.resumption_delay_interval.count(),
9,958✔
195
        m_reconnect_backoff_info.resumption_delay_backoff_multiplier, m_reconnect_backoff_info.delay_jitter_divisor);
9,958✔
196

197
    if (config.reconnect_mode != ReconnectMode::normal) {
9,958✔
198
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
776✔
199
                    "never do this in production!");
776✔
200
    }
776✔
201

202
    if (config.dry_run) {
9,958✔
203
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
204
                    "never do this in production!");
×
205
    }
×
206

207
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,958✔
208

209
    if (m_one_connection_per_session) {
9,958✔
210
        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
4✔
211
                    "never do this in production");
4✔
212
    }
4✔
213

214
    if (config.disable_upload_activation_delay) {
9,958✔
215
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
216
                    "never do this in production");
×
217
    }
×
218

219
    if (config.disable_sync_to_disk) {
9,958✔
220
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
221
                    "never do this in production");
×
222
    }
×
223

224
    m_actualize_and_finalize = create_trigger([this](Status status) {
14,882✔
225
        if (status == ErrorCodes::OperationAborted)
14,882✔
226
            return;
×
227
        else if (!status.is_ok())
14,882✔
228
            throw Exception(status);
×
229
        actualize_and_finalize_session_wrappers(); // Throws
14,882✔
230
    });
14,882✔
231
}
9,958✔
232

233
void ClientImpl::incr_outstanding_posts()
234
{
207,650✔
235
    util::CheckedLockGuard lock(m_drain_mutex);
207,650✔
236
    ++m_outstanding_posts;
207,650✔
237
    m_drained = false;
207,650✔
238
}
207,650✔
239

240
void ClientImpl::decr_outstanding_posts()
241
{
207,632✔
242
    util::CheckedLockGuard lock(m_drain_mutex);
207,632✔
243
    REALM_ASSERT(m_outstanding_posts);
207,632✔
244
    if (--m_outstanding_posts <= 0) {
207,632✔
245
        // Notify must happen with lock held or another thread could destroy
246
        // ClientImpl between when we release the lock and when we call notify
247
        m_drain_cv.notify_all();
18,232✔
248
    }
18,232✔
249
}
207,632✔
250

251
// Posts `handler` to the socket provider. The handler receives the status
// unfiltered. The post is counted as outstanding until the handler has
// returned or thrown.
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    incr_outstanding_posts();
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
        // Decrement the outstanding count no matter how the handler exits.
        auto posts_guard = util::make_scope_exit([this]() noexcept {
            decr_outstanding_posts();
        });
        handler(status);
    });
}
262

263
// Posts a status-less `handler` to the socket provider. The handler is not
// invoked if the operation was aborted, and any other non-OK status is thrown
// as an Exception. The post is counted as outstanding until the callback has
// finished.
void ClientImpl::post(util::UniqueFunction<void()>&& handler)
{
    REALM_ASSERT(m_socket_provider);
    incr_outstanding_posts();
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
        // Decrement the outstanding count no matter how the callback exits.
        auto posts_guard = util::make_scope_exit([this]() noexcept {
            decr_outstanding_posts();
        });
        if (status == ErrorCodes::OperationAborted) {
            return;
        }
        if (!status.is_ok()) {
            throw Exception(status);
        }
        handler();
    });
}
278

279

280
void ClientImpl::drain_connections()
281
{
9,956✔
282
    logger.debug("Draining connections during sync client shutdown");
9,956✔
283
    for (auto& server_slot_pair : m_server_slots) {
9,956✔
284
        auto& server_slot = server_slot_pair.second;
2,722✔
285

286
        if (server_slot.connection) {
2,722✔
287
            auto& conn = server_slot.connection;
2,496✔
288
            conn->force_close();
2,496✔
289
        }
2,496✔
290
        else {
226✔
291
            for (auto& conn_pair : server_slot.alt_connections) {
226✔
UNCOV
292
                conn_pair.second->force_close();
×
UNCOV
293
            }
×
294
        }
226✔
295
    }
2,722✔
296
}
9,956✔
297

298

299
// Creates a timer on the socket provider that invokes `handler` with the
// resulting status after `delay`. The timer is counted as an outstanding
// post until its callback has run.
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
                                                       SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    incr_outstanding_posts();
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
        // Decrement the outstanding count no matter how the handler exits.
        auto posts_guard = util::make_scope_exit([this]() noexcept {
            decr_outstanding_posts();
        });
        handler(status);
    });
}
311

312

313
// Creates a trigger bound to this client that runs `handler`. Requires a
// socket provider to be present.
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
}
318

319
// If a websocket sentinel is still attached, flag it as destroyed and then
// release this connection's reference to it.
Connection::~Connection()
{
    if (!m_websocket_sentinel)
        return;

    m_websocket_sentinel->destroyed = true;
    m_websocket_sentinel.reset();
}
326

327
void Connection::activate()
328
{
2,828✔
329
    REALM_ASSERT(m_on_idle);
2,828✔
330
    m_activated = true;
2,828✔
331
    if (m_num_active_sessions == 0)
2,828✔
332
        m_on_idle->trigger();
×
333
    // We cannot in general connect immediately, because a prior failure to
334
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
335
    initiate_reconnect_wait(); // Throws
2,828✔
336
}
2,828✔
337

338

339
void Connection::activate_session(std::unique_ptr<Session> sess)
340
{
10,402✔
341
    REALM_ASSERT(sess);
10,402✔
342
    REALM_ASSERT(&sess->m_conn == this);
10,402✔
343
    REALM_ASSERT(!m_force_closed);
10,402✔
344
    Session& sess_2 = *sess;
10,402✔
345
    session_ident_type ident = sess->m_ident;
10,402✔
346
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
10,402✔
347
    bool was_inserted = p.second;
10,402✔
348
    REALM_ASSERT(was_inserted);
10,402✔
349
    // Save the session ident to the historical list of session idents
350
    m_session_history.insert(ident);
10,402✔
351
    sess_2.activate(); // Throws
10,402✔
352
    if (m_state == ConnectionState::connected) {
10,402✔
353
        bool fast_reconnect = false;
7,304✔
354
        sess_2.connection_established(fast_reconnect); // Throws
7,304✔
355
    }
7,304✔
356
    ++m_num_active_sessions;
10,402✔
357
}
10,402✔
358

359

360
// Starts deactivation of `sess` and completes it right away if the session
// reaches the Deactivated state immediately. When the last active session
// goes away on an activated, disconnected connection, the idle trigger is
// fired.
void Connection::initiate_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess);
    REALM_ASSERT(&sess->m_conn == this);
    REALM_ASSERT(m_num_active_sessions);

    // The session is deactivated before the active session count is
    // decremented, because the client may be waiting (on another thread, in
    // stop_and_wait()) for m_num_active_sessions to reach 0.
    sess->initiate_deactivation(); // Throws
    if (sess->m_state == Session::Deactivated) {
        finish_session_deactivation(sess);
    }

    if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
        const bool idle_and_disconnected = m_activated && m_state == ConnectionState::disconnected;
        if (idle_and_disconnected)
            m_on_idle->trigger();
    }
}
377

378

379
void Connection::cancel_reconnect_delay()
380
{
2,148✔
381
    REALM_ASSERT(m_activated);
2,148✔
382

383
    if (m_reconnect_delay_in_progress) {
2,148✔
384
        if (m_nonzero_reconnect_delay)
1,910✔
385
            logger.detail("Canceling reconnect delay"); // Throws
956✔
386

387
        // Cancel the in-progress wait operation by destroying the timer
388
        // object. Destruction is needed in this case, because a new wait
389
        // operation might have to be initiated before the previous one
390
        // completes (its completion handler starts to execute), so the new wait
391
        // operation must be done on a new timer object.
392
        m_reconnect_disconnect_timer.reset();
1,910✔
393
        m_reconnect_delay_in_progress = false;
1,910✔
394
        m_reconnect_info.reset();
1,910✔
395
        initiate_reconnect_wait(); // Throws
1,910✔
396
        return;
1,910✔
397
    }
1,910✔
398

399
    // If we are not disconnected, then we need to make sure the next time we get disconnected
400
    // that we are allowed to re-connect as quickly as possible.
401
    //
402
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
403
    // backoff/delay state before calculating the next delay, unless a PONG message is received
404
    // for the urgent PING message we send below.
405
    //
406
    // If we get a PONG message for the urgent PING message sent below, then the connection is
407
    // healthy and we can calculate the next delay normally.
408
    if (m_state != ConnectionState::disconnected) {
238✔
409
        m_reconnect_info.scheduled_reset = true;
238✔
410
        m_ping_after_scheduled_reset_of_reconnect_info = false;
238✔
411

412
        schedule_urgent_ping(); // Throws
238✔
413
        return;
238✔
414
    }
238✔
415
    // Nothing to do in this case. The next reconnect attemp will be made as
416
    // soon as there are any sessions that are both active and unsuspended.
417
}
238✔
418

419
// Removes a session that has reached the Deactivated state from both the
// session map and the session ident history.
void Connection::finish_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess->m_state == Session::Deactivated);
    // Copy the ident up front; `sess` is not used again after the erase.
    const auto session_ident = sess->m_ident;
    m_sessions.erase(session_ident);
    m_session_history.erase(session_ident);
}
426

427
void Connection::force_close()
428
{
2,496✔
429
    if (m_force_closed) {
2,496✔
430
        return;
×
431
    }
×
432

433
    m_force_closed = true;
2,496✔
434

435
    if (m_state != ConnectionState::disconnected) {
2,496✔
436
        voluntary_disconnect();
2,464✔
437
    }
2,464✔
438

439
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,496✔
440
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,496✔
441
        m_reconnect_disconnect_timer.reset();
32✔
442
        m_reconnect_delay_in_progress = false;
32✔
443
        m_disconnect_delay_in_progress = false;
32✔
444
    }
32✔
445

446
    // We must copy any session pointers we want to close to a vector because force_closing
447
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
448
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
449
    std::vector<Session*> to_close;
2,496✔
450
    for (auto& session_pair : m_sessions) {
2,496✔
451
        if (session_pair.second->m_state == Session::State::Active) {
102✔
452
            to_close.push_back(session_pair.second.get());
102✔
453
        }
102✔
454
    }
102✔
455

456
    for (auto& sess : to_close) {
2,496✔
457
        sess->force_close();
102✔
458
    }
102✔
459

460
    logger.debug("Force closed idle connection");
2,496✔
461
}
2,496✔
462

463

464
void Connection::websocket_connected_handler(const std::string& protocol)
465
{
3,608✔
466
    if (!protocol.empty()) {
3,608✔
467
        std::string_view expected_prefix =
3,608✔
468
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,608✔
469
        // FIXME: Use std::string_view::begins_with() in C++20.
470
        auto prefix_matches = [&](std::string_view other) {
3,608✔
471
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,608✔
472
        };
3,608✔
473
        if (prefix_matches(expected_prefix)) {
3,608✔
474
            util::MemoryInputStream in;
3,608✔
475
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,608✔
476
            in.imbue(std::locale::classic());
3,608✔
477
            in.unsetf(std::ios_base::skipws);
3,608✔
478
            int value_2 = 0;
3,608✔
479
            in >> value_2;
3,608✔
480
            if (in && in.eof() && value_2 >= 0) {
3,608✔
481
                bool good_version =
3,608✔
482
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,608✔
483
                if (good_version) {
3,608✔
484
                    logger.detail("Negotiated protocol version: %1", value_2);
3,608✔
485
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
486
                    // will provide the appservices connection ID via a log message.
487
                    // TODO: Remove once the server starts sending the connection ID
488
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,608✔
489
                    m_negotiated_protocol_version = value_2;
3,608✔
490
                    handle_connection_established(); // Throws
3,608✔
491
                    return;
3,608✔
492
                }
3,608✔
493
            }
3,608✔
494
        }
3,608✔
UNCOV
495
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
UNCOV
496
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
UNCOV
497
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
UNCOV
498
    }
×
UNCOV
499
    else {
×
UNCOV
500
        close_due_to_client_side_error(
×
UNCOV
501
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
UNCOV
502
            ConnectionTerminationReason::bad_headers_in_http_response);
×
UNCOV
503
    }
×
504
}
3,608✔
505

506

507
// Handles a binary message received on the websocket.
//
// Returns false if the connection was already force closed. Otherwise the
// message is processed (or, if the simulated read failure is triggered, the
// connection is closed with a non-fatal error instead) and the return value
// tells whether `m_websocket` is still set afterwards.
bool Connection::websocket_binary_message_received(util::Span<const char> data)
{
    if (m_force_closed) {
        logger.debug("Received binary message after connection was force closed");
        return false;
    }

    using sf = SimulatedFailure;
    const bool simulate_read_failure = sf::check_trigger(sf::sync_client__read_head);
    if (simulate_read_failure) {
        close_due_to_client_side_error(
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
            ConnectionTerminationReason::read_or_write_error);
    }
    else {
        handle_message_received(data);
    }
    return bool(m_websocket);
}
525

526

527
void Connection::websocket_error_handler()
528
{
722✔
529
    m_websocket_error_received = true;
722✔
530
}
722✔
531

532
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
533
{
838✔
534
    if (m_force_closed) {
838✔
535
        logger.debug("Received websocket close message after connection was force closed");
×
536
        return false;
×
537
    }
×
538
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
838✔
539

540
    switch (error_code) {
838✔
541
        case WebSocketError::websocket_ok:
68✔
542
            break;
68✔
543
        case WebSocketError::websocket_resolve_failed:
4✔
544
            [[fallthrough]];
4✔
545
        case WebSocketError::websocket_connection_failed: {
112✔
546
            SessionErrorInfo error_info(
112✔
547
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
112✔
548
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
549
            // to make sure the websocket URL is correct
550
            if (!m_server_endpoint.is_verified) {
112✔
551
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
84✔
552
            }
84✔
553
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
112✔
554
            break;
112✔
555
        }
4✔
556
        case WebSocketError::websocket_read_error:
588✔
557
            [[fallthrough]];
588✔
558
        case WebSocketError::websocket_write_error: {
588✔
559
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
588✔
560
                                         ConnectionTerminationReason::read_or_write_error);
588✔
561
            break;
588✔
562
        }
588✔
563
        case WebSocketError::websocket_going_away:
✔
564
            [[fallthrough]];
×
565
        case WebSocketError::websocket_protocol_error:
✔
566
            [[fallthrough]];
×
567
        case WebSocketError::websocket_unsupported_data:
✔
568
            [[fallthrough]];
×
569
        case WebSocketError::websocket_invalid_payload_data:
✔
570
            [[fallthrough]];
×
571
        case WebSocketError::websocket_policy_violation:
✔
572
            [[fallthrough]];
×
573
        case WebSocketError::websocket_reserved:
✔
574
            [[fallthrough]];
×
575
        case WebSocketError::websocket_no_status_received:
✔
576
            [[fallthrough]];
×
577
        case WebSocketError::websocket_invalid_extension: {
✔
578
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
579
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
580
            break;
×
581
        }
×
582
        case WebSocketError::websocket_message_too_big: {
4✔
583
            auto message = util::format(
4✔
584
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
585
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
586
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
587
            involuntary_disconnect(std::move(error_info),
4✔
588
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
589
            break;
4✔
590
        }
×
591
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
592
            close_due_to_client_side_error(
10✔
593
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
594
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
595
            break;
10✔
596
        }
×
597
        case WebSocketError::websocket_fatal_error: {
✔
598
            // Error is fatal if the sync_route has already been verified - if the sync_route has not
599
            // been verified, then use a non-fatal error and try to perform a location update.
600
            SessionErrorInfo error_info(
×
601
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)},
×
602
                IsFatal{m_server_endpoint.is_verified});
×
603
            ConnectionTerminationReason reason = ConnectionTerminationReason::http_response_says_fatal_error;
×
604
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
605
            // to make sure the websocket URL is correct
606
            if (!m_server_endpoint.is_verified) {
×
607
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
608
                reason = ConnectionTerminationReason::connect_operation_failed;
×
609
            }
×
610
            involuntary_disconnect(std::move(error_info), reason);
×
611
            break;
×
612
        }
×
613
        case WebSocketError::websocket_forbidden: {
✔
614
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
615
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
616
            involuntary_disconnect(std::move(error_info),
×
617
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
618
            break;
×
619
        }
×
620
        case WebSocketError::websocket_unauthorized: {
44✔
621
            SessionErrorInfo error_info(
44✔
622
                {ErrorCodes::AuthError,
44✔
623
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
44✔
624
                IsFatal{false});
44✔
625
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
44✔
626
            involuntary_disconnect(std::move(error_info),
44✔
627
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
44✔
628
            break;
44✔
629
        }
×
630
        case WebSocketError::websocket_moved_permanently: {
12✔
631
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
632
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
633
            involuntary_disconnect(std::move(error_info),
12✔
634
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
635
            break;
12✔
636
        }
×
637
        case WebSocketError::websocket_abnormal_closure: {
✔
638
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
639
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
640
            involuntary_disconnect(std::move(error_info),
×
641
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
642
            break;
×
643
        }
×
644
        case WebSocketError::websocket_internal_server_error:
✔
645
            [[fallthrough]];
×
646
        case WebSocketError::websocket_retry_error: {
✔
647
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
648
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
649
            break;
×
650
        }
×
651
    }
838✔
652

653
    return bool(m_websocket);
838✔
654
}
838✔
655

656
// Guarantees that handle_reconnect_wait() is never called from within the
657
// execution of initiate_reconnect_wait() (no callback reentrance).
658
void Connection::initiate_reconnect_wait()
659
{
8,548✔
660
    REALM_ASSERT(m_activated);
8,548✔
661
    REALM_ASSERT(!m_reconnect_delay_in_progress);
8,548✔
662
    REALM_ASSERT(!m_disconnect_delay_in_progress);
8,548✔
663

664
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
665
    if (m_force_closed) {
8,548✔
666
        return;
2,460✔
667
    }
2,460✔
668

669
    m_reconnect_delay_in_progress = true;
6,088✔
670
    auto delay = m_reconnect_info.delay_interval();
6,088✔
671
    if (delay == std::chrono::milliseconds::max()) {
6,088✔
672
        logger.detail("Reconnection delayed indefinitely"); // Throws
1,002✔
673
        // Not actually starting a timer corresponds to an infinite wait
674
        m_nonzero_reconnect_delay = true;
1,002✔
675
        return;
1,002✔
676
    }
1,002✔
677

678
    if (delay == std::chrono::milliseconds::zero()) {
5,086✔
679
        m_nonzero_reconnect_delay = false;
4,730✔
680
    }
4,730✔
681
    else {
356✔
682
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
356✔
683
        m_nonzero_reconnect_delay = true;
356✔
684
    }
356✔
685

686
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
687
    // we need it to be cancelable in case the connection is terminated before the timer
688
    // callback is run.
689
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
5,088✔
690
        // If the operation is aborted, the connection object may have been
691
        // destroyed.
692
        if (status != ErrorCodes::OperationAborted)
5,086✔
693
            handle_reconnect_wait(status); // Throws
3,816✔
694
    });                                    // Throws
5,086✔
695
}
5,086✔
696

697

698
// Timer callback for the reconnect wait. A failed (non-aborted) timer status
// is rethrown as an Exception; otherwise the wait is marked as finished and a
// reconnect is started if at least one active, unsuspended session remains.
void Connection::handle_reconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        // Aborted waits are filtered out by the timer callback itself.
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT(m_reconnect_delay_in_progress);
    m_reconnect_delay_in_progress = false;

    const bool have_active_sessions = (m_num_active_unsuspended_sessions > 0);
    if (have_active_sessions) {
        initiate_reconnect(); // Throws
    }
}
3,816✔
711

712
// Observer handed to the socket provider. It forwards each websocket event to
// the owning Connection, unless the lifecycle sentinel captured at
// construction time has been marked as destroyed, in which case the event is
// dropped.
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
    explicit WebSocketObserverShim(Connection* connection)
        : conn(connection)
        , sentinel(connection->m_websocket_sentinel)
    {
    }

    // Connection that receives the forwarded events.
    Connection* conn;
    // Sentinel shared with the connection; checked before every forward.
    util::bind_ptr<LifecycleSentinel> sentinel;

    void websocket_connected_handler(const std::string& protocol) override
    {
        if (!sentinel->destroyed) {
            conn->websocket_connected_handler(protocol);
        }
    }

    void websocket_error_handler() override
    {
        if (!sentinel->destroyed) {
            conn->websocket_error_handler();
        }
    }

    // Returns false without forwarding once the sentinel is destroyed.
    bool websocket_binary_message_received(util::Span<const char> data) override
    {
        if (!sentinel->destroyed) {
            return conn->websocket_binary_message_received(data);
        }
        return false;
    }

    // Returns true without forwarding once the sentinel is destroyed.
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
    {
        if (!sentinel->destroyed) {
            return conn->websocket_closed_handler(was_clean, error_code, msg);
        }
        return true;
    }
};
758

759
void Connection::initiate_reconnect()
760
{
3,816✔
761
    REALM_ASSERT(m_activated);
3,816✔
762

763
    m_state = ConnectionState::connecting;
3,816✔
764
    report_connection_state_change(ConnectionState::connecting); // Throws
3,816✔
765
    if (m_websocket_sentinel) {
3,816✔
766
        m_websocket_sentinel->destroyed = true;
×
767
    }
×
768
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,816✔
769
    m_websocket.reset();
3,816✔
770

771
    // Watchdog
772
    initiate_connect_wait(); // Throws
3,816✔
773

774
    std::vector<std::string> sec_websocket_protocol;
3,816✔
775
    {
3,816✔
776
        auto protocol_prefix =
3,816✔
777
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,816✔
778
        int min = get_oldest_supported_protocol_version();
3,816✔
779
        int max = get_current_protocol_version();
3,816✔
780
        REALM_ASSERT_3(min, <=, max);
3,816✔
781
        // List protocol version in descending order to ensure that the server
782
        // selects the highest possible version.
783
        for (int version = max; version >= min; --version) {
53,418✔
784
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
49,602✔
785
        }
49,602✔
786
    }
3,816✔
787

788
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,816✔
789
                m_server_endpoint.port, m_http_request_path_prefix);
3,816✔
790

791
    m_websocket_error_received = false;
3,816✔
792
    m_websocket =
3,816✔
793
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,816✔
794
                                            WebSocketEndpoint{
3,816✔
795
                                                m_server_endpoint.address,
3,816✔
796
                                                m_server_endpoint.port,
3,816✔
797
                                                get_http_request_path(),
3,816✔
798
                                                std::move(sec_websocket_protocol),
3,816✔
799
                                                is_ssl(m_server_endpoint.envelope),
3,816✔
800
                                                /// DEPRECATED - The following will be removed in a future release
801
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,816✔
802
                                                m_verify_servers_ssl_certificate,
3,816✔
803
                                                m_ssl_trust_certificate_path,
3,816✔
804
                                                m_ssl_verify_callback,
3,816✔
805
                                                m_proxy_config,
3,816✔
806
                                            });
3,816✔
807
}
3,816✔
808

809

810
void Connection::initiate_connect_wait()
811
{
3,816✔
812
    // Deploy a watchdog to enforce an upper bound on the time it can take to
813
    // fully establish the connection (including SSL and WebSocket
814
    // handshakes). Without such a watchdog, connect operations could take very
815
    // long, or even indefinite time.
816
    milliseconds_type time = m_client.m_connect_timeout;
3,816✔
817

818
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,816✔
819
        // If the operation is aborted, the connection object may have been
820
        // destroyed.
821
        if (status != ErrorCodes::OperationAborted)
3,816✔
822
            handle_connect_wait(status); // Throws
×
823
    });                                  // Throws
3,816✔
824
}
3,816✔
825

826

827
// Callback of the connect watchdog: the connection was not established within
// the configured timeout. A failed (non-aborted) timer status is rethrown as
// an Exception; otherwise the connection is torn down with a non-fatal
// SyncConnectTimeout error.
void Connection::handle_connect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        // Aborted waits are filtered out by the timer callback itself.
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
    logger.info("Connect timeout"); // Throws

    SessionErrorInfo timeout_error({ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
                                   IsFatal{false});
    // When the server endpoint has not been verified by a successful
    // connection yet, ask for a location refresh so that the websocket URL is
    // known to be correct.
    const bool endpoint_unverified = !m_server_endpoint.is_verified;
    if (endpoint_unverified) {
        timeout_error.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
    }
    involuntary_disconnect(std::move(timeout_error), ConnectionTerminationReason::sync_connect_timeout); // Throws
}
×
845

846

847
void Connection::handle_connection_established()
848
{
3,608✔
849
    // Cancel connect timeout watchdog
850
    m_connect_timer.reset();
3,608✔
851

852
    m_state = ConnectionState::connected;
3,608✔
853
    m_server_endpoint.is_verified = true; // sync route is valid since connection is successful
3,608✔
854

855
    milliseconds_type now = monotonic_clock_now();
3,608✔
856
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,608✔
857
    initiate_ping_delay(now);     // Throws
3,608✔
858

859
    bool fast_reconnect = false;
3,608✔
860
    if (m_disconnect_has_occurred) {
3,608✔
861
        milliseconds_type time = now - m_disconnect_time;
1,038✔
862
        if (time <= m_client.m_fast_reconnect_limit)
1,038✔
863
            fast_reconnect = true;
1,038✔
864
    }
1,038✔
865

866
    for (auto& p : m_sessions) {
4,732✔
867
        Session& sess = *p.second;
4,732✔
868
        sess.connection_established(fast_reconnect); // Throws
4,732✔
869
    }
4,732✔
870

871
    report_connection_state_change(ConnectionState::connected); // Throws
3,608✔
872
}
3,608✔
873

874

875
void Connection::schedule_urgent_ping()
876
{
238✔
877
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
238✔
878
    if (m_ping_delay_in_progress) {
238✔
879
        m_heartbeat_timer.reset();
146✔
880
        m_ping_delay_in_progress = false;
146✔
881
        m_minimize_next_ping_delay = true;
146✔
882
        milliseconds_type now = monotonic_clock_now();
146✔
883
        initiate_ping_delay(now); // Throws
146✔
884
        return;
146✔
885
    }
146✔
886
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
92✔
887
    if (!m_send_ping)
92✔
888
        m_minimize_next_ping_delay = true;
90✔
889
}
92✔
890

891

892
// Starts the timer that delays the next PING message.
//
// `now` is the current monotonic clock reading supplied by the caller. When a
// minimized delay was requested, the delay is zero and the request flag is
// cleared. Otherwise the delay is the keepalive period, reduced by a random
// amount and by the time already spent waiting for the previous PONG.
void Connection::initiate_ping_delay(milliseconds_type now)
{
    REALM_ASSERT(!m_ping_delay_in_progress);
    REALM_ASSERT(!m_waiting_for_pong);
    REALM_ASSERT(!m_send_ping);

    milliseconds_type delay = 0;
    if (m_minimize_next_ping_delay) {
        // The request for a minimal delay is consumed here.
        m_minimize_next_ping_delay = false;
    }
    else {
        delay = m_client.m_ping_keepalive_period;

        // Make a randomized deduction of up to 10%, or up to 100% if this is
        // the first PING message to be sent since the connection was
        // established. The purpose of this randomized deduction is to reduce
        // the risk of many connections sending PING messages simultaneously to
        // the server.
        const milliseconds_type deduction_limit = (m_ping_sent ? delay / 10 : delay);
        std::uniform_int_distribution<milliseconds_type> deduction_distr(0, deduction_limit);
        const milliseconds_type deduction = deduction_distr(m_client.get_random());
        delay -= deduction;

        // Deduct the time already spent waiting for PONG, clamping at zero.
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
        const milliseconds_type already_waited = now - m_pong_wait_started_at;
        delay = (already_waited < delay) ? (delay - already_waited) : 0;
    }

    m_ping_delay_in_progress = true;

    auto on_delay_expired = [this](Status status) {
        if (status == ErrorCodes::OperationAborted)
            return;
        if (!status.is_ok())
            throw Exception(status);

        handle_ping_delay(); // Throws
    };
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), std::move(on_delay_expired)); // Throws
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
}
3,934✔
937

938

939
void Connection::handle_ping_delay()
940
{
198✔
941
    REALM_ASSERT(m_ping_delay_in_progress);
198✔
942
    m_ping_delay_in_progress = false;
198✔
943
    m_send_ping = true;
198✔
944

945
    initiate_pong_timeout(); // Throws
198✔
946

947
    if (m_state == ConnectionState::connected && !m_sending)
198✔
948
        send_next_message(); // Throws
144✔
949
}
198✔
950

951

952
void Connection::initiate_pong_timeout()
953
{
198✔
954
    REALM_ASSERT(!m_ping_delay_in_progress);
198✔
955
    REALM_ASSERT(!m_waiting_for_pong);
198✔
956
    REALM_ASSERT(m_send_ping);
198✔
957

958
    m_waiting_for_pong = true;
198✔
959
    m_pong_wait_started_at = monotonic_clock_now();
198✔
960

961
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
198✔
962
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
198✔
963
        if (status == ErrorCodes::OperationAborted)
198✔
964
            return;
186✔
965
        else if (!status.is_ok())
12✔
966
            throw Exception(status);
×
967

968
        handle_pong_timeout(); // Throws
12✔
969
    });                        // Throws
12✔
970
}
198✔
971

972

973
void Connection::handle_pong_timeout()
974
{
12✔
975
    REALM_ASSERT(m_waiting_for_pong);
12✔
976
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
977
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
978
                                 ConnectionTerminationReason::pong_timeout);
12✔
979
}
12✔
980

981

982
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
983
{
98,492✔
984
    // Stop sending messages if an websocket error was received.
985
    if (m_websocket_error_received)
98,492✔
986
        return;
×
987

988
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
98,492✔
989
        if (sentinel->destroyed) {
98,398✔
990
            return;
1,418✔
991
        }
1,418✔
992
        if (!status.is_ok()) {
96,980✔
993
            if (status != ErrorCodes::Error::OperationAborted) {
×
994
                // Write errors will be handled by the websocket_write_error_handler() callback
995
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
996
            }
×
997
            return;
×
998
        }
×
999
        handle_write_message(); // Throws
96,980✔
1000
    });                         // Throws
96,980✔
1001
    m_sending_session = sess;
98,492✔
1002
    m_sending = true;
98,492✔
1003
}
98,492✔
1004

1005

1006
void Connection::handle_write_message()
1007
{
96,980✔
1008
    m_sending_session->message_sent(); // Throws
96,980✔
1009
    if (m_sending_session->m_state == Session::Deactivated) {
96,980✔
1010
        finish_session_deactivation(m_sending_session);
122✔
1011
    }
122✔
1012
    m_sending_session = nullptr;
96,980✔
1013
    m_sending = false;
96,980✔
1014
    send_next_message(); // Throws
96,980✔
1015
}
96,980✔
1016

1017

1018
void Connection::send_next_message()
1019
{
175,100✔
1020
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
175,100✔
1021
    REALM_ASSERT(!m_sending_session);
175,100✔
1022
    REALM_ASSERT(!m_sending);
175,100✔
1023
    if (m_send_ping) {
175,100✔
1024
        send_ping(); // Throws
186✔
1025
        return;
186✔
1026
    }
186✔
1027
    while (!m_sessions_enlisted_to_send.empty()) {
253,654✔
1028
        // The state of being connected is not supposed to be able to change
1029
        // across this loop thanks to the "no callback reentrance" guarantee
1030
        // provided by Websocket::async_write_text(), and friends.
1031
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
177,620✔
1032

1033
        Session& sess = *m_sessions_enlisted_to_send.front();
177,620✔
1034
        m_sessions_enlisted_to_send.pop_front();
177,620✔
1035
        sess.send_message(); // Throws
177,620✔
1036

1037
        if (sess.m_state == Session::Deactivated) {
177,620✔
1038
            finish_session_deactivation(&sess);
2,906✔
1039
        }
2,906✔
1040

1041
        // An enlisted session may choose to not send a message. In that case,
1042
        // we should pass the opportunity to the next enlisted session.
1043
        if (m_sending)
177,620✔
1044
            break;
98,880✔
1045
    }
177,620✔
1046
}
174,914✔
1047

1048

1049
void Connection::send_ping()
1050
{
186✔
1051
    REALM_ASSERT(!m_ping_delay_in_progress);
186✔
1052
    REALM_ASSERT(m_waiting_for_pong);
186✔
1053
    REALM_ASSERT(m_send_ping);
186✔
1054

1055
    m_send_ping = false;
186✔
1056
    if (m_reconnect_info.scheduled_reset)
186✔
1057
        m_ping_after_scheduled_reset_of_reconnect_info = true;
148✔
1058

1059
    m_last_ping_sent_at = monotonic_clock_now();
186✔
1060
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
186✔
1061
                 m_previous_ping_rtt); // Throws
186✔
1062

1063
    ClientProtocol& protocol = get_client_protocol();
186✔
1064
    OutputBuffer& out = get_output_buffer();
186✔
1065
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
186✔
1066
    initiate_write_ping(out);                                          // Throws
186✔
1067
    m_ping_sent = true;
186✔
1068
}
186✔
1069

1070

1071
void Connection::initiate_write_ping(const OutputBuffer& out)
1072
{
186✔
1073
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
186✔
1074
        if (sentinel->destroyed) {
186✔
1075
            return;
2✔
1076
        }
2✔
1077
        if (!status.is_ok()) {
184✔
1078
            if (status != ErrorCodes::Error::OperationAborted) {
×
1079
                // Write errors will be handled by the websocket_write_error_handler() callback
1080
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1081
            }
×
1082
            return;
×
1083
        }
×
1084
        handle_write_ping(); // Throws
184✔
1085
    });                      // Throws
184✔
1086
    m_sending = true;
186✔
1087
}
186✔
1088

1089

1090
void Connection::handle_write_ping()
1091
{
184✔
1092
    REALM_ASSERT(m_sending);
184✔
1093
    REALM_ASSERT(!m_sending_session);
184✔
1094
    m_sending = false;
184✔
1095
    send_next_message(); // Throws
184✔
1096
}
184✔
1097

1098

1099
// Hands a received binary message to the client protocol parser, which parses
// it and calls the proper handler on this Connection object.
void Connection::handle_message_received(util::Span<const char> data)
{
    const std::string_view payload(data.data(), data.size());
    get_client_protocol().parse_message_received<Connection>(*this, payload);
}
81,410✔
1105

1106

1107
void Connection::initiate_disconnect_wait()
1108
{
4,784✔
1109
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,784✔
1110

1111
    if (m_disconnect_delay_in_progress) {
4,784✔
1112
        m_reconnect_disconnect_timer.reset();
2,222✔
1113
        m_disconnect_delay_in_progress = false;
2,222✔
1114
    }
2,222✔
1115

1116
    milliseconds_type time = m_client.m_connection_linger_time;
4,784✔
1117

1118
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,784✔
1119
        // If the operation is aborted, the connection object may have been
1120
        // destroyed.
1121
        if (status != ErrorCodes::OperationAborted)
4,776✔
1122
            handle_disconnect_wait(status); // Throws
12✔
1123
    });                                     // Throws
4,776✔
1124
    m_disconnect_delay_in_progress = true;
4,784✔
1125
}
4,784✔
1126

1127

1128
// Timer callback for the disconnect (linger) wait. A failed (non-aborted)
// timer status is rethrown as an Exception. Otherwise the connection is
// voluntarily disconnected, provided that no active, unsuspended sessions are
// left.
void Connection::handle_disconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        // Aborted waits are filtered out by the timer callback itself.
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    m_disconnect_delay_in_progress = false;

    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);

    // Sessions are still using the connection, so keep it open.
    if (m_num_active_unsuspended_sessions != 0)
        return;

    if (m_client.m_connection_linger_time > 0) {
        logger.detail("Linger time expired"); // Throws
    }
    voluntary_disconnect();      // Throws
    logger.info("Disconnected"); // Throws
}
12✔
1145

1146

1147
// Closes the connection because of a sync protocol violation. The error is
// reported as fatal and tagged with the ProtocolViolation action.
void Connection::close_due_to_protocol_error(Status status)
{
    SessionErrorInfo violation(std::move(status), IsFatal{true});
    violation.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;

    const auto reason = ConnectionTerminationReason::sync_protocol_violation;
    involuntary_disconnect(std::move(violation), reason); // Throws
}
16✔
1154

1155

1156
// Closes the connection because of an error detected on the client side. The
// error is logged first, then passed on together with the given fatality and
// termination reason.
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to error: %1", status); // Throws

    SessionErrorInfo client_error{std::move(status), is_fatal};
    involuntary_disconnect(std::move(client_error), reason); // Throws
}
448✔
1162

1163

1164
// Closes the connection because of a transient error. The error is logged,
// reported as non-fatal, and tagged with the Transient action.
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to transient error: %1", status); // Throws

    SessionErrorInfo transient_error{std::move(status), IsFatal{false}};
    transient_error.server_requests_action = ProtocolErrorInfo::Action::Transient;
    involuntary_disconnect(std::move(transient_error), reason); // Throws
}
600✔
1172

1173

1174
// Close connection due to error discovered on the server-side, and then
1175
// reported to the client by way of a connection-level ERROR message.
1176
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1177
{
66✔
1178
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
66✔
1179
                int(error_code)); // Throws
66✔
1180

1181
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
66✔
1182
                                      : ConnectionTerminationReason::server_said_try_again_later;
66✔
1183
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
66✔
1184
                           reason); // Throws
66✔
1185
}
66✔
1186

1187

1188
void Connection::disconnect(const SessionErrorInfo& info)
1189
{
3,814✔
1190
    // Cancel connect timeout watchdog
1191
    m_connect_timer.reset();
3,814✔
1192

1193
    if (m_state == ConnectionState::connected) {
3,814✔
1194
        m_disconnect_time = monotonic_clock_now();
3,604✔
1195
        m_disconnect_has_occurred = true;
3,604✔
1196

1197
        // Sessions that are in the Deactivating state at this time can be
1198
        // immediately discarded, in part because they are no longer enlisted to
1199
        // send. Such sessions will be taken to the Deactivated state by
1200
        // Session::connection_lost(), and then they will be removed from
1201
        // `m_sessions`.
1202
        auto i = m_sessions.begin(), end = m_sessions.end();
3,604✔
1203
        while (i != end) {
8,218✔
1204
            // Prevent invalidation of the main iterator when erasing elements
1205
            auto j = i++;
4,614✔
1206
            Session& sess = *j->second;
4,614✔
1207
            sess.connection_lost(); // Throws
4,614✔
1208
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
4,616✔
1209
                m_sessions.erase(j);
2,450✔
1210
        }
4,614✔
1211
    }
3,604✔
1212

1213
    change_state_to_disconnected();
3,814✔
1214

1215
    m_ping_delay_in_progress = false;
3,814✔
1216
    m_waiting_for_pong = false;
3,814✔
1217
    m_send_ping = false;
3,814✔
1218
    m_minimize_next_ping_delay = false;
3,814✔
1219
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,814✔
1220
    m_ping_sent = false;
3,814✔
1221
    m_heartbeat_timer.reset();
3,814✔
1222
    m_previous_ping_rtt = 0;
3,814✔
1223

1224
    m_websocket_sentinel->destroyed = true;
3,814✔
1225
    m_websocket_sentinel.reset();
3,814✔
1226
    m_websocket.reset();
3,814✔
1227
    m_input_body_buffer.reset();
3,814✔
1228
    m_sending_session = nullptr;
3,814✔
1229
    m_sessions_enlisted_to_send.clear();
3,814✔
1230
    m_sending = false;
3,814✔
1231

1232
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,814✔
1233
    initiate_reconnect_wait();                                           // Throws
3,814✔
1234
}
3,814✔
1235

1236
bool Connection::is_flx_sync_connection() const noexcept
1237
{
116,594✔
1238
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
116,594✔
1239
}
116,594✔
1240

1241
// Handles a PONG message carrying `timestamp`.
//
// The connection is closed with a protocol error if no PONG is expected at
// this time, or if the timestamp differs from that of the last PING sent.
// Otherwise the round trip time is recorded, the next ping delay is started,
// and the round trip time handler (if one is installed) is invoked.
void Connection::receive_pong(milliseconds_type timestamp)
{
    logger.debug("Received: PONG(timestamp=%1)", timestamp);

    // A PONG is only legal while waiting for one, after the PING went out.
    const bool pong_expected = (m_waiting_for_pong && !m_send_ping);
    if (REALM_UNLIKELY(!pong_expected)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
        return;
    }

    const bool timestamp_matches = (timestamp == m_last_ping_sent_at);
    if (REALM_UNLIKELY(!timestamp_matches)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
                          m_last_ping_sent_at, timestamp)}); // Throws
        return;
    }

    const milliseconds_type now = monotonic_clock_now();
    const milliseconds_type rtt = now - timestamp;
    logger.debug("Round trip time was %1 milliseconds", rtt);
    m_previous_ping_rtt = rtt;

    // If this PONG message is a response to a PING message that was sent after
    // the last invocation of cancel_reconnect_delay(), then the connection is
    // still good, and we do not have to skip the next reconnect delay.
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
        m_ping_after_scheduled_reset_of_reconnect_info = false;
        m_reconnect_info.scheduled_reset = false;
    }

    // The PONG arrived, so stop the PONG timeout and schedule the next PING.
    m_heartbeat_timer.reset();
    m_waiting_for_pong = false;

    initiate_ping_delay(now); // Throws

    if (m_client.m_roundtrip_time_handler) {
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
    }
}
180✔
1282

1283
// Looks up the session that a received message of kind `message` is addressed
// to.
//
// Returns the session when it exists. Returns nullptr when `session_ident` is
// zero, when the identifier belongs to a session recorded in the session
// history (logged only), or when the identifier was never known; in that last
// case the connection is also closed with a protocol error.
//
// NOTE(review): this function is declared noexcept, yet the calls below are
// annotated as throwing elsewhere in this file; an exception escaping from
// them would terminate the process. Confirm that this is intended.
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
{
    if (session_ident == 0) {
        return nullptr;
    }

    if (auto* sess = get_session(session_ident); REALM_LIKELY(sess)) {
        return sess;
    }

    // Check the history to see if the message received was for a previous session
    const bool is_past_session = (m_session_history.find(session_ident) != m_session_history.end());
    if (is_past_session) {
        logger.error("Received %1 message for closed session, session_ident = %2", message,
                     session_ident); // Throws
        return nullptr;
    }

    logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
    close_due_to_protocol_error(
        {ErrorCodes::SyncProtocolInvariantFailed,
         util::format("Received message %1 for session iden %2 when that session never existed", message,
                      session_ident)});
    return nullptr;
}
75,252✔
1307

1308
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1309
{
944✔
1310
    Session* sess = nullptr;
944✔
1311
    if (session_ident != 0) {
944✔
1312
        sess = find_and_validate_session(session_ident, "ERROR");
874✔
1313
        if (REALM_UNLIKELY(!sess)) {
874✔
1314
            return;
×
1315
        }
×
1316
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
874✔
1317
            close_due_to_protocol_error(std::move(status)); // Throws
×
1318
            return;
×
1319
        }
×
1320

1321
        if (sess->m_state == Session::Deactivated) {
874✔
1322
            finish_session_deactivation(sess);
12✔
1323
        }
12✔
1324
        return;
874✔
1325
    }
874✔
1326

1327
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
70✔
1328
                info.message, info.raw_error_code, info.is_fatal, session_ident,
70✔
1329
                info.server_requests_action); // Throws
70✔
1330

1331
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
70✔
1332
    if (REALM_LIKELY(known_error_code)) {
70✔
1333
        ProtocolError error_code = ProtocolError(info.raw_error_code);
66✔
1334
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
66✔
1335
            close_due_to_server_side_error(error_code, info); // Throws
66✔
1336
            return;
66✔
1337
        }
66✔
1338
        close_due_to_protocol_error(
×
1339
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1340
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1341
                          info.raw_error_code)});
×
1342
    }
×
1343
    else {
4✔
1344
        close_due_to_protocol_error(
4✔
1345
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1346
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1347
    }
4✔
1348
}
70✔
1349

1350

1351
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1352
                                             session_ident_type session_ident)
1353
{
20✔
1354
    if (session_ident == 0) {
20✔
1355
        return close_due_to_protocol_error(
×
1356
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1357
    }
×
1358

1359
    if (!is_flx_sync_connection()) {
20✔
1360
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1361
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1362
    }
×
1363

1364
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
20✔
1365
    if (REALM_UNLIKELY(!sess)) {
20✔
1366
        return;
×
1367
    }
×
1368

1369
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
20✔
1370
        close_due_to_protocol_error(std::move(status));
×
1371
    }
×
1372
}
20✔
1373

1374

1375
// Forwards an IDENT message to the session identified by `session_ident`.
//
// Returns without further action if the session lookup yields no session. A
// non-OK status from the session is handed to close_due_to_protocol_error().
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
{
    Session* const target = find_and_validate_session(session_ident, "IDENT");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto result = target->receive_ident_message(client_file_ident);
    if (!result.is_ok()) {
        close_due_to_protocol_error(std::move(result)); // Throws
    }
}
3,608✔
1385

1386
// Forwards a DOWNLOAD message to the session identified by `session_ident`.
//
// Returns without further action if the session lookup yields no session. A
// non-OK status from the session is handed to close_due_to_protocol_error().
void Connection::receive_download_message(session_ident_type session_ident, const DownloadMessage& message)
{
    Session* const target = find_and_validate_session(session_ident, "DOWNLOAD");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto result = target->receive_download_message(message);
    if (result.is_ok())
        return;

    close_due_to_protocol_error(std::move(result));
}
49,730✔
1397

1398
// Forwards a MARK message to the session identified by `session_ident`.
//
// Returns without further action if the session lookup yields no session. A
// non-OK status from the session is handed to close_due_to_protocol_error().
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    Session* const target = find_and_validate_session(session_ident, "MARK");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto result = target->receive_mark_message(request_ident);
    if (!result.is_ok()) {
        close_due_to_protocol_error(std::move(result)); // Throws
    }
}
17,002✔
1408

1409

1410
// Forwards an UNBOUND message to the session identified by `session_ident`.
//
// Returns without further action if the session lookup yields no session. A
// non-OK status from the session is handed to close_due_to_protocol_error().
// Otherwise, if the session has reached the Deactivated state, its
// deactivation is finished via finish_session_deactivation().
void Connection::receive_unbound_message(session_ident_type session_ident)
{
    Session* const target = find_and_validate_session(session_ident, "UNBOUND");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto result = target->receive_unbound_message();
    if (!result.is_ok()) {
        close_due_to_protocol_error(std::move(result)); // Throws
        return;
    }

    // Only a session that ended up Deactivated needs to be finished off here.
    if (target->m_state != Session::Deactivated)
        return;

    finish_session_deactivation(target);
}
3,960✔
1426

1427

1428
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1429
                                               std::string_view body)
1430
{
60✔
1431
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
60✔
1432
    if (REALM_UNLIKELY(!sess)) {
60✔
1433
        return;
×
1434
    }
×
1435

1436
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
60✔
1437
        close_due_to_protocol_error(std::move(status));
×
1438
    }
×
1439
}
60✔
1440

1441

1442
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1443
                                            std::string_view message)
1444
{
5,908✔
1445
    std::string prefix;
5,908✔
1446
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
5,908✔
1447
        prefix = util::format("Server[%1]", m_appservices_coid);
5,908✔
1448
    }
5,908✔
UNCOV
1449
    else {
×
UNCOV
1450
        prefix = "Server";
×
UNCOV
1451
    }
×
1452

1453
    if (session_ident != 0) {
5,908✔
1454
        if (auto sess = get_session(session_ident)) {
3,916✔
1455
            sess->logger.log(LogCategory::session, level, "%1 log: %2", prefix, message);
3,908✔
1456
            return;
3,908✔
1457
        }
3,908✔
1458

1459
        logger.log(util::LogCategory::session, level, "%1 log for unknown session %2: %3", prefix, session_ident,
8✔
1460
                   message);
8✔
1461
        return;
8✔
1462
    }
3,916✔
1463

1464
    logger.log(level, "%1 log: %2", prefix, message);
1,992✔
1465
}
1,992✔
1466

1467

1468
void Connection::receive_appservices_request_id(std::string_view coid)
1469
{
5,600✔
1470
    // Only set once per connection
1471
    if (!coid.empty() && m_appservices_coid.empty()) {
5,600✔
1472
        m_appservices_coid = coid;
2,552✔
1473
        logger.log(util::LogCategory::session, util::LogCategory::Level::info,
2,552✔
1474
                   "Connected to app services with request id: \"%1\"", m_appservices_coid);
2,552✔
1475
    }
2,552✔
1476
}
5,600✔
1477

1478

1479
// Hands the given status over to close_due_to_protocol_error().
void Connection::handle_protocol_error(Status status)
{
    close_due_to_protocol_error(std::move(status));
}
×
1483

1484

1485
// Sessions are guaranteed to be granted the opportunity to send a message in
1486
// the order that they enlist. Note that this is important to ensure
1487
// nonoverlapping communication with the server for consecutive sessions
1488
// associated with the same Realm file.
1489
//
1490
// CAUTION: The specified session may get destroyed before this function
1491
// returns, but only if its Session::send_message() puts it into the Deactivated
1492
// state.
1493
void Connection::enlist_to_send(Session* sess)
1494
{
179,182✔
1495
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
179,182✔
1496
    m_sessions_enlisted_to_send.push_back(sess); // Throws
179,182✔
1497
    if (!m_sending)
179,182✔
1498
        send_next_message(); // Throws
77,788✔
1499
}
179,182✔
1500

1501

1502
std::string Connection::get_active_appservices_connection_id()
1503
{
72✔
1504
    return m_appservices_coid;
72✔
1505
}
72✔
1506

1507
void Session::cancel_resumption_delay()
1508
{
4,192✔
1509
    REALM_ASSERT_EX(m_state == Active, m_state);
4,192✔
1510

1511
    if (!m_suspended)
4,192✔
1512
        return;
4,022✔
1513

1514
    m_suspended = false;
170✔
1515

1516
    logger.debug("Resumed"); // Throws
170✔
1517

1518
    if (unbind_process_complete())
170✔
1519
        initiate_rebind(); // Throws
132✔
1520

1521
    try {
170✔
1522
        process_pending_flx_bootstrap(); // throws
170✔
1523
    }
170✔
1524
    catch (const IntegrationException& error) {
170✔
1525
        on_integration_failure(error);
×
1526
    }
×
1527
    catch (...) {
170✔
1528
        on_integration_failure(IntegrationException(exception_to_status()));
×
1529
    }
×
1530

1531
    m_conn.one_more_active_unsuspended_session(); // Throws
170✔
1532
    if (m_try_again_activation_timer) {
170✔
1533
        m_try_again_activation_timer.reset();
8✔
1534
    }
8✔
1535

1536
    on_resumed(); // Throws
170✔
1537
}
170✔
1538

1539

1540
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1541
                                                 std::vector<ProtocolErrorInfo>* out)
1542
{
23,410✔
1543
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
23,410✔
1544
        return;
23,362✔
1545
    }
23,362✔
1546

1547
#ifdef REALM_DEBUG
48✔
1548
    REALM_ASSERT_DEBUG(
48✔
1549
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
48✔
1550
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
48✔
1551
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
48✔
1552
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
48✔
1553
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
48✔
1554
                       }));
48✔
1555
#endif
48✔
1556

1557
    while (!m_pending_compensating_write_errors.empty() &&
96✔
1558
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
96✔
1559
               changesets.back().version) {
48✔
1560
        auto& cur_error = m_pending_compensating_write_errors.front();
48✔
1561
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
48✔
1562
        out->push_back(std::move(cur_error));
48✔
1563
        m_pending_compensating_write_errors.pop_front();
48✔
1564
    }
48✔
1565
}
48✔
1566

1567

1568
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1569
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1570
                                   DownloadBatchState download_batch_state)
1571
{
45,060✔
1572
    auto& history = get_history();
45,060✔
1573
    if (received_changesets.empty()) {
45,060✔
1574
        if (download_batch_state == DownloadBatchState::MoreToCome) {
21,630✔
1575
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1576
                                       "received empty download message that was not the last in batch",
×
1577
                                       ProtocolError::bad_progress);
×
1578
        }
×
1579
        history.set_sync_progress(progress, downloadable_bytes, version_info); // Throws
21,630✔
1580
        return;
21,630✔
1581
    }
21,630✔
1582

1583
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
23,430✔
1584
    auto transact = get_db()->start_read();
23,430✔
1585
    history.integrate_server_changesets(
23,430✔
1586
        progress, downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
23,430✔
1587
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
23,430✔
1588
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
23,410✔
1589
        }); // Throws
23,410✔
1590
    if (received_changesets.size() == 1) {
23,430✔
1591
        logger.debug("1 remote changeset integrated, producing client version %1",
15,406✔
1592
                     version_info.sync_version.version); // Throws
15,406✔
1593
    }
15,406✔
1594
    else {
8,024✔
1595
        logger.debug("%2 remote changesets integrated, producing client version %1",
8,024✔
1596
                     version_info.sync_version.version, received_changesets.size()); // Throws
8,024✔
1597
    }
8,024✔
1598

1599
    for (const auto& pending_error : pending_compensating_write_errors) {
23,430✔
1600
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
48✔
1601
                    pending_error.compensating_write_rejected_client_version,
48✔
1602
                    *pending_error.compensating_write_server_version, pending_error.message);
48✔
1603
        try {
48✔
1604
            on_connection_state_changed(
48✔
1605
                m_conn.get_state(),
48✔
1606
                SessionErrorInfo{pending_error,
48✔
1607
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
48✔
1608
                                                          pending_error.message)});
48✔
1609
        }
48✔
1610
        catch (...) {
48✔
1611
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1612
        }
×
1613
    }
48✔
1614
}
23,430✔
1615

1616

1617
void Session::on_integration_failure(const IntegrationException& error)
1618
{
40✔
1619
    REALM_ASSERT_EX(m_state == Active, m_state);
40✔
1620
    REALM_ASSERT(!m_client_error && !m_error_to_send);
40✔
1621
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
40✔
1622

1623
    m_client_error = util::make_optional<IntegrationException>(error);
40✔
1624
    m_error_to_send = true;
40✔
1625
    SessionErrorInfo error_info{error.to_status(), IsFatal{false}};
40✔
1626
    error_info.server_requests_action = ProtocolErrorInfo::Action::Warning;
40✔
1627
    // Surface the error to the user otherwise is lost.
1628
    on_connection_state_changed(m_conn.get_state(), std::move(error_info));
40✔
1629

1630
    // Since the deactivation process has not been initiated, the UNBIND
1631
    // message cannot have been sent unless an ERROR message was received.
1632
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
40✔
1633
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
40✔
1634
        ensure_enlisted_to_send(); // Throws
36✔
1635
    }
36✔
1636
}
40✔
1637

1638
// Updates the session state after a batch of changesets has been integrated.
//
// Stores the new download progress and overall progress, advances the upload
// progress if the reported client version is newer, recognizes the new sync
// version, and checks for download completion. For FLX sessions with a
// migration store, subscriptions are (re)created from the migration store.
// Finally, if the IDENT message has been sent and the session is neither
// suspended nor in receipt of an ERROR message, the session is enlisted to
// send.
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
{
    REALM_ASSERT_EX(m_state == Active, m_state);
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);

    m_download_progress = progress.download;
    m_progress = progress;

    if (progress.upload.client_version > m_upload_progress.client_version)
        m_upload_progress = progress.upload;

    do_recognize_sync_version(client_version); // Allows upload process to resume

    check_for_download_completion(); // Throws

    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
        auto& flx_subscription_store = *get_flx_subscription_store();
        // Use the store that was just null-checked instead of fetching it again.
        migration_store->create_subscriptions(flx_subscription_store);
    }

    // Since the deactivation process has not been initiated, the UNBIND
    // message cannot have been sent unless an ERROR message was received.
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
        ensure_enlisted_to_send(); // Throws
    }
}
47,424✔
1666

1667

1668
// Destructor. Performs no explicit work; the life cycle state assertion below
// is currently disabled.
Session::~Session()
{
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
}
10,402✔
1672

1673

1674
std::string Session::make_logger_prefix(session_ident_type ident)
1675
{
10,402✔
1676
    std::ostringstream out;
10,402✔
1677
    out.imbue(std::locale::classic());
10,402✔
1678
    out << "Session[" << ident << "]: "; // Throws
10,402✔
1679
    return out.str();                    // Throws
10,402✔
1680
}
10,402✔
1681

1682

1683
void Session::activate()
1684
{
10,402✔
1685
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
10,402✔
1686

1687
    logger.debug("Activating"); // Throws
10,402✔
1688

1689
    if (REALM_LIKELY(!get_client().is_dry_run())) {
10,402✔
1690
        bool file_exists = util::File::exists(get_realm_path());
10,400✔
1691

1692
        logger.info("client_reset_config = %1, Realm exists = %2, upload messages allowed = %3",
10,400✔
1693
                    get_client_reset_config().has_value(), file_exists, upload_messages_allowed() ? "yes" : "no");
10,400✔
1694
        get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
10,400✔
1695
    }
10,400✔
1696
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
10,402✔
1697
                 m_client_file_ident.salt); // Throws
10,402✔
1698
    m_upload_progress = m_progress.upload;
10,402✔
1699
    m_download_progress = m_progress.download;
10,402✔
1700
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
10,402✔
1701
    init_progress_handler();
10,402✔
1702

1703
    logger.debug("last_version_available = %1", m_last_version_available);                     // Throws
10,402✔
1704
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
10,402✔
1705
    logger.debug("progress_download_client_version = %1",
10,402✔
1706
                 m_progress.download.last_integrated_client_version);                                      // Throws
10,402✔
1707
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
10,402✔
1708
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
10,402✔
1709

1710
    reset_protocol_state();
10,402✔
1711
    m_state = Active;
10,402✔
1712

1713
    call_debug_hook(SyncClientHookEvent::SessionActivating);
10,402✔
1714

1715
    REALM_ASSERT(!m_suspended);
10,402✔
1716
    m_conn.one_more_active_unsuspended_session(); // Throws
10,402✔
1717

1718
    try {
10,402✔
1719
        process_pending_flx_bootstrap(); // throws
10,402✔
1720
    }
10,402✔
1721
    catch (const IntegrationException& error) {
10,402✔
1722
        on_integration_failure(error);
×
1723
    }
×
1724
    catch (...) {
10,402✔
1725
        on_integration_failure(IntegrationException(exception_to_status()));
4✔
1726
    }
4✔
1727

1728
    // Checks if there is a pending client reset
1729
    handle_pending_client_reset_acknowledgement();
10,402✔
1730
}
10,400✔
1731

1732

1733
// The caller (Connection) must discard the session if the session has become
1734
// deactivated upon return.
1735
void Session::initiate_deactivation()
1736
{
10,402✔
1737
    REALM_ASSERT_EX(m_state == Active, m_state);
10,402✔
1738

1739
    logger.debug("Initiating deactivation"); // Throws
10,402✔
1740

1741
    m_state = Deactivating;
10,402✔
1742

1743
    if (!m_suspended)
10,402✔
1744
        m_conn.one_less_active_unsuspended_session(); // Throws
9,742✔
1745

1746
    if (m_enlisted_to_send) {
10,402✔
1747
        REALM_ASSERT(!unbind_process_complete());
5,480✔
1748
        return;
5,480✔
1749
    }
5,480✔
1750

1751
    // Deactivate immediately if the BIND message has not yet been sent and the
1752
    // session is not enlisted to send, or if the unbinding process has already
1753
    // completed.
1754
    if (!m_bind_message_sent || unbind_process_complete()) {
4,922✔
1755
        complete_deactivation(); // Throws
952✔
1756
        // Life cycle state is now Deactivated
1757
        return;
952✔
1758
    }
952✔
1759

1760
    // Ready to send the UNBIND message, if it has not already been sent
1761
    if (!m_unbind_message_sent) {
3,970✔
1762
        enlist_to_send(); // Throws
3,766✔
1763
        return;
3,766✔
1764
    }
3,766✔
1765
}
3,970✔
1766

1767

1768
void Session::complete_deactivation()
1769
{
10,400✔
1770
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
10,400✔
1771
    m_state = Deactivated;
10,400✔
1772

1773
    logger.debug("Deactivation completed"); // Throws
10,400✔
1774
}
10,400✔
1775

1776

1777
// Called by the associated Connection object when this session is granted an
1778
// opportunity to send a message.
1779
//
1780
// The caller (Connection) must discard the session if the session has become
1781
// deactivated upon return.
1782
void Session::send_message()
1783
{
177,616✔
1784
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
177,616✔
1785
    REALM_ASSERT(m_enlisted_to_send);
177,616✔
1786
    m_enlisted_to_send = false;
177,616✔
1787
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
177,616✔
1788
        // Deactivation has been initiated. If the UNBIND message has not been
1789
        // sent yet, there is no point in sending it. Instead, we can let the
1790
        // deactivation process complete.
1791
        if (!m_bind_message_sent) {
9,674✔
1792
            return complete_deactivation(); // Throws
2,906✔
1793
            // Life cycle state is now Deactivated
1794
        }
2,906✔
1795

1796
        // Session life cycle state is Deactivating or the unbinding process has
1797
        // been initiated by a session specific ERROR message
1798
        if (!m_unbind_message_sent)
6,768✔
1799
            send_unbind_message(); // Throws
6,768✔
1800
        return;
6,768✔
1801
    }
9,674✔
1802

1803
    // Session life cycle state is Active and the unbinding process has
1804
    // not been initiated
1805
    REALM_ASSERT(!m_unbind_message_sent);
167,942✔
1806

1807
    if (!m_bind_message_sent)
167,942✔
1808
        return send_bind_message(); // Throws
9,210✔
1809

1810
    // Pending test commands can be sent any time after the BIND message is sent
1811
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
158,732✔
1812
                                                      [](const PendingTestCommand& command) {
158,732✔
1813
                                                          return command.pending;
148✔
1814
                                                      });
148✔
1815
    if (has_pending_test_command) {
158,732✔
1816
        return send_test_command_message();
60✔
1817
    }
60✔
1818

1819
    if (!m_ident_message_sent) {
158,672✔
1820
        if (have_client_file_ident())
7,752✔
1821
            send_ident_message(); // Throws
7,752✔
1822
        return;
7,752✔
1823
    }
7,752✔
1824

1825
    if (m_error_to_send)
150,920✔
1826
        return send_json_error_message(); // Throws
32✔
1827

1828
    // Stop sending upload, mark and query messages when the client detects an error.
1829
    if (m_client_error) {
150,888✔
1830
        return;
12✔
1831
    }
12✔
1832

1833
    if (m_target_download_mark > m_last_download_mark_sent)
150,876✔
1834
        return send_mark_message(); // Throws
17,856✔
1835

1836
    auto is_upload_allowed = [&]() -> bool {
133,024✔
1837
        if (!m_is_flx_sync_session) {
133,024✔
1838
            return true;
109,892✔
1839
        }
109,892✔
1840

1841
        auto migration_store = get_migration_store();
23,132✔
1842
        if (!migration_store) {
23,132✔
1843
            return true;
×
1844
        }
×
1845

1846
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
23,132✔
1847
        if (!sentinel_query_version) {
23,132✔
1848
            return true;
23,104✔
1849
        }
23,104✔
1850

1851
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
1852
        return m_last_sent_flx_query_version != *sentinel_query_version;
28✔
1853
    };
23,132✔
1854

1855
    if (!is_upload_allowed()) {
133,020✔
1856
        return;
16✔
1857
    }
16✔
1858

1859
    auto check_pending_flx_version = [&]() -> bool {
133,008✔
1860
        if (!m_is_flx_sync_session) {
133,008✔
1861
            return false;
109,892✔
1862
        }
109,892✔
1863

1864
        if (m_delay_uploads) {
23,116✔
1865
            return false;
3,760✔
1866
        }
3,760✔
1867

1868
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
19,356✔
1869

1870
        if (!m_pending_flx_sub_set) {
19,356✔
1871
            return false;
16,878✔
1872
        }
16,878✔
1873

1874
        // Send QUERY messages when the upload progress client version reaches the snapshot version
1875
        // of a pending subscription
1876
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
2,478✔
1877
    };
19,356✔
1878

1879
    if (check_pending_flx_version()) {
133,004✔
1880
        return send_query_change_message(); // throws
1,372✔
1881
    }
1,372✔
1882

1883
    if (!m_delay_uploads && (m_last_version_available > m_upload_progress.client_version)) {
131,632✔
1884
        return send_upload_message(); // Throws
62,320✔
1885
    }
62,320✔
1886
}
131,632✔
1887

1888

1889
void Session::send_bind_message()
1890
{
9,210✔
1891
    REALM_ASSERT_EX(m_state == Active, m_state);
9,210✔
1892

1893
    session_ident_type session_ident = m_ident;
9,210✔
1894
    // Request an ident if we don't already have one and there isn't a pending client reset diff
1895
    // The file ident can be 0 when a client reset is being performed if a brand new local realm
1896
    // has been opened (or using Async open) and a FLX/PBS migration occurs when first connecting
1897
    // to the server.
1898
    bool need_client_file_ident = !have_client_file_ident() && !get_client_reset_config();
9,210✔
1899
    const bool is_subserver = false;
9,210✔
1900

1901
    ClientProtocol& protocol = m_conn.get_client_protocol();
9,210✔
1902
    int protocol_version = m_conn.get_negotiated_protocol_version();
9,210✔
1903
    OutputBuffer& out = m_conn.get_output_buffer();
9,210✔
1904
    // Discard the token since it's ignored by the server.
1905
    std::string empty_access_token;
9,210✔
1906
    if (m_is_flx_sync_session) {
9,210✔
1907
        nlohmann::json bind_json_data;
1,884✔
1908
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,884✔
1909
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1910
        }
60✔
1911
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,884✔
1912
        auto schema_version = get_schema_version();
1,884✔
1913
        // Send 0 if schema is not versioned.
1914
        bind_json_data["schemaVersion"] = schema_version != uint64_t(-1) ? schema_version : 0;
1,884✔
1915
        if (logger.would_log(util::Logger::Level::debug)) {
1,884✔
1916
            std::string json_data_dump;
1,884✔
1917
            if (!bind_json_data.empty()) {
1,884✔
1918
                json_data_dump = bind_json_data.dump();
1,884✔
1919
            }
1,884✔
1920
            logger.debug(
1,884✔
1921
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,884✔
1922
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,884✔
1923
        }
1,884✔
1924
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,884✔
1925
                                       need_client_file_ident, is_subserver); // Throws
1,884✔
1926
    }
1,884✔
1927
    else {
7,326✔
1928
        std::string server_path = get_virt_path();
7,326✔
1929
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
7,326✔
1930
                     session_ident, need_client_file_ident, is_subserver, server_path);
7,326✔
1931
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
7,326✔
1932
                                       need_client_file_ident, is_subserver); // Throws
7,326✔
1933
    }
7,326✔
1934
    m_conn.initiate_write_message(out, this); // Throws
9,210✔
1935

1936
    m_bind_message_sent = true;
9,210✔
1937
    call_debug_hook(SyncClientHookEvent::BindMessageSent);
9,210✔
1938

1939
    // If there is a pending client reset diff, process that when the BIND message has
1940
    // been sent successfully and wait before sending the IDENT message. Otherwise,
1941
    // ready to send the IDENT message if the file identifier pair is already available.
1942
    if (!need_client_file_ident)
9,210✔
1943
        enlist_to_send(); // Throws
5,406✔
1944
}
9,210✔
1945

1946

1947
void Session::send_ident_message()
1948
{
7,752✔
1949
    REALM_ASSERT_EX(m_state == Active, m_state);
7,752✔
1950
    REALM_ASSERT(m_bind_message_sent);
7,752✔
1951
    REALM_ASSERT(!m_unbind_message_sent);
7,752✔
1952
    REALM_ASSERT(have_client_file_ident());
7,752✔
1953

1954
    ClientProtocol& protocol = m_conn.get_client_protocol();
7,752✔
1955
    OutputBuffer& out = m_conn.get_output_buffer();
7,752✔
1956
    session_ident_type session_ident = m_ident;
7,752✔
1957

1958
    if (m_is_flx_sync_session) {
7,752✔
1959
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,790✔
1960
        const auto active_query_body = active_query_set.to_ext_json();
1,790✔
1961
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,790✔
1962
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,790✔
1963
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,790✔
1964
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,790✔
1965
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,790✔
1966
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,790✔
1967
                     active_query_body); // Throws
1,790✔
1968
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,790✔
1969
                                        active_query_set.version(), active_query_body); // Throws
1,790✔
1970
        m_last_sent_flx_query_version = active_query_set.version();
1,790✔
1971
    }
1,790✔
1972
    else {
5,962✔
1973
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
5,962✔
1974
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
5,962✔
1975
                     "latest_server_version_salt=%6)",
5,962✔
1976
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
5,962✔
1977
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
5,962✔
1978
                     m_progress.latest_server_version.salt);                                  // Throws
5,962✔
1979
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
5,962✔
1980
    }
5,962✔
1981
    m_conn.initiate_write_message(out, this); // Throws
7,752✔
1982

1983
    m_ident_message_sent = true;
7,752✔
1984
    call_debug_hook(SyncClientHookEvent::IdentMessageSent);
7,752✔
1985

1986
    // Other messages may be waiting to be sent
1987
    enlist_to_send(); // Throws
7,752✔
1988
}
7,752✔
1989

1990
void Session::send_query_change_message()
1991
{
1,372✔
1992
    REALM_ASSERT_EX(m_state == Active, m_state);
1,372✔
1993
    REALM_ASSERT(m_ident_message_sent);
1,372✔
1994
    REALM_ASSERT(!m_unbind_message_sent);
1,372✔
1995
    REALM_ASSERT(m_pending_flx_sub_set);
1,372✔
1996
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
1,372✔
1997

1998
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
1,372✔
1999
        return;
×
2000
    }
×
2001

2002
    auto sub_store = get_flx_subscription_store();
1,372✔
2003
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
1,372✔
2004
    auto latest_queries = latest_sub_set.to_ext_json();
1,372✔
2005
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
1,372✔
2006
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
1,372✔
2007

2008
    OutputBuffer& out = m_conn.get_output_buffer();
1,372✔
2009
    session_ident_type session_ident = get_ident();
1,372✔
2010
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,372✔
2011
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
1,372✔
2012
    m_conn.initiate_write_message(out, this);
1,372✔
2013

2014
    m_last_sent_flx_query_version = latest_sub_set.version();
1,372✔
2015

2016
    request_download_completion_notification();
1,372✔
2017
}
1,372✔
2018

2019
void Session::send_upload_message()
2020
{
62,316✔
2021
    REALM_ASSERT_EX(m_state == Active, m_state);
62,316✔
2022
    REALM_ASSERT(m_ident_message_sent);
62,316✔
2023
    REALM_ASSERT(!m_unbind_message_sent);
62,316✔
2024

2025
    if (REALM_UNLIKELY(get_client().is_dry_run()))
62,316✔
2026
        return;
×
2027

2028
    version_type target_upload_version = m_last_version_available;
62,316✔
2029
    if (m_pending_flx_sub_set) {
62,316✔
2030
        REALM_ASSERT(m_is_flx_sync_session);
1,106✔
2031
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
1,106✔
2032
    }
1,106✔
2033

2034
    bool server_version_to_ack =
62,316✔
2035
        m_upload_progress.last_integrated_server_version < m_download_progress.server_version;
62,316✔
2036

2037
    std::vector<UploadChangeset> uploadable_changesets;
62,316✔
2038
    version_type locked_server_version = 0;
62,316✔
2039
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
62,316✔
2040
                                             locked_server_version); // Throws
62,316✔
2041

2042
    if (uploadable_changesets.empty()) {
62,316✔
2043
        // Nothing more to upload right now if:
2044
        //  1. We need to limit upload up to some version other than the last client version
2045
        //     available and there are no changes to upload
2046
        //  2. There are no changes to upload and no server version(s) to acknowledge
2047
        if (m_pending_flx_sub_set || !server_version_to_ack) {
32,660✔
2048
            logger.trace("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
6,308✔
2049
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
6,308✔
2050
            // Other messages may be waiting to be sent
2051
            return enlist_to_send(); // Throws
6,308✔
2052
        }
6,308✔
2053
    }
32,660✔
2054

2055
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
56,008✔
2056
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
744✔
2057
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
744✔
2058
    }
744✔
2059

2060
    version_type progress_client_version = m_upload_progress.client_version;
56,008✔
2061
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
56,008✔
2062

2063
    if (!upload_messages_allowed()) {
56,008✔
2064
        logger.trace("UPLOAD not allowed (progress_client_version=%1, progress_server_version=%2, "
572✔
2065
                     "locked_server_version=%3, num_changesets=%4)",
572✔
2066
                     progress_client_version, progress_server_version, locked_server_version,
572✔
2067
                     uploadable_changesets.size()); // Throws
572✔
2068
        // Other messages may be waiting to be sent
2069
        return enlist_to_send(); // Throws
572✔
2070
    }
572✔
2071

2072
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
55,436✔
2073
                 "locked_server_version=%3, num_changesets=%4)",
55,436✔
2074
                 progress_client_version, progress_server_version, locked_server_version,
55,436✔
2075
                 uploadable_changesets.size()); // Throws
55,436✔
2076

2077
    ClientProtocol& protocol = m_conn.get_client_protocol();
55,436✔
2078
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
55,436✔
2079

2080
    for (const UploadChangeset& uc : uploadable_changesets) {
55,436✔
2081
        logger.debug(util::LogCategory::changeset,
43,150✔
2082
                     "Fetching changeset for upload (client_version=%1, server_version=%2, "
43,150✔
2083
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
43,150✔
2084
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
43,150✔
2085
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
43,150✔
2086
        if (logger.would_log(util::Logger::Level::trace)) {
43,150✔
2087
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2088
            if (changeset_data.size() < 1024) {
×
2089
                logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
2090
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2091
            }
×
2092
            else {
×
2093
                logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
2094
                             protocol.compressed_hex_dump(changeset_data));
×
2095
            }
×
2096

2097
#if REALM_DEBUG
×
2098
            ChunkedBinaryInputStream in{changeset_data};
×
2099
            Changeset log;
×
2100
            try {
×
2101
                parse_changeset(in, log);
×
2102
                std::stringstream ss;
×
2103
                log.print(ss);
×
2104
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
2105
            }
×
2106
            catch (const BadChangesetError& err) {
×
2107
                logger.error(util::LogCategory::changeset, "Unable to parse changeset: %1", err.what());
×
2108
            }
×
2109
#endif
×
2110
        }
×
2111

2112
#if 0 // Upload log compaction is currently not implemented
2113
        if (!get_client().m_disable_upload_compaction) {
2114
            ChangesetEncoder::Buffer encode_buffer;
2115

2116
            {
2117
                // Upload compaction only takes place within single changesets to
2118
                // avoid another client seeing inconsistent snapshots.
2119
                ChunkedBinaryInputStream stream{uc.changeset};
2120
                Changeset changeset;
2121
                parse_changeset(stream, changeset); // Throws
2122
                // FIXME: What is the point of setting these? How can compaction care about them?
2123
                changeset.version = uc.progress.client_version;
2124
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2125
                changeset.origin_timestamp = uc.origin_timestamp;
2126
                changeset.origin_file_ident = uc.origin_file_ident;
2127

2128
                compact_changesets(&changeset, 1);
2129
                encode_changeset(changeset, encode_buffer);
2130

2131
                logger.debug(util::LogCategory::changeset, "Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2132
                             encode_buffer.size()); // Throws
2133
            }
2134

2135
            upload_message_builder.add_changeset(
2136
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
2137
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
2138
        }
2139
        else
2140
#endif
2141
        {
43,150✔
2142
            upload_message_builder.add_changeset(uc.progress.client_version,
43,150✔
2143
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
43,150✔
2144
                                                 uc.origin_file_ident,
43,150✔
2145
                                                 uc.changeset); // Throws
43,150✔
2146
        }
43,150✔
2147
    }
43,150✔
2148

2149
    int protocol_version = m_conn.get_negotiated_protocol_version();
55,436✔
2150
    OutputBuffer& out = m_conn.get_output_buffer();
55,436✔
2151
    session_ident_type session_ident = get_ident();
55,436✔
2152
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
55,436✔
2153
                                               progress_server_version,
55,436✔
2154
                                               locked_server_version); // Throws
55,436✔
2155
    m_conn.initiate_write_message(out, this);                          // Throws
55,436✔
2156

2157
    call_debug_hook(SyncClientHookEvent::UploadMessageSent);
55,436✔
2158

2159
    // Other messages may be waiting to be sent
2160
    enlist_to_send(); // Throws
55,436✔
2161
}
55,436✔
2162

2163

2164
void Session::send_mark_message()
2165
{
17,856✔
2166
    REALM_ASSERT_EX(m_state == Active, m_state);
17,856✔
2167
    REALM_ASSERT(m_ident_message_sent);
17,856✔
2168
    REALM_ASSERT(!m_unbind_message_sent);
17,856✔
2169
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
17,856✔
2170

2171
    request_ident_type request_ident = m_target_download_mark;
17,856✔
2172
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
17,856✔
2173

2174
    ClientProtocol& protocol = m_conn.get_client_protocol();
17,856✔
2175
    OutputBuffer& out = m_conn.get_output_buffer();
17,856✔
2176
    session_ident_type session_ident = get_ident();
17,856✔
2177
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
17,856✔
2178
    m_conn.initiate_write_message(out, this);                      // Throws
17,856✔
2179

2180
    m_last_download_mark_sent = request_ident;
17,856✔
2181

2182
    // Other messages may be waiting to be sent
2183
    enlist_to_send(); // Throws
17,856✔
2184
}
17,856✔
2185

2186

2187
void Session::send_unbind_message()
2188
{
6,768✔
2189
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,768✔
2190
    REALM_ASSERT(m_bind_message_sent);
6,768✔
2191
    REALM_ASSERT(!m_unbind_message_sent);
6,768✔
2192

2193
    logger.debug("Sending: UNBIND"); // Throws
6,768✔
2194

2195
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,768✔
2196
    OutputBuffer& out = m_conn.get_output_buffer();
6,768✔
2197
    session_ident_type session_ident = get_ident();
6,768✔
2198
    protocol.make_unbind_message(out, session_ident); // Throws
6,768✔
2199
    m_conn.initiate_write_message(out, this);         // Throws
6,768✔
2200

2201
    m_unbind_message_sent = true;
6,768✔
2202
}
6,768✔
2203

2204

2205
void Session::send_json_error_message()
2206
{
32✔
2207
    REALM_ASSERT_EX(m_state == Active, m_state);
32✔
2208
    REALM_ASSERT(m_ident_message_sent);
32✔
2209
    REALM_ASSERT(!m_unbind_message_sent);
32✔
2210
    REALM_ASSERT(m_error_to_send);
32✔
2211
    REALM_ASSERT(m_client_error);
32✔
2212

2213
    ClientProtocol& protocol = m_conn.get_client_protocol();
32✔
2214
    OutputBuffer& out = m_conn.get_output_buffer();
32✔
2215
    session_ident_type session_ident = get_ident();
32✔
2216
    auto protocol_error = m_client_error->error_for_server;
32✔
2217

2218
    auto message = util::format("%1", m_client_error->to_status());
32✔
2219
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
32✔
2220
                session_ident); // Throws
32✔
2221

2222
    nlohmann::json error_body_json;
32✔
2223
    error_body_json["message"] = std::move(message);
32✔
2224
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
32✔
2225
                                     error_body_json.dump()); // Throws
32✔
2226
    m_conn.initiate_write_message(out, this);                 // Throws
32✔
2227

2228
    m_error_to_send = false;
32✔
2229
    enlist_to_send(); // Throws
32✔
2230
}
32✔
2231

2232

2233
void Session::send_test_command_message()
2234
{
60✔
2235
    REALM_ASSERT_EX(m_state == Active, m_state);
60✔
2236

2237
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
60✔
2238
                           [](const PendingTestCommand& command) {
64✔
2239
                               return command.pending;
64✔
2240
                           });
64✔
2241
    REALM_ASSERT(it != m_pending_test_commands.end());
60✔
2242

2243
    ClientProtocol& protocol = m_conn.get_client_protocol();
60✔
2244
    OutputBuffer& out = m_conn.get_output_buffer();
60✔
2245
    auto session_ident = get_ident();
60✔
2246

2247
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
60✔
2248
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
60✔
2249

2250
    m_conn.initiate_write_message(out, this); // Throws;
60✔
2251
    it->pending = false;
60✔
2252

2253
    enlist_to_send();
60✔
2254
}
60✔
2255

2256
bool Session::client_reset_if_needed()
2257
{
424✔
2258
    // Even if we end up not actually performing a client reset, consume the
2259
    // config to ensure that the resources it holds are released
2260
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
424✔
2261
    if (!client_reset_config) {
424✔
2262
        return false;
×
2263
    }
×
2264

2265
    // Save a copy of the status and action in case an error/exception occurs
2266
    Status cr_status = client_reset_config->error;
424✔
2267
    ProtocolErrorInfo::Action cr_action = client_reset_config->action;
424✔
2268

2269
    auto on_flx_version_complete = [this](int64_t version) {
424✔
2270
        this->on_flx_sync_version_complete(version);
348✔
2271
    };
348✔
2272
    try {
424✔
2273
        // The file ident from the fresh realm will be copied over to the local realm
2274
        bool did_reset = client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config),
424✔
2275
                                                            get_flx_subscription_store(), on_flx_version_complete);
424✔
2276

2277
        call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
424✔
2278
        if (!did_reset) {
424✔
2279
            return false;
×
2280
        }
×
2281
    }
424✔
2282
    catch (const std::exception& e) {
424✔
2283
        auto err_msg = util::format("A fatal error occurred during '%1' client reset diff for %2: '%3'", cr_action,
80✔
2284
                                    cr_status, e.what());
80✔
2285
        logger.error(err_msg.c_str());
80✔
2286
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
80✔
2287
        suspend(err_info);
80✔
2288
        return false;
80✔
2289
    }
80✔
2290

2291
    // The fresh Realm has been used to reset the state
2292
    logger.debug("Client reset is completed, path = %1", get_realm_path()); // Throws
344✔
2293

2294
    // Update the version, file ident and progress info after the client reset diff is done
2295
    get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
344✔
2296
    // Print the version/progress information before performing the asserts
2297
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
344✔
2298
                 m_client_file_ident.salt);                                // Throws
344✔
2299
    logger.debug("last_version_available = %1", m_last_version_available); // Throws
344✔
2300
    logger.debug("upload_progress_client_version = %1, upload_progress_server_version = %2",
344✔
2301
                 m_progress.upload.client_version,
344✔
2302
                 m_progress.upload.last_integrated_server_version); // Throws
344✔
2303
    logger.debug("download_progress_client_version = %1, download_progress_server_version = %2",
344✔
2304
                 m_progress.download.last_integrated_client_version,
344✔
2305
                 m_progress.download.server_version); // Throws
344✔
2306

2307
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
344✔
2308
                    m_progress.download.last_integrated_client_version);
344✔
2309
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
344✔
2310

2311
    m_upload_progress = m_progress.upload;
344✔
2312
    m_download_progress = m_progress.download;
344✔
2313
    init_progress_handler();
344✔
2314
    // In recovery mode, there may be new changesets to upload and nothing left to download.
2315
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
2316
    // For both, we want to allow uploads again without needing external changes to download first.
2317
    m_delay_uploads = false;
344✔
2318

2319
    // Checks if there is a pending client reset
2320
    handle_pending_client_reset_acknowledgement();
344✔
2321

2322
    update_subscription_version_info();
344✔
2323

2324
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
2325
    if (auto migration_store = get_migration_store()) {
344✔
2326
        migration_store->complete_migration_or_rollback();
316✔
2327
    }
316✔
2328

2329
    return true;
344✔
2330
}
424✔
2331

2332
// Handles an IDENT message from the server, which assigns the client file
// identifier for this session.
//
// Returns Status::OK() when the message was accepted or ignored (session not
// Active), and a SyncProtocolInvariantFailed status when the message arrives
// at an illegal time or carries an invalid identifier or salt. On success the
// session is enlisted to send its own IDENT message.
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
{
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
                 client_file_ident.salt); // Throws

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active) {
        return Status::OK(); // Success
    }

    // IDENT is only legal after BIND was sent, while no file ident is known
    // yet, and before any ERROR or UNBOUND message was received.
    const bool illegal_now = !m_bind_message_sent || have_client_file_ident() || m_error_message_received ||
                             m_unbound_message_received;
    if (REALM_UNLIKELY(illegal_now)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
    }
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
    }
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
    }

    m_client_file_ident = client_file_ident;

    // In dry-run mode the identifier is not written to the history and the
    // progress cursors are left untouched.
    const bool dry_run = get_client().is_dry_run();
    if (!REALM_UNLIKELY(dry_run)) {
        get_history().set_client_file_ident(client_file_ident,
                                            m_fix_up_object_ids); // Throws
        m_progress.download.last_integrated_client_version = 0;
        m_progress.upload.client_version = 0;
    }

    // Ready to send the IDENT message
    ensure_enlisted_to_send(); // Throws
    return Status::OK();       // Success
}
3,536✔
2372

2373
Status Session::receive_download_message(const DownloadMessage& message)
2374
{
49,726✔
2375
    // Ignore the message if the deactivation process has been initiated,
2376
    // because in that case, the associated Realm and SessionWrapper must
2377
    // not be accessed any longer.
2378
    if (m_state != Active)
49,726✔
2379
        return Status::OK();
704✔
2380

2381
    bool is_flx = m_conn.is_flx_sync_connection();
49,022✔
2382
    int64_t query_version = is_flx ? *message.query_version : 0;
49,022✔
2383

2384
    if (!is_flx || query_version > 0)
49,022✔
2385
        enable_progress_notifications();
47,074✔
2386

2387
    auto&& progress = message.progress;
49,022✔
2388
    if (is_flx) {
49,022✔
2389
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
5,482✔
2390
                     "latest_server_version=%3, latest_server_version_salt=%4, "
5,482✔
2391
                     "upload_client_version=%5, upload_server_version=%6, progress_estimate=%7, "
5,482✔
2392
                     "batch_state=%8, query_version=%9, num_changesets=%10, ...)",
5,482✔
2393
                     progress.download.server_version, progress.download.last_integrated_client_version,
5,482✔
2394
                     progress.latest_server_version.version, progress.latest_server_version.salt,
5,482✔
2395
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
5,482✔
2396
                     message.downloadable.as_estimate(), message.batch_state, query_version,
5,482✔
2397
                     message.changesets.size()); // Throws
5,482✔
2398
    }
5,482✔
2399
    else {
43,540✔
2400
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
43,540✔
2401
                     "latest_server_version=%3, latest_server_version_salt=%4, "
43,540✔
2402
                     "upload_client_version=%5, upload_server_version=%6, "
43,540✔
2403
                     "downloadable_bytes=%7, num_changesets=%8, ...)",
43,540✔
2404
                     progress.download.server_version, progress.download.last_integrated_client_version,
43,540✔
2405
                     progress.latest_server_version.version, progress.latest_server_version.salt,
43,540✔
2406
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
43,540✔
2407
                     message.downloadable.as_bytes(), message.changesets.size()); // Throws
43,540✔
2408
    }
43,540✔
2409

2410
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
2411
    // changeset over and over again.
2412
    if (m_client_error) {
49,022✔
2413
        logger.debug("Ignoring download message because the client detected an integration error");
×
2414
        return Status::OK();
×
2415
    }
×
2416

2417
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
49,026✔
2418
    if (REALM_UNLIKELY(!legal_at_this_time)) {
49,022✔
2419
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
4✔
2420
    }
4✔
2421
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
49,018✔
2422
        logger.error("Bad sync progress received (%1)", status);
×
2423
        return status;
×
2424
    }
×
2425

2426
    version_type server_version = m_progress.download.server_version;
49,018✔
2427
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
49,018✔
2428
    for (const RemoteChangeset& changeset : message.changesets) {
50,944✔
2429
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
2430
        // version must be increasing, but can stay the same during bootstraps.
2431
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
48,032✔
2432
                                                         : (changeset.remote_version > server_version);
48,032✔
2433
        // Each server version cannot be greater than the one in the header of the download message.
2434
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
48,032✔
2435
        if (!good_server_version) {
48,032✔
2436
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2437
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2438
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2439
        }
×
2440
        server_version = changeset.remote_version;
48,032✔
2441

2442
        // Check that per-changeset last integrated client version is "weakly"
2443
        // increasing.
2444
        bool good_client_version =
48,032✔
2445
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
48,032✔
2446
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
48,032✔
2447
        if (!good_client_version) {
48,032✔
2448
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2449
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2450
                                 "(%1, %2, %3)",
×
2451
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2452
                                 progress.download.last_integrated_client_version)};
×
2453
        }
×
2454
        last_integrated_client_version = changeset.last_integrated_local_version;
48,032✔
2455
        // Server shouldn't send our own changes, and zero is not a valid client
2456
        // file identifier.
2457
        bool good_file_ident =
48,032✔
2458
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
48,032✔
2459
        if (!good_file_ident) {
48,032✔
2460
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2461
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2462
                                 changeset.origin_file_ident)};
×
2463
        }
×
2464
    }
48,032✔
2465

2466
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
49,018✔
2467
                                       message.batch_state, message.changesets.size());
49,018✔
2468
    if (hook_action == SyncClientHookAction::EarlyReturn) {
49,018✔
2469
        return Status::OK();
16✔
2470
    }
16✔
2471
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
49,002✔
2472

2473
    if (process_flx_bootstrap_message(message)) {
49,002✔
2474
        clear_resumption_delay_state();
3,942✔
2475
        return Status::OK();
3,942✔
2476
    }
3,942✔
2477

2478
    initiate_integrate_changesets(message.downloadable.as_bytes(), message.batch_state, progress,
45,060✔
2479
                                  message.changesets); // Throws
45,060✔
2480

2481
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
45,060✔
2482
                                  message.batch_state, message.changesets.size());
45,060✔
2483
    if (hook_action == SyncClientHookAction::EarlyReturn) {
45,060✔
2484
        return Status::OK();
×
2485
    }
×
2486
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
45,060✔
2487

2488
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
2489
    // after a retryable session error.
2490
    clear_resumption_delay_state();
45,060✔
2491
    return Status::OK();
45,060✔
2492
}
45,060✔
2493

2494
// Handles a MARK message from the server.
//
// A valid request identifier is one that is no greater than the last mark sent
// and greater than the last mark received. On success the current download
// server version and the identifier are recorded, and download completion is
// re-checked. Returns SyncProtocolInvariantFailed when the message is illegal
// at this time or the identifier is out of range; returns Status::OK() when
// accepted or ignored because the session is not Active.
Status Session::receive_mark_message(request_ident_type request_ident)
{
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active) {
        return Status::OK(); // Success
    }

    const bool illegal_now = !m_ident_message_sent || m_error_message_received || m_unbound_message_received;
    if (REALM_UNLIKELY(illegal_now)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
    }

    const bool bad_request_ident =
        request_ident > m_last_download_mark_sent || request_ident <= m_last_download_mark_received;
    if (REALM_UNLIKELY(bad_request_ident)) {
        return {
            ErrorCodes::SyncProtocolInvariantFailed,
            util::format(
                "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
                m_last_download_mark_sent, m_last_download_mark_received)};
    }

    m_server_version_at_last_download_mark = m_progress.download.server_version;
    m_last_download_mark_received = request_ident;
    check_for_download_completion(); // Throws

    return Status::OK(); // Success
}
16,938✔
2524

2525

2526
// The caller (Connection) must discard the session if the session has become
2527
// deactivated upon return.
2528
Status Session::receive_unbound_message()
2529
{
3,960✔
2530
    logger.debug("Received: UNBOUND");
3,960✔
2531

2532
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
3,960✔
2533
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,960✔
2534
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2535
    }
×
2536

2537
    // The fact that the UNBIND message has been sent, but an ERROR message has
2538
    // not been received, implies that the deactivation process must have been
2539
    // initiated, so this session must be in the Deactivating state or the session
2540
    // has been suspended because of a client side error.
2541
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
3,960!
2542

2543
    m_unbound_message_received = true;
3,960✔
2544

2545
    // Detect completion of the unbinding process
2546
    if (m_unbind_message_send_complete && m_state == Deactivating) {
3,960✔
2547
        // The deactivation process completes when the unbinding process
2548
        // completes.
2549
        complete_deactivation(); // Throws
3,960✔
2550
        // Life cycle state is now Deactivated
2551
    }
3,960✔
2552

2553
    return Status::OK(); // Success
3,960✔
2554
}
3,960✔
2555

2556

2557
// Handles a QUERY_ERROR message from the server by forwarding the query
// version and message to on_flx_sync_error(). Always returns Status::OK().
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
{
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active) {
        return Status::OK();
    }

    on_flx_sync_error(query_version, message); // throws
    return Status::OK();
}
20✔
2568

2569
// The caller (Connection) must discard the session if the session has become
2570
// deactivated upon return.
2571
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2572
{
886✔
2573
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
886✔
2574
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
886✔
2575

2576
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
886✔
2577
    if (REALM_UNLIKELY(!legal_at_this_time)) {
886✔
2578
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2579
    }
×
2580

2581
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
886✔
2582
    auto status = protocol_error_to_status(protocol_error, info.message);
886✔
2583
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
886✔
2584
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2585
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2586
                             info.raw_error_code)};
×
2587
    }
×
2588

2589
    // Can't process debug hook actions once the Session is undergoing deactivation, since
2590
    // the SessionWrapper may not be available
2591
    if (m_state == Active) {
886✔
2592
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, &info);
874✔
2593
        if (debug_action == SyncClientHookAction::EarlyReturn) {
874✔
2594
            return Status::OK();
8✔
2595
        }
8✔
2596
    }
874✔
2597

2598
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
2599
    // containing the compensating write has appeared in a download message.
2600
    if (status == ErrorCodes::SyncCompensatingWrite) {
878✔
2601
        // If the client is not active, the compensating writes will not be processed now, but will be
2602
        // sent again the next time the client connects
2603
        if (m_state == Active) {
48✔
2604
            REALM_ASSERT(info.compensating_write_server_version.has_value());
48✔
2605
            m_pending_compensating_write_errors.push_back(info);
48✔
2606
        }
48✔
2607
        return Status::OK();
48✔
2608
    }
48✔
2609

2610
    if (protocol_error == ProtocolError::schema_version_changed) {
830✔
2611
        // Enable upload immediately if the session is still active.
2612
        if (m_state == Active) {
68✔
2613
            auto wt = get_db()->start_write();
68✔
2614
            _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
68✔
2615
            wt->commit();
68✔
2616
            // Notify SyncSession a schema migration is required.
2617
            on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
68✔
2618
        }
68✔
2619
        // Keep the session active to upload any unsynced changes.
2620
        return Status::OK();
68✔
2621
    }
68✔
2622

2623
    m_error_message_received = true;
762✔
2624
    suspend(SessionErrorInfo{info, std::move(status)});
762✔
2625
    return Status::OK();
762✔
2626
}
830✔
2627

2628
void Session::suspend(const SessionErrorInfo& info)
2629
{
842✔
2630
    REALM_ASSERT(!m_suspended);
842✔
2631
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
842✔
2632
    logger.debug("Suspended"); // Throws
842✔
2633

2634
    m_suspended = true;
842✔
2635

2636
    // Detect completion of the unbinding process
2637
    if (m_unbind_message_send_complete && m_error_message_received) {
842✔
2638
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2639
        // we received an ERROR message implies that the deactivation process must
2640
        // have been initiated, so this session must be in the Deactivating state.
2641
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
12✔
2642

2643
        // The deactivation process completes when the unbinding process
2644
        // completes.
2645
        complete_deactivation(); // Throws
12✔
2646
        // Life cycle state is now Deactivated
2647
    }
12✔
2648

2649
    // Notify the application of the suspension of the session if the session is
2650
    // still in the Active state
2651
    if (m_state == Active) {
842✔
2652
        call_debug_hook(SyncClientHookEvent::SessionSuspended, &info);
830✔
2653
        m_conn.one_less_active_unsuspended_session(); // Throws
830✔
2654
        on_suspended(info);                           // Throws
830✔
2655
    }
830✔
2656

2657
    if (!info.is_fatal) {
842✔
2658
        begin_resumption_delay(info);
176✔
2659
    }
176✔
2660

2661
    // Ready to send the UNBIND message, if it has not been sent already
2662
    if (!m_unbind_message_sent)
842✔
2663
        ensure_enlisted_to_send(); // Throws
830✔
2664
}
842✔
2665

2666
// Handles the server's response to a previously issued test command.
//
// Looks up the pending test command whose id equals `ident`, fulfils its
// promise with a copy of `body`, and removes it from the pending list.
// Returns SyncProtocolInvariantFailed if no pending command has that ident,
// Status::OK() otherwise.
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
{
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);

    const auto has_requested_ident = [ident](const PendingTestCommand& pending) {
        return pending.id == ident;
    };
    auto match = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(), has_requested_ident);
    if (match == m_pending_test_commands.end()) {
        return {ErrorCodes::SyncProtocolInvariantFailed,
                util::format("Received test command response for a non-existent ident %1", ident)};
    }

    // Fulfil the promise before erasing the entry that owns it.
    match->promise.emplace_value(std::string{body});
    m_pending_test_commands.erase(match);

    return Status::OK();
}
60✔
2683

2684
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2685
{
176✔
2686
    REALM_ASSERT(!m_try_again_activation_timer);
176✔
2687

2688
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
176✔
2689
                                  error_info.resumption_delay_interval);
176✔
2690
    auto try_again_interval = m_try_again_delay_info.delay_interval();
176✔
2691
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
176✔
2692
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
2693
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
2694
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
2695
        try_again_interval = std::chrono::milliseconds{1000};
144✔
2696
    }
144✔
2697
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
176✔
2698
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
176✔
2699
        if (status == ErrorCodes::OperationAborted)
176✔
2700
            return;
28✔
2701
        else if (!status.is_ok())
148✔
2702
            throw Exception(status);
×
2703

2704
        m_try_again_activation_timer.reset();
148✔
2705
        cancel_resumption_delay();
148✔
2706
    });
148✔
2707
}
176✔
2708

2709
void Session::clear_resumption_delay_state()
2710
{
49,004✔
2711
    if (m_try_again_activation_timer) {
49,004✔
2712
        logger.debug("Clearing resumption delay state after successful download");
×
2713
        m_try_again_delay_info.reset();
×
2714
    }
×
2715
}
49,004✔
2716

2717
// Validates the sync progress received from the server against the progress
// currently recorded for this session (m_progress) and against
// m_last_version_available.
//
// Returns Status::OK() when every invariant holds, otherwise
// SyncProtocolInvariantFailed with a descriptive message. All checks are
// always evaluated; if several fail, the message of the last failing check is
// the one reported.
//
// NOTE(review): the function is noexcept, yet building a message allocates;
// an allocation failure on the error path would terminate the process.
Status Session::check_received_sync_progress(const SyncProgress& progress) noexcept
{
    const SyncProgress& current = m_progress;
    const SyncProgress& received = progress;
    std::string violation;

    // Latest server version must not move backwards.
    if (received.latest_server_version.version < current.latest_server_version.version) {
        violation = util::format("Latest server version in download messages must be weakly increasing throughout a "
                                 "session (current: %1, received: %2)",
                                 current.latest_server_version.version, received.latest_server_version.version);
    }

    // Upload cursor (client version) must not move backwards ...
    if (received.upload.client_version < current.upload.client_version) {
        violation = util::format("Last integrated client version in download messages must be weakly increasing "
                                 "throughout a session (current: %1, received: %2)",
                                 current.upload.client_version, received.upload.client_version);
    }

    // ... and must not exceed the latest client version available locally.
    if (received.upload.client_version > m_last_version_available) {
        violation = util::format("Last integrated client version on server cannot be greater than the latest client "
                                 "version in existence (current: %1, received: %2)",
                                 m_last_version_available, received.upload.client_version);
    }

    // Download cursor (server version) must not move backwards ...
    if (received.download.server_version < current.download.server_version) {
        violation =
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
                         current.download.server_version, received.download.server_version);
    }

    // ... and must not exceed the latest server version reported.
    if (received.download.server_version > received.latest_server_version.version) {
        violation = util::format(
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
            received.download.server_version, received.latest_server_version.version);
    }

    // Last integrated client version at the download cursor must not move
    // backwards ...
    if (received.download.last_integrated_client_version < current.download.last_integrated_client_version) {
        violation = util::format(
            "Last integrated client version on the server at the position in the server's history of the download "
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
            current.download.last_integrated_client_version, received.download.last_integrated_client_version);
    }

    // ... and must not exceed the upload cursor.
    if (received.download.last_integrated_client_version > received.upload.client_version) {
        violation = util::format("Last integrated client version on the server in the position at the server's history "
                                 "of the download cursor cannot be greater than the latest client version integrated "
                                 "on the server (download: %1, upload: %2)",
                                 received.download.last_integrated_client_version, received.upload.client_version);
    }

    // Download cursor must not lag behind the server version integrated in the
    // latest acknowledged client version.
    if (received.download.server_version < received.upload.last_integrated_server_version) {
        violation = util::format(
            "The server version of the download cursor cannot be less than the server version integrated in the "
            "latest client version acknowledged by the server (download: %1, upload: %2)",
            received.download.server_version, received.upload.last_integrated_server_version);
    }

    if (!violation.empty()) {
        return {ErrorCodes::SyncProtocolInvariantFailed, std::move(violation)};
    }
    return Status::OK();
}
49,022✔
2771

2772

2773
void Session::check_for_download_completion()
2774
{
64,362✔
2775
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
64,362✔
2776
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
64,362✔
2777
    if (m_last_download_mark_received == m_last_triggering_download_mark)
64,362✔
2778
        return;
47,184✔
2779
    if (m_last_download_mark_received < m_target_download_mark)
17,178✔
2780
        return;
348✔
2781
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
16,830✔
2782
        return;
×
2783
    m_last_triggering_download_mark = m_target_download_mark;
16,830✔
2784
    if (REALM_UNLIKELY(m_delay_uploads)) {
16,830✔
2785
        // Activate the upload process now, and enable immediate reactivation
2786
        // after a subsequent fast reconnect.
2787
        m_delay_uploads = false;
4,756✔
2788
        ensure_enlisted_to_send(); // Throws
4,756✔
2789
    }
4,756✔
2790
    on_download_completion(); // Throws
16,830✔
2791
}
16,830✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc