• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2291

02 May 2024 08:09PM UTC coverage: 90.759% (+0.01%) from 90.747%
2291

push

Evergreen

web-flow
Fix a deadlock when accessing current user from inside an App listener (#7671)

App::switch_user() emitted changes without first releasing the lock on
m_user_mutex, leading to a deadlock if anyone inside the listener tried to
acquire the mutex. The rest of the places where we emitted changes were
correct.

The newly added wrapper catches this error when building with clang.

101952 of 180246 branches covered (56.56%)

14 of 17 new or added lines in 2 files covered. (82.35%)

59 existing lines in 14 files now uncovered.

212566 of 234210 relevant lines covered (90.76%)

5856712.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.69
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/compact_changesets.hpp>
10
#include <realm/sync/noinst/client_reset_operation.hpp>
11
#include <realm/sync/noinst/sync_schema_migration.hpp>
12
#include <realm/sync/protocol.hpp>
13
#include <realm/util/assert.hpp>
14
#include <realm/util/basic_system_errors.hpp>
15
#include <realm/util/memory_stream.hpp>
16
#include <realm/util/platform_info.hpp>
17
#include <realm/util/random.hpp>
18
#include <realm/util/safe_int_ops.hpp>
19
#include <realm/util/scope_exit.hpp>
20
#include <realm/util/to_string.hpp>
21
#include <realm/util/uri.hpp>
22
#include <realm/version.hpp>
23

24
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
25

26
#include <system_error>
27
#include <sstream>
28

29
// NOTE: The protocol specification is in `/doc/protocol.md`
30

31
using namespace realm;
32
using namespace _impl;
33
using namespace realm::util;
34
using namespace realm::sync;
35
using namespace realm::sync::websocket;
36

37
// clang-format off
38
using Connection      = ClientImpl::Connection;
39
using Session         = ClientImpl::Session;
40
using UploadChangeset = ClientHistory::UploadChangeset;
41

42
// These are a work-around for a bug in MSVC. It cannot find in-class types
43
// mentioned in signature of out-of-line member function definitions.
44
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
45
using OutputBuffer                = ClientImpl::OutputBuffer;
46
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
47
// clang-format on
48

49
void ClientImpl::ReconnectInfo::reset() noexcept
50
{
1,976✔
51
    m_backoff_state.reset();
1,976✔
52
    scheduled_reset = false;
1,976✔
53
}
1,976✔
54

55

56
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
57
                                       std::optional<ResumptionDelayInfo> new_delay_info)
58
{
3,754✔
59
    m_backoff_state.update(new_reason, new_delay_info);
3,754✔
60
}
3,754✔
61

62

63
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
64
{
6,092✔
65
    if (scheduled_reset) {
6,092✔
66
        reset();
4✔
67
    }
4✔
68

69
    if (!m_backoff_state.triggering_error) {
6,092✔
70
        return std::chrono::milliseconds::zero();
4,642✔
71
    }
4,642✔
72

73
    switch (*m_backoff_state.triggering_error) {
1,450✔
74
        case ConnectionTerminationReason::closed_voluntarily:
84✔
75
            return std::chrono::milliseconds::zero();
84✔
76
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
77
            return std::chrono::milliseconds::max();
18✔
78
        default:
1,348✔
79
            if (m_reconnect_mode == ReconnectMode::testing) {
1,348✔
80
                return std::chrono::milliseconds::max();
1,014✔
81
            }
1,014✔
82

83
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
334✔
84
            return m_backoff_state.delay_interval();
334✔
85
    }
1,450✔
86
}
1,450✔
87

88

89
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
90
                                      port_type& port, std::string& path) const
91
{
4,048✔
92
    util::Uri uri(url); // Throws
4,048✔
93
    uri.canonicalize(); // Throws
4,048✔
94
    std::string userinfo, address_2, port_2;
4,048✔
95
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
4,048✔
96
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
4,048✔
97
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
4,048✔
98
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
4,048✔
99
    if (REALM_UNLIKELY(!good))
4,048✔
100
        return false;
×
101
    ProtocolEnvelope protocol_2;
4,048✔
102
    port_type port_3;
4,048✔
103
    if (realm_scheme) {
4,048✔
104
        if (uri.get_scheme() == "realm:") {
×
105
            protocol_2 = ProtocolEnvelope::realm;
×
106
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
107
        }
×
108
        else {
×
109
            protocol_2 = ProtocolEnvelope::realms;
×
110
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
111
        }
×
112
    }
×
113
    else {
4,048✔
114
        REALM_ASSERT(ws_scheme);
4,048✔
115
        if (uri.get_scheme() == "ws:") {
4,048✔
116
            protocol_2 = ProtocolEnvelope::ws;
3,968✔
117
            port_3 = 80;
3,968✔
118
        }
3,968✔
119
        else {
80✔
120
            protocol_2 = ProtocolEnvelope::wss;
80✔
121
            port_3 = 443;
80✔
122
        }
80✔
123
    }
4,048✔
124
    if (!port_2.empty()) {
4,048✔
125
        std::istringstream in(port_2);    // Throws
3,944✔
126
        in.imbue(std::locale::classic()); // Throws
3,944✔
127
        in >> port_3;
3,944✔
128
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,944✔
129
            return false;
×
130
    }
3,944✔
131
    std::string path_2 = uri.get_path(); // Throws (copy)
4,048✔
132

133
    protocol = protocol_2;
4,048✔
134
    address = std::move(address_2);
4,048✔
135
    port = port_3;
4,048✔
136
    path = std::move(path_2);
4,048✔
137
    return true;
4,048✔
138
}
4,048✔
139

140
ClientImpl::ClientImpl(ClientConfig config)
141
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::session, std::move(config.logger))}
4,790✔
142
    , logger{*logger_ptr}
4,790✔
143
    , m_reconnect_mode{config.reconnect_mode}
4,790✔
144
    , m_connect_timeout{config.connect_timeout}
4,790✔
145
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
4,790✔
146
    , m_ping_keepalive_period{config.ping_keepalive_period}
4,790✔
147
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
4,790✔
148
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
4,790✔
149
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
4,790✔
150
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
4,790✔
151
    , m_dry_run{config.dry_run}
4,790✔
152
    , m_enable_default_port_hack{config.enable_default_port_hack}
4,790✔
153
    , m_disable_upload_compaction{config.disable_upload_compaction}
4,790✔
154
    , m_fix_up_object_ids{config.fix_up_object_ids}
4,790✔
155
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
4,790✔
156
    , m_socket_provider{std::move(config.socket_provider)}
4,790✔
157
    , m_client_protocol{} // Throws
4,790✔
158
    , m_one_connection_per_session{config.one_connection_per_session}
4,790✔
159
    , m_random{}
4,790✔
160
{
9,718✔
161
    // FIXME: Would be better if seeding was up to the application.
162
    util::seed_prng_nondeterministically(m_random); // Throws
9,718✔
163

164
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,718✔
165
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,718✔
166
                 get_current_protocol_version()); // Throws
9,718✔
167
    logger.info("Platform: %1", util::get_platform_info());
9,718✔
168
    const char* build_mode;
9,718✔
169
#if REALM_DEBUG
9,718✔
170
    build_mode = "Debug";
9,718✔
171
#else
172
    build_mode = "Release";
173
#endif
174
    logger.debug("Build mode: %1", build_mode);
9,718✔
175
    logger.debug("Config param: one_connection_per_session = %1",
9,718✔
176
                 config.one_connection_per_session); // Throws
9,718✔
177
    logger.debug("Config param: connect_timeout = %1 ms",
9,718✔
178
                 config.connect_timeout); // Throws
9,718✔
179
    logger.debug("Config param: connection_linger_time = %1 ms",
9,718✔
180
                 config.connection_linger_time); // Throws
9,718✔
181
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,718✔
182
                 config.ping_keepalive_period); // Throws
9,718✔
183
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,718✔
184
                 config.pong_keepalive_timeout); // Throws
9,718✔
185
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,718✔
186
                 config.fast_reconnect_limit); // Throws
9,718✔
187
    logger.debug("Config param: disable_upload_compaction = %1",
9,718✔
188
                 config.disable_upload_compaction); // Throws
9,718✔
189
    logger.debug("Config param: disable_sync_to_disk = %1",
9,718✔
190
                 config.disable_sync_to_disk); // Throws
9,718✔
191
    logger.debug(
9,718✔
192
        "Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3, jitter: 1/%4",
9,718✔
193
        m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,718✔
194
        m_reconnect_backoff_info.resumption_delay_interval.count(),
9,718✔
195
        m_reconnect_backoff_info.resumption_delay_backoff_multiplier, m_reconnect_backoff_info.delay_jitter_divisor);
9,718✔
196

197
    if (config.reconnect_mode != ReconnectMode::normal) {
9,718✔
198
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
780✔
199
                    "never do this in production!");
780✔
200
    }
780✔
201

202
    if (config.dry_run) {
9,718✔
203
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
204
                    "never do this in production!");
×
205
    }
×
206

207
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,718✔
208

209
    if (m_one_connection_per_session) {
9,718✔
210
        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
4✔
211
                    "never do this in production");
4✔
212
    }
4✔
213

214
    if (config.disable_upload_activation_delay) {
9,718✔
215
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
216
                    "never do this in production");
×
217
    }
×
218

219
    if (config.disable_sync_to_disk) {
9,718✔
220
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
221
                    "never do this in production");
×
222
    }
×
223

224
    m_actualize_and_finalize = create_trigger([this](Status status) {
15,286✔
225
        if (status == ErrorCodes::OperationAborted)
15,286✔
226
            return;
×
227
        else if (!status.is_ok())
15,286✔
228
            throw Exception(status);
×
229
        actualize_and_finalize_session_wrappers(); // Throws
15,286✔
230
    });
15,286✔
231
}
9,718✔
232

233

234
/// Posts `handler` to the socket provider, counting it as an outstanding post
/// until it has finished running.
///
/// The counter is incremented (and `m_drained` cleared) under `m_drain_mutex`
/// before posting. A scope-exit guard inside the wrapper decrements the counter
/// and notifies `m_drain_cv` once `handler` has run, also if it throws.
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    {
        std::lock_guard lock(m_drain_mutex);
        m_drained = false;
        ++m_outstanding_posts;
    }

    auto counted_handler = [handler = std::move(handler), this](Status status) {
        auto release_post = util::make_scope_exit([this]() noexcept {
            std::lock_guard lock(m_drain_mutex);
            REALM_ASSERT(m_outstanding_posts);
            --m_outstanding_posts;
            m_drain_cv.notify_all();
        });
        handler(status);
    };
    m_socket_provider->post(std::move(counted_handler));
}
181,730✔
252

253

254
void ClientImpl::drain_connections()
255
{
9,706✔
256
    logger.debug("Draining connections during sync client shutdown");
9,706✔
257
    for (auto& server_slot_pair : m_server_slots) {
9,706✔
258
        auto& server_slot = server_slot_pair.second;
2,642✔
259

260
        if (server_slot.connection) {
2,642✔
261
            auto& conn = server_slot.connection;
2,418✔
262
            conn->force_close();
2,418✔
263
        }
2,418✔
264
        else {
224✔
265
            for (auto& conn_pair : server_slot.alt_connections) {
224✔
UNCOV
266
                conn_pair.second->force_close();
×
UNCOV
267
            }
×
268
        }
224✔
269
    }
2,642✔
270
}
9,706✔
271

272

273
/// Creates a timer on the socket provider that invokes `handler` after `delay`,
/// counting it as an outstanding post until the handler has finished running.
///
/// The counter is incremented (and `m_drained` cleared) under `m_drain_mutex`
/// before the timer is created. The decrement and the `m_drain_cv` notification
/// are performed by a scope-exit guard, so they also happen when `handler`
/// throws; without the guard a throwing handler would leave
/// `m_outstanding_posts` permanently incremented. This mirrors what post() does.
///
/// @param delay    Time to wait before the handler is invoked.
/// @param handler  Callback receiving the timer's completion status.
/// @return The timer object produced by the socket provider.
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
                                                       SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    {
        std::lock_guard lock(m_drain_mutex);
        ++m_outstanding_posts;
        m_drained = false;
    }
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
        // Runs after the handler returns or throws.
        auto decr_guard = util::make_scope_exit([&]() noexcept {
            std::lock_guard lock(m_drain_mutex);
            REALM_ASSERT(m_outstanding_posts);
            --m_outstanding_posts;
            m_drain_cv.notify_all();
        });
        handler(status);
    });
}
17,436✔
291

292

293
/// Creates a `Trigger<ClientImpl>` bound to this client that will invoke
/// `handler`. Requires a socket provider to be set.
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
}
12,482✔
298

299
/// If a websocket sentinel is still attached, marks it as destroyed and then
/// resets this connection's handle to it.
Connection::~Connection()
{
    if (!m_websocket_sentinel)
        return;

    m_websocket_sentinel->destroyed = true;
    m_websocket_sentinel.reset();
}
2,752✔
306

307
void Connection::activate()
308
{
2,764✔
309
    REALM_ASSERT(m_on_idle);
2,764✔
310
    m_activated = true;
2,764✔
311
    if (m_num_active_sessions == 0)
2,764✔
312
        m_on_idle->trigger();
×
313
    // We cannot in general connect immediately, because a prior failure to
314
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
315
    initiate_reconnect_wait(); // Throws
2,764✔
316
}
2,764✔
317

318

319
void Connection::activate_session(std::unique_ptr<Session> sess)
320
{
10,318✔
321
    REALM_ASSERT(sess);
10,318✔
322
    REALM_ASSERT(&sess->m_conn == this);
10,318✔
323
    REALM_ASSERT(!m_force_closed);
10,318✔
324
    Session& sess_2 = *sess;
10,318✔
325
    session_ident_type ident = sess->m_ident;
10,318✔
326
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
10,318✔
327
    bool was_inserted = p.second;
10,318✔
328
    REALM_ASSERT(was_inserted);
10,318✔
329
    // Save the session ident to the historical list of session idents
330
    m_session_history.insert(ident);
10,318✔
331
    sess_2.activate(); // Throws
10,318✔
332
    if (m_state == ConnectionState::connected) {
10,318✔
333
        bool fast_reconnect = false;
7,102✔
334
        sess_2.connection_established(fast_reconnect); // Throws
7,102✔
335
    }
7,102✔
336
    ++m_num_active_sessions;
10,318✔
337
}
10,318✔
338

339

340
/// Starts deactivation of `sess` and decrements the active session count.
///
/// If the session reaches the `Deactivated` state immediately, it is removed
/// from this connection right away. When the last active session goes away
/// while the connection is activated and disconnected, the idle trigger fires.
void Connection::initiate_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess);
    REALM_ASSERT(&sess->m_conn == this);
    REALM_ASSERT(m_num_active_sessions);

    // The client may be waiting (on another thread, in stop_and_wait()) for
    // m_num_active_sessions to reach 0, so the session is deactivated before
    // the counter is decremented.
    sess->initiate_deactivation(); // Throws
    if (sess->m_state == Session::Deactivated)
        finish_session_deactivation(sess);

    const bool last_session_gone = (--m_num_active_sessions == 0);
    if (REALM_UNLIKELY(last_session_gone)) {
        const bool idle_and_disconnected = m_activated && m_state == ConnectionState::disconnected;
        if (idle_and_disconnected)
            m_on_idle->trigger();
    }
}
10,306✔
357

358

359
void Connection::cancel_reconnect_delay()
360
{
2,188✔
361
    REALM_ASSERT(m_activated);
2,188✔
362

363
    if (m_reconnect_delay_in_progress) {
2,188✔
364
        if (m_nonzero_reconnect_delay)
1,968✔
365
            logger.detail("Canceling reconnect delay"); // Throws
980✔
366

367
        // Cancel the in-progress wait operation by destroying the timer
368
        // object. Destruction is needed in this case, because a new wait
369
        // operation might have to be initiated before the previous one
370
        // completes (its completion handler starts to execute), so the new wait
371
        // operation must be done on a new timer object.
372
        m_reconnect_disconnect_timer.reset();
1,968✔
373
        m_reconnect_delay_in_progress = false;
1,968✔
374
        m_reconnect_info.reset();
1,968✔
375
        initiate_reconnect_wait(); // Throws
1,968✔
376
        return;
1,968✔
377
    }
1,968✔
378

379
    // If we are not disconnected, then we need to make sure the next time we get disconnected
380
    // that we are allowed to re-connect as quickly as possible.
381
    //
382
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
383
    // backoff/delay state before calculating the next delay, unless a PONG message is received
384
    // for the urgent PING message we send below.
385
    //
386
    // If we get a PONG message for the urgent PING message sent below, then the connection is
387
    // healthy and we can calculate the next delay normally.
388
    if (m_state != ConnectionState::disconnected) {
220✔
389
        m_reconnect_info.scheduled_reset = true;
220✔
390
        m_ping_after_scheduled_reset_of_reconnect_info = false;
220✔
391

392
        schedule_urgent_ping(); // Throws
220✔
393
        return;
220✔
394
    }
220✔
395
    // Nothing to do in this case. The next reconnect attemp will be made as
396
    // soon as there are any sessions that are both active and unsuspended.
397
}
220✔
398

399
/// Removes a session that has reached the `Deactivated` state from both the
/// session map and the session ident history.
void Connection::finish_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess->m_state == Session::Deactivated);

    // Copy the ident first; it is still needed after the entry is erased from m_sessions.
    const auto session_ident = sess->m_ident;
    m_sessions.erase(session_ident);
    m_session_history.erase(session_ident);
}
8,314✔
406

407
void Connection::force_close()
408
{
2,418✔
409
    if (m_force_closed) {
2,418✔
410
        return;
×
411
    }
×
412

413
    m_force_closed = true;
2,418✔
414

415
    if (m_state != ConnectionState::disconnected) {
2,418✔
416
        voluntary_disconnect();
2,382✔
417
    }
2,382✔
418

419
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,418✔
420
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,418✔
421
        m_reconnect_disconnect_timer.reset();
36✔
422
        m_reconnect_delay_in_progress = false;
36✔
423
        m_disconnect_delay_in_progress = false;
36✔
424
    }
36✔
425

426
    // We must copy any session pointers we want to close to a vector because force_closing
427
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
428
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
429
    std::vector<Session*> to_close;
2,418✔
430
    for (auto& session_pair : m_sessions) {
2,418✔
431
        if (session_pair.second->m_state == Session::State::Active) {
100✔
432
            to_close.push_back(session_pair.second.get());
100✔
433
        }
100✔
434
    }
100✔
435

436
    for (auto& sess : to_close) {
2,418✔
437
        sess->force_close();
100✔
438
    }
100✔
439

440
    logger.debug("Force closed idle connection");
2,418✔
441
}
2,418✔
442

443

444
void Connection::websocket_connected_handler(const std::string& protocol)
445
{
3,550✔
446
    if (!protocol.empty()) {
3,550✔
447
        std::string_view expected_prefix =
3,550✔
448
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,550✔
449
        // FIXME: Use std::string_view::begins_with() in C++20.
450
        auto prefix_matches = [&](std::string_view other) {
3,550✔
451
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,550✔
452
        };
3,550✔
453
        if (prefix_matches(expected_prefix)) {
3,550✔
454
            util::MemoryInputStream in;
3,550✔
455
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,550✔
456
            in.imbue(std::locale::classic());
3,550✔
457
            in.unsetf(std::ios_base::skipws);
3,550✔
458
            int value_2 = 0;
3,550✔
459
            in >> value_2;
3,550✔
460
            if (in && in.eof() && value_2 >= 0) {
3,550✔
461
                bool good_version =
3,550✔
462
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,550✔
463
                if (good_version) {
3,550✔
464
                    logger.detail("Negotiated protocol version: %1", value_2);
3,550✔
465
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
466
                    // will provide the appservices connection ID via a log message.
467
                    // TODO: Remove once the server starts sending the connection ID
468
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,550✔
469
                    m_negotiated_protocol_version = value_2;
3,550✔
470
                    handle_connection_established(); // Throws
3,550✔
471
                    return;
3,550✔
472
                }
3,550✔
473
            }
3,550✔
474
        }
3,550✔
475
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
476
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
477
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
478
    }
×
479
    else {
×
480
        close_due_to_client_side_error(
×
481
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
482
            ConnectionTerminationReason::bad_headers_in_http_response);
×
483
    }
×
484
}
3,550✔
485

486

487
/// Handles a binary message received from the websocket.
///
/// @param data  The raw message bytes.
/// @return false if the connection was force closed; otherwise whether
///         `m_websocket` is still set after the message (or a simulated read
///         failure) has been processed.
bool Connection::websocket_binary_message_received(util::Span<const char> data)
{
    if (m_force_closed) {
        logger.debug("Received binary message after connection was force closed");
        return false;
    }

    using sf = SimulatedFailure;
    const bool simulate_read_failure = sf::check_trigger(sf::sync_client__read_head);
    if (simulate_read_failure) {
        close_due_to_client_side_error(
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
            ConnectionTerminationReason::read_or_write_error);
    }
    else {
        handle_message_received(data);
    }

    return bool(m_websocket);
}
79,950✔
505

506

507
void Connection::websocket_error_handler()
508
{
736✔
509
    m_websocket_error_received = true;
736✔
510
}
736✔
511

512
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
513
{
840✔
514
    if (m_force_closed) {
840✔
515
        logger.debug("Received websocket close message after connection was force closed");
×
516
        return false;
×
517
    }
×
518
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
840✔
519

520
    switch (error_code) {
840✔
521
        case WebSocketError::websocket_ok:
56✔
522
            break;
56✔
523
        case WebSocketError::websocket_resolve_failed:
4✔
524
            [[fallthrough]];
4✔
525
        case WebSocketError::websocket_connection_failed: {
112✔
526
            SessionErrorInfo error_info(
112✔
527
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
112✔
528
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
529
            // to make sure the websocket URL is correct
530
            if (!m_server_endpoint.is_verified) {
112✔
531
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
84✔
532
            }
84✔
533
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
112✔
534
            break;
112✔
535
        }
4✔
536
        case WebSocketError::websocket_read_error:
602✔
537
            [[fallthrough]];
602✔
538
        case WebSocketError::websocket_write_error: {
602✔
539
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
602✔
540
                                         ConnectionTerminationReason::read_or_write_error);
602✔
541
            break;
602✔
542
        }
602✔
543
        case WebSocketError::websocket_going_away:
✔
544
            [[fallthrough]];
×
545
        case WebSocketError::websocket_protocol_error:
✔
546
            [[fallthrough]];
×
547
        case WebSocketError::websocket_unsupported_data:
✔
548
            [[fallthrough]];
×
549
        case WebSocketError::websocket_invalid_payload_data:
✔
550
            [[fallthrough]];
×
551
        case WebSocketError::websocket_policy_violation:
✔
552
            [[fallthrough]];
×
553
        case WebSocketError::websocket_reserved:
✔
554
            [[fallthrough]];
×
555
        case WebSocketError::websocket_no_status_received:
✔
556
            [[fallthrough]];
×
557
        case WebSocketError::websocket_invalid_extension: {
✔
558
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
559
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
560
            break;
×
561
        }
×
562
        case WebSocketError::websocket_message_too_big: {
4✔
563
            auto message = util::format(
4✔
564
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
565
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
566
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
567
            involuntary_disconnect(std::move(error_info),
4✔
568
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
569
            break;
4✔
570
        }
×
571
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
572
            close_due_to_client_side_error(
10✔
573
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
574
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
575
            break;
10✔
576
        }
×
577
        case WebSocketError::websocket_client_too_old:
✔
578
            [[fallthrough]];
×
579
        case WebSocketError::websocket_client_too_new:
✔
580
            [[fallthrough]];
×
581
        case WebSocketError::websocket_protocol_mismatch: {
✔
582
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
583
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
584
            break;
×
585
        }
×
586
        case WebSocketError::websocket_fatal_error: {
✔
587
            // Error is fatal if the sync_route has already been verified - if the sync_route has not
588
            // been verified, then use a non-fatal error and try to perform a location update.
589
            SessionErrorInfo error_info(
×
590
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)},
×
591
                IsFatal{m_server_endpoint.is_verified});
×
592
            ConnectionTerminationReason reason = ConnectionTerminationReason::http_response_says_fatal_error;
×
593
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
594
            // to make sure the websocket URL is correct
595
            if (!m_server_endpoint.is_verified) {
×
596
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
597
                reason = ConnectionTerminationReason::connect_operation_failed;
×
598
            }
×
599
            involuntary_disconnect(std::move(error_info), reason);
×
600
            break;
×
601
        }
×
602
        case WebSocketError::websocket_forbidden: {
✔
603
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
604
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
605
            involuntary_disconnect(std::move(error_info),
×
606
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
607
            break;
×
608
        }
×
609
        case WebSocketError::websocket_unauthorized: {
44✔
610
            SessionErrorInfo error_info(
44✔
611
                {ErrorCodes::AuthError,
44✔
612
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
44✔
613
                IsFatal{false});
44✔
614
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
44✔
615
            involuntary_disconnect(std::move(error_info),
44✔
616
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
44✔
617
            break;
44✔
618
        }
×
619
        case WebSocketError::websocket_moved_permanently: {
12✔
620
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
621
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
622
            involuntary_disconnect(std::move(error_info),
12✔
623
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
624
            break;
12✔
625
        }
×
626
        case WebSocketError::websocket_abnormal_closure: {
✔
627
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
628
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
629
            involuntary_disconnect(std::move(error_info),
×
630
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
631
            break;
×
632
        }
×
633
        case WebSocketError::websocket_internal_server_error:
✔
634
            [[fallthrough]];
×
635
        case WebSocketError::websocket_retry_error: {
✔
636
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
637
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
638
            break;
×
639
        }
×
640
    }
840✔
641

642
    return bool(m_websocket);
828✔
643
}
840✔
644

645
// Guarantees that handle_reconnect_wait() is never called from within the
646
// execution of initiate_reconnect_wait() (no callback reentrance).
647
void Connection::initiate_reconnect_wait()
648
{
8,472✔
649
    REALM_ASSERT(m_activated);
8,472✔
650
    REALM_ASSERT(!m_reconnect_delay_in_progress);
8,472✔
651
    REALM_ASSERT(!m_disconnect_delay_in_progress);
8,472✔
652

653
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
654
    if (m_force_closed) {
8,472✔
655
        return;
2,382✔
656
    }
2,382✔
657

658
    m_reconnect_delay_in_progress = true;
6,090✔
659
    auto delay = m_reconnect_info.delay_interval();
6,090✔
660
    if (delay == std::chrono::milliseconds::max()) {
6,090✔
661
        logger.detail("Reconnection delayed indefinitely"); // Throws
1,032✔
662
        // Not actually starting a timer corresponds to an infinite wait
663
        m_nonzero_reconnect_delay = true;
1,032✔
664
        return;
1,032✔
665
    }
1,032✔
666

667
    if (delay == std::chrono::milliseconds::zero()) {
5,058✔
668
        m_nonzero_reconnect_delay = false;
4,726✔
669
    }
4,726✔
670
    else {
332✔
671
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
332✔
672
        m_nonzero_reconnect_delay = true;
332✔
673
    }
332✔
674

675
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
676
    // we need it to be cancelable in case the connection is terminated before the timer
677
    // callback is run.
678
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
5,060✔
679
        // If the operation is aborted, the connection object may have been
680
        // destroyed.
681
        if (status != ErrorCodes::OperationAborted)
5,060✔
682
            handle_reconnect_wait(status); // Throws
3,754✔
683
    });                                    // Throws
5,060✔
684
}
5,058✔
685

686

687
// Completion handler for the timer started by initiate_reconnect_wait().
//
// A non-OK status is rethrown as an Exception (an aborted operation is never
// expected here because the timer callback filters it out). Otherwise the
// reconnect delay is marked as finished and a reconnect is initiated if there
// is at least one active, unsuspended session.
void Connection::handle_reconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT(m_reconnect_delay_in_progress);
    m_reconnect_delay_in_progress = false;

    if (m_num_active_unsuspended_sessions > 0) {
        initiate_reconnect(); // Throws
    }
}
700

701
// Observer handed to the socket provider on behalf of a Connection.
//
// Each callback first consults the lifecycle sentinel that was current when
// the shim was created; once that sentinel is flagged as destroyed the
// callback is dropped instead of being forwarded to the Connection.
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
    explicit WebSocketObserverShim(Connection* conn)
        : conn(conn)
        , sentinel(conn->m_websocket_sentinel)
    {
    }

    Connection* conn;
    util::bind_ptr<LifecycleSentinel> sentinel;

    // Forwards the "connected" notification while the sentinel is still live.
    void websocket_connected_handler(const std::string& protocol) override
    {
        if (!sentinel->destroyed) {
            conn->websocket_connected_handler(protocol);
        }
    }

    // Forwards the "error" notification while the sentinel is still live.
    void websocket_error_handler() override
    {
        if (!sentinel->destroyed) {
            conn->websocket_error_handler();
        }
    }

    // Forwards a received binary message; reports `false` once the sentinel
    // has been flagged as destroyed.
    bool websocket_binary_message_received(util::Span<const char> data) override
    {
        if (!sentinel->destroyed) {
            return conn->websocket_binary_message_received(data);
        }
        return false;
    }

    // Forwards the "closed" notification; reports `true` once the sentinel
    // has been flagged as destroyed.
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
    {
        if (!sentinel->destroyed) {
            return conn->websocket_closed_handler(was_clean, error_code, msg);
        }
        return true;
    }
};
747

748
void Connection::initiate_reconnect()
749
{
3,754✔
750
    REALM_ASSERT(m_activated);
3,754✔
751

752
    m_state = ConnectionState::connecting;
3,754✔
753
    report_connection_state_change(ConnectionState::connecting); // Throws
3,754✔
754
    if (m_websocket_sentinel) {
3,754✔
755
        m_websocket_sentinel->destroyed = true;
×
756
    }
×
757
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,754✔
758
    m_websocket.reset();
3,754✔
759

760
    // Watchdog
761
    initiate_connect_wait(); // Throws
3,754✔
762

763
    std::vector<std::string> sec_websocket_protocol;
3,754✔
764
    {
3,754✔
765
        auto protocol_prefix =
3,754✔
766
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,754✔
767
        int min = get_oldest_supported_protocol_version();
3,754✔
768
        int max = get_current_protocol_version();
3,754✔
769
        REALM_ASSERT_3(min, <=, max);
3,754✔
770
        // List protocol version in descending order to ensure that the server
771
        // selects the highest possible version.
772
        for (int version = max; version >= min; --version) {
45,042✔
773
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
41,288✔
774
        }
41,288✔
775
    }
3,754✔
776

777
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,754✔
778
                m_server_endpoint.port, m_http_request_path_prefix);
3,754✔
779

780
    m_websocket_error_received = false;
3,754✔
781
    m_websocket =
3,754✔
782
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,754✔
783
                                            WebSocketEndpoint{
3,754✔
784
                                                m_server_endpoint.address,
3,754✔
785
                                                m_server_endpoint.port,
3,754✔
786
                                                get_http_request_path(),
3,754✔
787
                                                std::move(sec_websocket_protocol),
3,754✔
788
                                                is_ssl(m_server_endpoint.envelope),
3,754✔
789
                                                /// DEPRECATED - The following will be removed in a future release
790
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,754✔
791
                                                m_verify_servers_ssl_certificate,
3,754✔
792
                                                m_ssl_trust_certificate_path,
3,754✔
793
                                                m_ssl_verify_callback,
3,754✔
794
                                                m_proxy_config,
3,754✔
795
                                            });
3,754✔
796
}
3,754✔
797

798

799
void Connection::initiate_connect_wait()
800
{
3,754✔
801
    // Deploy a watchdog to enforce an upper bound on the time it can take to
802
    // fully establish the connection (including SSL and WebSocket
803
    // handshakes). Without such a watchdog, connect operations could take very
804
    // long, or even indefinite time.
805
    milliseconds_type time = m_client.m_connect_timeout;
3,754✔
806

807
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,754✔
808
        // If the operation is aborted, the connection object may have been
809
        // destroyed.
810
        if (status != ErrorCodes::OperationAborted)
3,742✔
811
            handle_connect_wait(status); // Throws
×
812
    });                                  // Throws
3,742✔
813
}
3,754✔
814

815

816
// Completion handler for the connect timeout watchdog.
//
// A non-OK status is rethrown as an Exception. Otherwise the connection
// attempt is considered timed out and the connection is torn down with a
// non-fatal SyncConnectTimeout error.
void Connection::handle_connect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
    logger.info("Connect timeout"); // Throws

    SessionErrorInfo timeout_error(
        {ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"}, IsFatal{false});

    // If the connection fails/times out and the server has not been contacted
    // yet, request a location refresh to make sure the websocket URL is
    // correct.
    const bool endpoint_unverified = !m_server_endpoint.is_verified;
    if (endpoint_unverified) {
        timeout_error.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
    }

    involuntary_disconnect(std::move(timeout_error), ConnectionTerminationReason::sync_connect_timeout); // Throws
}
834

835

836
void Connection::handle_connection_established()
837
{
3,550✔
838
    // Cancel connect timeout watchdog
839
    m_connect_timer.reset();
3,550✔
840

841
    m_state = ConnectionState::connected;
3,550✔
842
    m_server_endpoint.is_verified = true; // sync route is valid since connection is successful
3,550✔
843

844
    milliseconds_type now = monotonic_clock_now();
3,550✔
845
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,550✔
846
    initiate_ping_delay(now);     // Throws
3,550✔
847

848
    bool fast_reconnect = false;
3,550✔
849
    if (m_disconnect_has_occurred) {
3,550✔
850
        milliseconds_type time = now - m_disconnect_time;
1,054✔
851
        if (time <= m_client.m_fast_reconnect_limit)
1,054✔
852
            fast_reconnect = true;
1,054✔
853
    }
1,054✔
854

855
    for (auto& p : m_sessions) {
4,664✔
856
        Session& sess = *p.second;
4,664✔
857
        sess.connection_established(fast_reconnect); // Throws
4,664✔
858
    }
4,664✔
859

860
    report_connection_state_change(ConnectionState::connected); // Throws
3,550✔
861
}
3,550✔
862

863

864
void Connection::schedule_urgent_ping()
865
{
220✔
866
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
220✔
867
    if (m_ping_delay_in_progress) {
220✔
868
        m_heartbeat_timer.reset();
124✔
869
        m_ping_delay_in_progress = false;
124✔
870
        m_minimize_next_ping_delay = true;
124✔
871
        milliseconds_type now = monotonic_clock_now();
124✔
872
        initiate_ping_delay(now); // Throws
124✔
873
        return;
124✔
874
    }
124✔
875
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
96✔
876
    if (!m_send_ping)
96✔
877
        m_minimize_next_ping_delay = true;
96✔
878
}
96✔
879

880

881
// Starts the timer that precedes the transmission of the next PING message.
//
// `now` is the current monotonic clock reading; it is used to deduct the time
// already spent waiting for the previous PONG from the delay.
void Connection::initiate_ping_delay(milliseconds_type now)
{
    REALM_ASSERT(!m_ping_delay_in_progress);
    REALM_ASSERT(!m_waiting_for_pong);
    REALM_ASSERT(!m_send_ping);

    milliseconds_type delay = 0;
    if (m_minimize_next_ping_delay) {
        // A minimized delay was requested: consume the request and keep the
        // delay at zero.
        m_minimize_next_ping_delay = false;
    }
    else {
        delay = m_client.m_ping_keepalive_period;

        // Make a randomized deduction of up to 10%, or up to 100% if this is
        // the first PING message to be sent since the connection was
        // established. The purpose of this randomized deduction is to reduce
        // the risk of many connections sending PING messages simultaneously
        // to the server.
        const milliseconds_type max_deduction = (m_ping_sent ? delay / 10 : delay);
        std::uniform_int_distribution<milliseconds_type> jitter(0, max_deduction);
        const milliseconds_type randomized_deduction = jitter(m_client.get_random());
        delay -= randomized_deduction;

        // Deduct the time spent waiting for PONG, clamping at zero.
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
        const milliseconds_type spent_time = now - m_pong_wait_started_at;
        delay = (spent_time < delay) ? (delay - spent_time) : 0;
    }

    m_ping_delay_in_progress = true;

    auto on_timer = [this](Status status) {
        if (status == ErrorCodes::OperationAborted) {
            return;
        }
        if (!status.is_ok()) {
            throw Exception(status);
        }
        handle_ping_delay(); // Throws
    };
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), std::move(on_timer)); // Throws
    logger.debug("Will emit a ping in %1 milliseconds", delay);                                       // Throws
}
926

927

928
void Connection::handle_ping_delay()
929
{
156✔
930
    REALM_ASSERT(m_ping_delay_in_progress);
156✔
931
    m_ping_delay_in_progress = false;
156✔
932
    m_send_ping = true;
156✔
933

934
    initiate_pong_timeout(); // Throws
156✔
935

936
    if (m_state == ConnectionState::connected && !m_sending)
156✔
937
        send_next_message(); // Throws
116✔
938
}
156✔
939

940

941
void Connection::initiate_pong_timeout()
942
{
156✔
943
    REALM_ASSERT(!m_ping_delay_in_progress);
156✔
944
    REALM_ASSERT(!m_waiting_for_pong);
156✔
945
    REALM_ASSERT(m_send_ping);
156✔
946

947
    m_waiting_for_pong = true;
156✔
948
    m_pong_wait_started_at = monotonic_clock_now();
156✔
949

950
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
156✔
951
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
156✔
952
        if (status == ErrorCodes::OperationAborted)
156✔
953
            return;
144✔
954
        else if (!status.is_ok())
12✔
955
            throw Exception(status);
×
956

957
        handle_pong_timeout(); // Throws
12✔
958
    });                        // Throws
12✔
959
}
156✔
960

961

962
void Connection::handle_pong_timeout()
963
{
12✔
964
    REALM_ASSERT(m_waiting_for_pong);
12✔
965
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
966
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
967
                                 ConnectionTerminationReason::pong_timeout);
12✔
968
}
12✔
969

970

971
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
972
{
100,740✔
973
    // Stop sending messages if an websocket error was received.
974
    if (m_websocket_error_received)
100,740✔
975
        return;
×
976

977
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
100,740✔
978
        if (sentinel->destroyed) {
100,670✔
979
            return;
1,468✔
980
        }
1,468✔
981
        if (!status.is_ok()) {
99,202✔
982
            if (status != ErrorCodes::Error::OperationAborted) {
×
983
                // Write errors will be handled by the websocket_write_error_handler() callback
984
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
985
            }
×
986
            return;
×
987
        }
×
988
        handle_write_message(); // Throws
99,202✔
989
    });                         // Throws
99,202✔
990
    m_sending_session = sess;
100,740✔
991
    m_sending = true;
100,740✔
992
}
100,740✔
993

994

995
void Connection::handle_write_message()
996
{
99,202✔
997
    m_sending_session->message_sent(); // Throws
99,202✔
998
    if (m_sending_session->m_state == Session::Deactivated) {
99,202✔
999
        finish_session_deactivation(m_sending_session);
126✔
1000
    }
126✔
1001
    m_sending_session = nullptr;
99,202✔
1002
    m_sending = false;
99,202✔
1003
    send_next_message(); // Throws
99,202✔
1004
}
99,202✔
1005

1006

1007
void Connection::send_next_message()
1008
{
159,324✔
1009
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
159,324✔
1010
    REALM_ASSERT(!m_sending_session);
159,324✔
1011
    REALM_ASSERT(!m_sending);
159,324✔
1012
    if (m_send_ping) {
159,324✔
1013
        send_ping(); // Throws
144✔
1014
        return;
144✔
1015
    }
144✔
1016
    while (!m_sessions_enlisted_to_send.empty()) {
222,090✔
1017
        // The state of being connected is not supposed to be able to change
1018
        // across this loop thanks to the "no callback reentrance" guarantee
1019
        // provided by Websocket::async_write_text(), and friends.
1020
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
163,920✔
1021

1022
        Session& sess = *m_sessions_enlisted_to_send.front();
163,920✔
1023
        m_sessions_enlisted_to_send.pop_front();
163,920✔
1024
        sess.send_message(); // Throws
163,920✔
1025

1026
        if (sess.m_state == Session::Deactivated) {
163,920✔
1027
            finish_session_deactivation(&sess);
2,814✔
1028
        }
2,814✔
1029

1030
        // An enlisted session may choose to not send a message. In that case,
1031
        // we should pass the opportunity to the next enlisted session.
1032
        if (m_sending)
163,920✔
1033
            break;
101,010✔
1034
    }
163,920✔
1035
}
159,180✔
1036

1037

1038
void Connection::send_ping()
1039
{
144✔
1040
    REALM_ASSERT(!m_ping_delay_in_progress);
144✔
1041
    REALM_ASSERT(m_waiting_for_pong);
144✔
1042
    REALM_ASSERT(m_send_ping);
144✔
1043

1044
    m_send_ping = false;
144✔
1045
    if (m_reconnect_info.scheduled_reset)
144✔
1046
        m_ping_after_scheduled_reset_of_reconnect_info = true;
104✔
1047

1048
    m_last_ping_sent_at = monotonic_clock_now();
144✔
1049
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
144✔
1050
                 m_previous_ping_rtt); // Throws
144✔
1051

1052
    ClientProtocol& protocol = get_client_protocol();
144✔
1053
    OutputBuffer& out = get_output_buffer();
144✔
1054
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
144✔
1055
    initiate_write_ping(out);                                          // Throws
144✔
1056
    m_ping_sent = true;
144✔
1057
}
144✔
1058

1059

1060
void Connection::initiate_write_ping(const OutputBuffer& out)
1061
{
144✔
1062
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
144✔
1063
        if (sentinel->destroyed) {
144✔
1064
            return;
×
1065
        }
×
1066
        if (!status.is_ok()) {
144✔
1067
            if (status != ErrorCodes::Error::OperationAborted) {
×
1068
                // Write errors will be handled by the websocket_write_error_handler() callback
1069
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1070
            }
×
1071
            return;
×
1072
        }
×
1073
        handle_write_ping(); // Throws
144✔
1074
    });                      // Throws
144✔
1075
    m_sending = true;
144✔
1076
}
144✔
1077

1078

1079
void Connection::handle_write_ping()
1080
{
144✔
1081
    REALM_ASSERT(m_sending);
144✔
1082
    REALM_ASSERT(!m_sending_session);
144✔
1083
    m_sending = false;
144✔
1084
    send_next_message(); // Throws
144✔
1085
}
144✔
1086

1087

1088
// Hands a received binary message to the client protocol parser, which parses
// it and calls the proper handler on this Connection object.
void Connection::handle_message_received(util::Span<const char> data)
{
    const std::string_view payload(data.data(), data.size());
    ClientProtocol& protocol = get_client_protocol();
    protocol.parse_message_received<Connection>(*this, payload);
}
1094

1095

1096
void Connection::initiate_disconnect_wait()
1097
{
4,600✔
1098
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,600✔
1099

1100
    if (m_disconnect_delay_in_progress) {
4,600✔
1101
        m_reconnect_disconnect_timer.reset();
2,132✔
1102
        m_disconnect_delay_in_progress = false;
2,132✔
1103
    }
2,132✔
1104

1105
    milliseconds_type time = m_client.m_connection_linger_time;
4,600✔
1106

1107
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,600✔
1108
        // If the operation is aborted, the connection object may have been
1109
        // destroyed.
1110
        if (status != ErrorCodes::OperationAborted)
4,600✔
1111
            handle_disconnect_wait(status); // Throws
14✔
1112
    });                                     // Throws
4,600✔
1113
    m_disconnect_delay_in_progress = true;
4,600✔
1114
}
4,600✔
1115

1116

1117
// Completion handler for the linger timer.
//
// A non-OK status is rethrown as an Exception. Otherwise the disconnect delay
// is marked as finished, and the connection is voluntarily closed if no
// active, unsuspended sessions remain.
void Connection::handle_disconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    m_disconnect_delay_in_progress = false;

    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);

    // Sessions are still using the connection: keep it open.
    if (m_num_active_unsuspended_sessions != 0) {
        return;
    }

    // Only mention the linger time when one was actually configured.
    if (m_client.m_connection_linger_time > 0) {
        logger.detail("Linger time expired"); // Throws
    }
    voluntary_disconnect();      // Throws
    logger.info("Disconnected"); // Throws
}
1134

1135

1136
// Closes the connection because of a sync protocol violation. The error is
// reported as fatal, with the ProtocolViolation action attached.
void Connection::close_due_to_protocol_error(Status status)
{
    SessionErrorInfo violation(std::move(status), IsFatal{true});
    violation.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;

    involuntary_disconnect(std::move(violation), ConnectionTerminationReason::sync_protocol_violation); // Throws
}
1143

1144

1145
// Logs and closes the connection because of an error detected on the client
// side. `is_fatal` and `reason` are forwarded unchanged to the disconnect.
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to error: %1", status); // Throws

    SessionErrorInfo error_info{std::move(status), is_fatal};
    involuntary_disconnect(std::move(error_info), reason); // Throws
}
1151

1152

1153
// Logs and closes the connection because of a transient error. The error is
// reported as non-fatal, with the Transient action attached.
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to transient error: %1", status); // Throws

    SessionErrorInfo transient{std::move(status), IsFatal{false}};
    transient.server_requests_action = ProtocolErrorInfo::Action::Transient;

    involuntary_disconnect(std::move(transient), reason); // Throws
}
1161

1162

1163
// Close connection due to error discovered on the server-side, and then
1164
// reported to the client by way of a connection-level ERROR message.
1165
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1166
{
70✔
1167
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
70✔
1168
                int(error_code)); // Throws
70✔
1169

1170
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
70✔
1171
                                      : ConnectionTerminationReason::server_said_try_again_later;
70✔
1172
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
70✔
1173
                           reason); // Throws
70✔
1174
}
70✔
1175

1176

1177
void Connection::disconnect(const SessionErrorInfo& info)
1178
{
3,754✔
1179
    // Cancel connect timeout watchdog
1180
    m_connect_timer.reset();
3,754✔
1181

1182
    if (m_state == ConnectionState::connected) {
3,754✔
1183
        m_disconnect_time = monotonic_clock_now();
3,550✔
1184
        m_disconnect_has_occurred = true;
3,550✔
1185

1186
        // Sessions that are in the Deactivating state at this time can be
1187
        // immediately discarded, in part because they are no longer enlisted to
1188
        // send. Such sessions will be taken to the Deactivated state by
1189
        // Session::connection_lost(), and then they will be removed from
1190
        // `m_sessions`.
1191
        auto i = m_sessions.begin(), end = m_sessions.end();
3,550✔
1192
        while (i != end) {
7,744✔
1193
            // Prevent invalidation of the main iterator when erasing elements
1194
            auto j = i++;
4,194✔
1195
            Session& sess = *j->second;
4,194✔
1196
            sess.connection_lost(); // Throws
4,194✔
1197
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
4,198✔
1198
                m_sessions.erase(j);
1,992✔
1199
        }
4,194✔
1200
    }
3,550✔
1201

1202
    change_state_to_disconnected();
3,754✔
1203

1204
    m_ping_delay_in_progress = false;
3,754✔
1205
    m_waiting_for_pong = false;
3,754✔
1206
    m_send_ping = false;
3,754✔
1207
    m_minimize_next_ping_delay = false;
3,754✔
1208
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,754✔
1209
    m_ping_sent = false;
3,754✔
1210
    m_heartbeat_timer.reset();
3,754✔
1211
    m_previous_ping_rtt = 0;
3,754✔
1212

1213
    m_websocket_sentinel->destroyed = true;
3,754✔
1214
    m_websocket_sentinel.reset();
3,754✔
1215
    m_websocket.reset();
3,754✔
1216
    m_input_body_buffer.reset();
3,754✔
1217
    m_sending_session = nullptr;
3,754✔
1218
    m_sessions_enlisted_to_send.clear();
3,754✔
1219
    m_sending = false;
3,754✔
1220

1221
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,754✔
1222
    initiate_reconnect_wait();                                           // Throws
3,754✔
1223
}
3,754✔
1224

1225
bool Connection::is_flx_sync_connection() const noexcept
1226
{
113,052✔
1227
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
113,052✔
1228
}
113,052✔
1229

1230
// Handles a PONG message from the server.
//
// `timestamp` is the timestamp echoed back by the server; it must match the
// timestamp of the last PING that was sent. A PONG that arrives when none is
// expected, or that carries the wrong timestamp, closes the connection as a
// protocol error.
void Connection::receive_pong(milliseconds_type timestamp)
{
    logger.debug("Received: PONG(timestamp=%1)", timestamp);

    // A PONG is only legal while waiting for one, and only after the
    // corresponding PING has actually been handed to the send path.
    const bool pong_expected = (m_waiting_for_pong && !m_send_ping);
    if (REALM_UNLIKELY(!pong_expected)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
        return;
    }

    const bool timestamp_matches = (timestamp == m_last_ping_sent_at);
    if (REALM_UNLIKELY(!timestamp_matches)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
                          m_last_ping_sent_at, timestamp)}); // Throws
        return;
    }

    const milliseconds_type now = monotonic_clock_now();
    const milliseconds_type rtt = now - timestamp;
    logger.debug("Round trip time was %1 milliseconds", rtt);
    m_previous_ping_rtt = rtt;

    // If this PONG message is a response to a PING message that was sent after
    // the last invocation of cancel_reconnect_delay(), then the connection is
    // still good, and we do not have to skip the next reconnect delay.
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
        m_ping_after_scheduled_reset_of_reconnect_info = false;
        m_reconnect_info.scheduled_reset = false;
    }

    // The PONG arrived: stop the PONG timeout and schedule the next PING.
    m_heartbeat_timer.reset();
    m_waiting_for_pong = false;

    initiate_ping_delay(now); // Throws

    if (m_client.m_roundtrip_time_handler) {
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
    }
}
1271

1272
// Looks up the session that an incoming message is addressed to.
//
// `message` is the name of the message type and is only used for logging and
// error text. Returns the session, or nullptr when `session_ident` is zero or
// no such session currently exists. In the latter case the session history
// decides the outcome: an identifier that was never used closes the
// connection as a protocol error, while an identifier belonging to a session
// that has since been closed is only logged.
//
// NOTE(review): this function is declared noexcept, yet the logger calls and
// close_due_to_protocol_error() are annotated as throwing elsewhere in this
// file - confirm that terminating on such an exception is acceptable.
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
{
    if (session_ident == 0) {
        return nullptr;
    }

    if (auto* live_session = get_session(session_ident); REALM_LIKELY(live_session)) {
        return live_session;
    }

    // Check the history to see if the message received was for a previous session
    const bool existed_before = (m_session_history.find(session_ident) != m_session_history.end());
    if (existed_before) {
        logger.error("Received %1 message for closed session, session_ident = %2", message,
                     session_ident); // Throws
        return nullptr;
    }

    logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
    close_due_to_protocol_error(
        {ErrorCodes::SyncProtocolInvariantFailed,
         util::format("Received message %1 for session iden %2 when that session never existed", message,
                      session_ident)});
    return nullptr;
}
1296

1297
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1298
{
764✔
1299
    Session* sess = nullptr;
764✔
1300
    if (session_ident != 0) {
764✔
1301
        sess = find_and_validate_session(session_ident, "ERROR");
690✔
1302
        if (REALM_UNLIKELY(!sess)) {
690✔
1303
            return;
×
1304
        }
×
1305
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
690✔
1306
            close_due_to_protocol_error(std::move(status)); // Throws
×
1307
            return;
×
1308
        }
×
1309

1310
        if (sess->m_state == Session::Deactivated) {
690✔
1311
            finish_session_deactivation(sess);
2✔
1312
        }
2✔
1313
        return;
690✔
1314
    }
690✔
1315

1316
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
74✔
1317
                info.message, info.raw_error_code, info.is_fatal, session_ident,
74✔
1318
                info.server_requests_action); // Throws
74✔
1319

1320
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
74✔
1321
    if (REALM_LIKELY(known_error_code)) {
74✔
1322
        ProtocolError error_code = ProtocolError(info.raw_error_code);
70✔
1323
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
70✔
1324
            close_due_to_server_side_error(error_code, info); // Throws
70✔
1325
            return;
70✔
1326
        }
70✔
1327
        close_due_to_protocol_error(
×
1328
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1329
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1330
                          info.raw_error_code)});
×
1331
    }
×
1332
    else {
4✔
1333
        close_due_to_protocol_error(
4✔
1334
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1335
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1336
    }
4✔
1337
}
74✔
1338

1339

1340
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1341
                                             session_ident_type session_ident)
1342
{
20✔
1343
    if (session_ident == 0) {
20✔
1344
        return close_due_to_protocol_error(
×
1345
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1346
    }
×
1347

1348
    if (!is_flx_sync_connection()) {
20✔
1349
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1350
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1351
    }
×
1352

1353
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
20✔
1354
    if (REALM_UNLIKELY(!sess)) {
20✔
1355
        return;
×
1356
    }
×
1357

1358
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
20✔
1359
        close_due_to_protocol_error(std::move(status));
×
1360
    }
×
1361
}
20✔
1362

1363

1364
// Handles an IDENT message: forwards the client file identifier to the session
// returned by find_and_validate_session(). If no session is returned, nothing
// else is done here. A non-OK status from the session closes the connection
// with that status.
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
{
    Session* const target = find_and_validate_session(session_ident, "IDENT");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto result = target->receive_ident_message(client_file_ident);
    if (result.is_ok())
        return;

    close_due_to_protocol_error(std::move(result)); // Throws
}
3,678✔
1374

1375
// Handles a DOWNLOAD message: forwards it to the session returned by
// find_and_validate_session(). If no session is returned, nothing else is done
// here. A non-OK status from the session closes the connection with that
// status.
void Connection::receive_download_message(session_ident_type session_ident, const DownloadMessage& message)
{
    Session* const target = find_and_validate_session(session_ident, "DOWNLOAD");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto result = target->receive_download_message(message);
    if (result.is_ok())
        return;

    close_due_to_protocol_error(std::move(result));
}
47,964✔
1386

1387
// Handles a MARK message: forwards the request identifier to the session
// returned by find_and_validate_session(). If no session is returned, nothing
// else is done here. A non-OK status from the session closes the connection
// with that status.
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    Session* const target = find_and_validate_session(session_ident, "MARK");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto result = target->receive_mark_message(request_ident);
    if (result.is_ok())
        return;

    close_due_to_protocol_error(std::move(result)); // Throws
}
16,558✔
1397

1398

1399
// Handles an UNBOUND message for the given session.
//
// The message is forwarded to the session returned by
// find_and_validate_session(). A non-OK status closes the connection with
// that status. Otherwise, if the session is in the Deactivated state
// afterwards, its deactivation is finished via finish_session_deactivation().
void Connection::receive_unbound_message(session_ident_type session_ident)
{
    Session* const target = find_and_validate_session(session_ident, "UNBOUND");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto result = target->receive_unbound_message();
    if (!result.is_ok()) {
        close_due_to_protocol_error(std::move(result)); // Throws
        return;
    }

    const bool now_deactivated = (target->m_state == Session::Deactivated);
    if (now_deactivated)
        finish_session_deactivation(target);
}
4,272✔
1415

1416

1417
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1418
                                               std::string_view body)
1419
{
52✔
1420
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
52✔
1421
    if (REALM_UNLIKELY(!sess)) {
52✔
1422
        return;
×
1423
    }
×
1424

1425
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
52✔
1426
        close_due_to_protocol_error(std::move(status));
×
1427
    }
×
1428
}
52✔
1429

1430

1431
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1432
                                            std::string_view message)
1433
{
6,068✔
1434
    std::string prefix;
6,068✔
1435
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
6,068✔
1436
        prefix = util::format("Server[%1]", m_appservices_coid);
6,068✔
1437
    }
6,068✔
UNCOV
1438
    else {
×
UNCOV
1439
        prefix = "Server";
×
UNCOV
1440
    }
×
1441

1442
    if (session_ident != 0) {
6,068✔
1443
        if (auto sess = get_session(session_ident)) {
4,062✔
1444
            sess->logger.log(LogCategory::session, level, "%1 log: %2", prefix, message);
4,046✔
1445
            return;
4,046✔
1446
        }
4,046✔
1447

1448
        logger.log(util::LogCategory::session, level, "%1 log for unknown session %2: %3", prefix, session_ident,
16✔
1449
                   message);
16✔
1450
        return;
16✔
1451
    }
4,062✔
1452

1453
    logger.log(level, "%1 log: %2", prefix, message);
2,006✔
1454
}
2,006✔
1455

1456

1457
void Connection::receive_appservices_request_id(std::string_view coid)
1458
{
5,556✔
1459
    // Only set once per connection
1460
    if (!coid.empty() && m_appservices_coid.empty()) {
5,556✔
1461
        m_appservices_coid = coid;
2,482✔
1462
        logger.log(util::LogCategory::session, util::LogCategory::Level::info,
2,482✔
1463
                   "Connected to app services with request id: \"%1\"", m_appservices_coid);
2,482✔
1464
    }
2,482✔
1465
}
5,556✔
1466

1467

1468
// Closes the connection due to the given protocol error; this is a thin
// forwarder to close_due_to_protocol_error().
void Connection::handle_protocol_error(Status status)
{
    Status error{std::move(status)};
    close_due_to_protocol_error(std::move(error));
}
×
1472

1473

1474
// Sessions are guaranteed to be granted the opportunity to send a message in
1475
// the order that they enlist. Note that this is important to ensure
1476
// nonoverlapping communication with the server for consecutive sessions
1477
// associated with the same Realm file.
1478
//
1479
// CAUTION: The specified session may get destroyed before this function
1480
// returns, but only if its Session::send_message() puts it into the Deactivated
1481
// state.
1482
void Connection::enlist_to_send(Session* sess)
1483
{
165,560✔
1484
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
165,560✔
1485
    m_sessions_enlisted_to_send.push_back(sess); // Throws
165,560✔
1486
    if (!m_sending)
165,560✔
1487
        send_next_message(); // Throws
59,860✔
1488
}
165,560✔
1489

1490

1491
std::string Connection::get_active_appservices_connection_id()
1492
{
72✔
1493
    return m_appservices_coid;
72✔
1494
}
72✔
1495

1496
void Session::cancel_resumption_delay()
1497
{
4,180✔
1498
    REALM_ASSERT_EX(m_state == Active, m_state);
4,180✔
1499

1500
    if (!m_suspended)
4,180✔
1501
        return;
4,120✔
1502

1503
    m_suspended = false;
60✔
1504

1505
    logger.debug("Resumed"); // Throws
60✔
1506

1507
    if (unbind_process_complete())
60✔
1508
        initiate_rebind(); // Throws
36✔
1509

1510
    m_conn.one_more_active_unsuspended_session(); // Throws
60✔
1511
    if (m_try_again_activation_timer) {
60✔
1512
        m_try_again_activation_timer.reset();
8✔
1513
    }
8✔
1514

1515
    on_resumed(); // Throws
60✔
1516
}
60✔
1517

1518

1519
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1520
                                                 std::vector<ProtocolErrorInfo>* out)
1521
{
23,054✔
1522
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
23,054✔
1523
        return;
23,012✔
1524
    }
23,012✔
1525

1526
#ifdef REALM_DEBUG
42✔
1527
    REALM_ASSERT_DEBUG(
42✔
1528
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
42✔
1529
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
42✔
1530
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
42✔
1531
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
42✔
1532
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
42✔
1533
                       }));
42✔
1534
#endif
42✔
1535

1536
    while (!m_pending_compensating_write_errors.empty() &&
86✔
1537
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
86✔
1538
               changesets.back().version) {
44✔
1539
        auto& cur_error = m_pending_compensating_write_errors.front();
44✔
1540
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
44✔
1541
        out->push_back(std::move(cur_error));
44✔
1542
        m_pending_compensating_write_errors.pop_front();
44✔
1543
    }
44✔
1544
}
42✔
1545

1546

1547
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1548
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1549
                                   DownloadBatchState download_batch_state)
1550
{
45,264✔
1551
    auto& history = get_history();
45,264✔
1552
    if (received_changesets.empty()) {
45,264✔
1553
        if (download_batch_state == DownloadBatchState::MoreToCome) {
22,188✔
1554
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1555
                                       "received empty download message that was not the last in batch",
×
1556
                                       ProtocolError::bad_progress);
×
1557
        }
×
1558
        history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws
22,188✔
1559
        return;
22,188✔
1560
    }
22,188✔
1561

1562
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
23,076✔
1563
    auto transact = get_db()->start_read();
23,076✔
1564
    history.integrate_server_changesets(
23,076✔
1565
        progress, &downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
23,076✔
1566
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
23,076✔
1567
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
23,054✔
1568
        }); // Throws
23,054✔
1569
    if (received_changesets.size() == 1) {
23,076✔
1570
        logger.debug("1 remote changeset integrated, producing client version %1",
15,678✔
1571
                     version_info.sync_version.version); // Throws
15,678✔
1572
    }
15,678✔
1573
    else {
7,398✔
1574
        logger.debug("%2 remote changesets integrated, producing client version %1",
7,398✔
1575
                     version_info.sync_version.version, received_changesets.size()); // Throws
7,398✔
1576
    }
7,398✔
1577

1578
    for (const auto& pending_error : pending_compensating_write_errors) {
23,076✔
1579
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
44✔
1580
                    pending_error.compensating_write_rejected_client_version,
44✔
1581
                    *pending_error.compensating_write_server_version, pending_error.message);
44✔
1582
        try {
44✔
1583
            on_connection_state_changed(
44✔
1584
                m_conn.get_state(),
44✔
1585
                SessionErrorInfo{pending_error,
44✔
1586
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
44✔
1587
                                                          pending_error.message)});
44✔
1588
        }
44✔
1589
        catch (...) {
44✔
1590
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1591
        }
×
1592
    }
44✔
1593
}
23,076✔
1594

1595

1596
void Session::on_integration_failure(const IntegrationException& error)
1597
{
40✔
1598
    REALM_ASSERT_EX(m_state == Active, m_state);
40✔
1599
    REALM_ASSERT(!m_client_error && !m_error_to_send);
40✔
1600
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
40✔
1601

1602
    m_client_error = util::make_optional<IntegrationException>(error);
40✔
1603
    m_error_to_send = true;
40✔
1604
    SessionErrorInfo error_info{error.to_status(), IsFatal{false}};
40✔
1605
    error_info.server_requests_action = ProtocolErrorInfo::Action::Warning;
40✔
1606
    // Surface the error to the user otherwise is lost.
1607
    on_connection_state_changed(m_conn.get_state(), std::move(error_info));
40✔
1608

1609
    // Since the deactivation process has not been initiated, the UNBIND
1610
    // message cannot have been sent unless an ERROR message was received.
1611
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
40✔
1612
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
40✔
1613
        ensure_enlisted_to_send(); // Throws
36✔
1614
    }
36✔
1615
}
40✔
1616

1617
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress,
1618
                                       bool changesets_integrated)
1619
{
47,162✔
1620
    REALM_ASSERT_EX(m_state == Active, m_state);
47,162✔
1621
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
47,162✔
1622
    bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version);
47,162✔
1623

1624
    m_download_progress = progress.download;
47,162✔
1625
    m_progress = progress;
47,162✔
1626

1627
    if (upload_progressed) {
47,162✔
1628
        if (progress.upload.client_version > m_last_version_selected_for_upload) {
34,126✔
1629
            if (progress.upload.client_version > m_upload_progress.client_version)
14,216✔
1630
                m_upload_progress = progress.upload;
942✔
1631
            m_last_version_selected_for_upload = progress.upload.client_version;
14,216✔
1632
        }
14,216✔
1633

1634
        notify_upload_progress();
34,126✔
1635
        check_for_upload_completion();
34,126✔
1636
    }
34,126✔
1637

1638
    bool resume_upload = do_recognize_sync_version(client_version); // Allows upload process to resume
47,162✔
1639

1640
    // notify also when final DOWNLOAD received with no changesets
1641
    bool download_progressed = changesets_integrated || (!upload_progressed && resume_upload);
47,162✔
1642
    if (download_progressed)
47,162✔
1643
        notify_download_progress();
26,868✔
1644

1645
    check_for_download_completion(); // Throws
47,162✔
1646

1647
    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
1648
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
47,162✔
1649
        auto& flx_subscription_store = *get_flx_subscription_store();
3,310✔
1650
        get_migration_store()->create_subscriptions(flx_subscription_store);
3,310✔
1651
    }
3,310✔
1652

1653
    // Since the deactivation process has not been initiated, the UNBIND
1654
    // message cannot have been sent unless an ERROR message was received.
1655
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
47,162✔
1656
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
47,162✔
1657
        ensure_enlisted_to_send(); // Throws
47,152✔
1658
    }
47,152✔
1659
}
47,162✔
1660

1661

1662
// Destructor. Performs no explicit work; the life cycle state check below is
// currently disabled.
Session::~Session()
{
    // REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
}
10,304✔
1666

1667

1668
std::string Session::make_logger_prefix(session_ident_type ident)
1669
{
10,318✔
1670
    std::ostringstream out;
10,318✔
1671
    out.imbue(std::locale::classic());
10,318✔
1672
    out << "Session[" << ident << "]: "; // Throws
10,318✔
1673
    return out.str();                    // Throws
10,318✔
1674
}
10,318✔
1675

1676

1677
void Session::activate()
1678
{
10,318✔
1679
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
10,318✔
1680

1681
    logger.debug("Activating"); // Throws
10,318✔
1682

1683
    bool has_pending_client_reset = false;
10,318✔
1684
    if (REALM_LIKELY(!get_client().is_dry_run())) {
10,318✔
1685
        bool file_exists = util::File::exists(get_realm_path());
10,318✔
1686
        m_performing_client_reset = get_client_reset_config().has_value();
10,318✔
1687

1688
        logger.info("client_reset_config = %1, Realm exists = %2 ", m_performing_client_reset, file_exists);
10,318✔
1689
        if (!m_performing_client_reset) {
10,318✔
1690
            get_history().get_status(m_last_version_available, m_client_file_ident, m_progress,
9,950✔
1691
                                     &has_pending_client_reset); // Throws
9,950✔
1692
        }
9,950✔
1693
    }
10,318✔
1694
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
10,318✔
1695
                 m_client_file_ident.salt); // Throws
10,318✔
1696
    m_upload_progress = m_progress.upload;
10,318✔
1697
    m_last_version_selected_for_upload = m_upload_progress.client_version;
10,318✔
1698
    m_download_progress = m_progress.download;
10,318✔
1699
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
10,318✔
1700
    init_progress_handler();
10,318✔
1701

1702
    logger.debug("last_version_available  = %1", m_last_version_available);                    // Throws
10,318✔
1703
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
10,318✔
1704
    logger.debug("progress_download_client_version = %1",
10,318✔
1705
                 m_progress.download.last_integrated_client_version);                                      // Throws
10,318✔
1706
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
10,318✔
1707
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
10,318✔
1708

1709
    reset_protocol_state();
10,318✔
1710
    m_state = Active;
10,318✔
1711

1712
    call_debug_hook(SyncClientHookEvent::SessionActivating, m_progress, m_last_sent_flx_query_version,
10,318✔
1713
                    DownloadBatchState::SteadyState, 0);
10,318✔
1714

1715
    REALM_ASSERT(!m_suspended);
10,318✔
1716
    m_conn.one_more_active_unsuspended_session(); // Throws
10,318✔
1717

1718
    try {
10,318✔
1719
        process_pending_flx_bootstrap();
10,318✔
1720
    }
10,318✔
1721
    catch (const IntegrationException& error) {
10,318✔
1722
        on_integration_failure(error);
×
1723
    }
×
1724
    catch (...) {
10,318✔
1725
        on_integration_failure(IntegrationException(exception_to_status()));
4✔
1726
    }
4✔
1727

1728
    if (has_pending_client_reset) {
10,318✔
1729
        handle_pending_client_reset_acknowledgement();
18✔
1730
    }
18✔
1731
}
10,318✔
1732

1733

1734
// The caller (Connection) must discard the session if the session has become
1735
// deactivated upon return.
1736
void Session::initiate_deactivation()
1737
{
10,306✔
1738
    REALM_ASSERT_EX(m_state == Active, m_state);
10,306✔
1739

1740
    logger.debug("Initiating deactivation"); // Throws
10,306✔
1741

1742
    m_state = Deactivating;
10,306✔
1743

1744
    if (!m_suspended)
10,306✔
1745
        m_conn.one_less_active_unsuspended_session(); // Throws
9,706✔
1746

1747
    if (m_enlisted_to_send) {
10,306✔
1748
        REALM_ASSERT(!unbind_process_complete());
5,316✔
1749
        return;
5,316✔
1750
    }
5,316✔
1751

1752
    // Deactivate immediately if the BIND message has not yet been sent and the
1753
    // session is not enlisted to send, or if the unbinding process has already
1754
    // completed.
1755
    if (!m_bind_message_sent || unbind_process_complete()) {
4,990✔
1756
        complete_deactivation(); // Throws
1,100✔
1757
        // Life cycle state is now Deactivated
1758
        return;
1,100✔
1759
    }
1,100✔
1760

1761
    // Ready to send the UNBIND message, if it has not already been sent
1762
    if (!m_unbind_message_sent) {
3,890✔
1763
        enlist_to_send(); // Throws
3,680✔
1764
        return;
3,680✔
1765
    }
3,680✔
1766
}
3,890✔
1767

1768

1769
void Session::complete_deactivation()
1770
{
10,304✔
1771
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
10,304✔
1772
    m_state = Deactivated;
10,304✔
1773

1774
    logger.debug("Deactivation completed"); // Throws
10,304✔
1775
}
10,304✔
1776

1777

1778
// Called by the associated Connection object when this session is granted an
1779
// opportunity to send a message.
1780
//
1781
// The caller (Connection) must discard the session if the session has become
1782
// deactivated upon return.
1783
void Session::send_message()
1784
{
163,920✔
1785
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
163,920✔
1786
    REALM_ASSERT(m_enlisted_to_send);
163,920✔
1787
    m_enlisted_to_send = false;
163,920✔
1788
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
163,920✔
1789
        // Deactivation has been initiated. If the UNBIND message has not been
1790
        // sent yet, there is no point in sending it. Instead, we can let the
1791
        // deactivation process complete.
1792
        if (!m_bind_message_sent) {
9,220✔
1793
            return complete_deactivation(); // Throws
2,814✔
1794
            // Life cycle state is now Deactivated
1795
        }
2,814✔
1796

1797
        // Session life cycle state is Deactivating or the unbinding process has
1798
        // been initiated by a session specific ERROR message
1799
        if (!m_unbind_message_sent)
6,406✔
1800
            send_unbind_message(); // Throws
6,406✔
1801
        return;
6,406✔
1802
    }
9,220✔
1803

1804
    // Session life cycle state is Active and the unbinding process has
1805
    // not been initiated
1806
    REALM_ASSERT(!m_unbind_message_sent);
154,700✔
1807

1808
    if (!m_bind_message_sent)
154,700✔
1809
        return send_bind_message(); // Throws
8,924✔
1810

1811
    if (!m_ident_message_sent) {
145,776✔
1812
        if (have_client_file_ident())
7,472✔
1813
            send_ident_message(); // Throws
7,472✔
1814
        return;
7,472✔
1815
    }
7,472✔
1816

1817
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
138,304✔
1818
                                                      [](const PendingTestCommand& command) {
138,304✔
1819
                                                          return command.pending;
120✔
1820
                                                      });
120✔
1821
    if (has_pending_test_command) {
138,304✔
1822
        return send_test_command_message();
52✔
1823
    }
52✔
1824

1825
    if (m_error_to_send)
138,252✔
1826
        return send_json_error_message(); // Throws
32✔
1827

1828
    // Stop sending upload, mark and query messages when the client detects an error.
1829
    if (m_client_error) {
138,220✔
1830
        return;
14✔
1831
    }
14✔
1832

1833
    if (m_target_download_mark > m_last_download_mark_sent)
138,206✔
1834
        return send_mark_message(); // Throws
17,350✔
1835

1836
    auto is_upload_allowed = [&]() -> bool {
120,856✔
1837
        if (!m_is_flx_sync_session) {
120,856✔
1838
            return true;
105,510✔
1839
        }
105,510✔
1840

1841
        auto migration_store = get_migration_store();
15,346✔
1842
        if (!migration_store) {
15,346✔
1843
            return true;
×
1844
        }
×
1845

1846
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
15,346✔
1847
        if (!sentinel_query_version) {
15,346✔
1848
            return true;
15,318✔
1849
        }
15,318✔
1850

1851
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
1852
        return m_last_sent_flx_query_version != *sentinel_query_version;
28✔
1853
    };
15,346✔
1854

1855
    if (!is_upload_allowed()) {
120,856✔
1856
        return;
16✔
1857
    }
16✔
1858

1859
    auto check_pending_flx_version = [&]() -> bool {
120,840✔
1860
        if (!m_is_flx_sync_session) {
120,840✔
1861
            return false;
105,510✔
1862
        }
105,510✔
1863

1864
        if (!m_allow_upload) {
15,330✔
1865
            return false;
3,176✔
1866
        }
3,176✔
1867

1868
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
12,154✔
1869

1870
        if (!m_pending_flx_sub_set) {
12,154✔
1871
            return false;
10,138✔
1872
        }
10,138✔
1873

1874
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
2,016✔
1875
    };
12,154✔
1876

1877
    if (check_pending_flx_version()) {
120,840✔
1878
        return send_query_change_message(); // throws
1,148✔
1879
    }
1,148✔
1880

1881
    if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
119,692✔
1882
        return send_upload_message(); // Throws
59,628✔
1883
    }
59,628✔
1884
}
119,692✔
1885

1886

1887
void Session::send_bind_message()
1888
{
8,924✔
1889
    REALM_ASSERT_EX(m_state == Active, m_state);
8,924✔
1890

1891
    session_ident_type session_ident = m_ident;
8,924✔
1892
    bool need_client_file_ident = !have_client_file_ident();
8,924✔
1893
    const bool is_subserver = false;
8,924✔
1894

1895

1896
    ClientProtocol& protocol = m_conn.get_client_protocol();
8,924✔
1897
    int protocol_version = m_conn.get_negotiated_protocol_version();
8,924✔
1898
    OutputBuffer& out = m_conn.get_output_buffer();
8,924✔
1899
    // Discard the token since it's ignored by the server.
1900
    std::string empty_access_token;
8,924✔
1901
    if (m_is_flx_sync_session) {
8,924✔
1902
        nlohmann::json bind_json_data;
1,472✔
1903
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,472✔
1904
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1905
        }
60✔
1906
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,472✔
1907
        auto schema_version = get_schema_version();
1,472✔
1908
        // Send 0 if schema is not versioned.
1909
        bind_json_data["schemaVersion"] = schema_version != uint64_t(-1) ? schema_version : 0;
1,472✔
1910
        if (logger.would_log(util::Logger::Level::debug)) {
1,472✔
1911
            std::string json_data_dump;
1,472✔
1912
            if (!bind_json_data.empty()) {
1,472✔
1913
                json_data_dump = bind_json_data.dump();
1,472✔
1914
            }
1,472✔
1915
            logger.debug(
1,472✔
1916
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,472✔
1917
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,472✔
1918
        }
1,472✔
1919
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,472✔
1920
                                       need_client_file_ident, is_subserver); // Throws
1,472✔
1921
    }
1,472✔
1922
    else {
7,452✔
1923
        std::string server_path = get_virt_path();
7,452✔
1924
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
7,452✔
1925
                     session_ident, need_client_file_ident, is_subserver, server_path);
7,452✔
1926
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
7,452✔
1927
                                       need_client_file_ident, is_subserver); // Throws
7,452✔
1928
    }
7,452✔
1929
    m_conn.initiate_write_message(out, this); // Throws
8,924✔
1930

1931
    m_bind_message_sent = true;
8,924✔
1932
    call_debug_hook(SyncClientHookEvent::BindMessageSent, m_progress, m_last_sent_flx_query_version,
8,924✔
1933
                    DownloadBatchState::SteadyState, 0);
8,924✔
1934

1935
    // Ready to send the IDENT message if the file identifier pair is already
1936
    // available.
1937
    if (!need_client_file_ident)
8,924✔
1938
        enlist_to_send(); // Throws
3,940✔
1939
}
8,924✔
1940

1941

1942
void Session::send_ident_message()
1943
{
7,472✔
1944
    REALM_ASSERT_EX(m_state == Active, m_state);
7,472✔
1945
    REALM_ASSERT(m_bind_message_sent);
7,472✔
1946
    REALM_ASSERT(!m_unbind_message_sent);
7,472✔
1947
    REALM_ASSERT(have_client_file_ident());
7,472✔
1948

1949

1950
    ClientProtocol& protocol = m_conn.get_client_protocol();
7,472✔
1951
    OutputBuffer& out = m_conn.get_output_buffer();
7,472✔
1952
    session_ident_type session_ident = m_ident;
7,472✔
1953

1954
    if (m_is_flx_sync_session) {
7,472✔
1955
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,394✔
1956
        const auto active_query_body = active_query_set.to_ext_json();
1,394✔
1957
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,394✔
1958
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,394✔
1959
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,394✔
1960
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,394✔
1961
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,394✔
1962
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,394✔
1963
                     active_query_body); // Throws
1,394✔
1964
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,394✔
1965
                                        active_query_set.version(), active_query_body); // Throws
1,394✔
1966
        m_last_sent_flx_query_version = active_query_set.version();
1,394✔
1967
    }
1,394✔
1968
    else {
6,078✔
1969
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
6,078✔
1970
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
6,078✔
1971
                     "latest_server_version_salt=%6)",
6,078✔
1972
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
6,078✔
1973
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
6,078✔
1974
                     m_progress.latest_server_version.salt);                                  // Throws
6,078✔
1975
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
6,078✔
1976
    }
6,078✔
1977
    m_conn.initiate_write_message(out, this); // Throws
7,472✔
1978

1979
    m_ident_message_sent = true;
7,472✔
1980

1981
    // Other messages may be waiting to be sent
1982
    enlist_to_send(); // Throws
7,472✔
1983
}
7,472✔
1984

1985
void Session::send_query_change_message()
1986
{
1,148✔
1987
    REALM_ASSERT_EX(m_state == Active, m_state);
1,148✔
1988
    REALM_ASSERT(m_ident_message_sent);
1,148✔
1989
    REALM_ASSERT(!m_unbind_message_sent);
1,148✔
1990
    REALM_ASSERT(m_pending_flx_sub_set);
1,148✔
1991
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
1,148✔
1992

1993
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
1,148✔
1994
        return;
×
1995
    }
×
1996

1997
    auto sub_store = get_flx_subscription_store();
1,148✔
1998
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
1,148✔
1999
    auto latest_queries = latest_sub_set.to_ext_json();
1,148✔
2000
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
1,148✔
2001
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
1,148✔
2002

2003
    OutputBuffer& out = m_conn.get_output_buffer();
1,148✔
2004
    session_ident_type session_ident = get_ident();
1,148✔
2005
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,148✔
2006
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
1,148✔
2007
    m_conn.initiate_write_message(out, this);
1,148✔
2008

2009
    m_last_sent_flx_query_version = latest_sub_set.version();
1,148✔
2010

2011
    request_download_completion_notification();
1,148✔
2012
}
1,148✔
2013

2014
void Session::send_upload_message()
2015
{
59,628✔
2016
    REALM_ASSERT_EX(m_state == Active, m_state);
59,628✔
2017
    REALM_ASSERT(m_ident_message_sent);
59,628✔
2018
    REALM_ASSERT(!m_unbind_message_sent);
59,628✔
2019

2020
    if (REALM_UNLIKELY(get_client().is_dry_run()))
59,628✔
2021
        return;
×
2022

2023
    version_type target_upload_version = m_last_version_available;
59,628✔
2024
    if (m_pending_flx_sub_set) {
59,628✔
2025
        REALM_ASSERT(m_is_flx_sync_session);
868✔
2026
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
868✔
2027
    }
868✔
2028

2029
    std::vector<UploadChangeset> uploadable_changesets;
59,628✔
2030
    version_type locked_server_version = 0;
59,628✔
2031
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
59,628✔
2032
                                             locked_server_version); // Throws
59,628✔
2033

2034
    if (uploadable_changesets.empty()) {
59,628✔
2035
        // Nothing more to upload right now
2036
        check_for_upload_completion(); // Throws
30,114✔
2037
        // If we need to limit upload up to some version other than the last client version available and there are no
2038
        // changes to upload, then there is no need to send an empty message.
2039
        if (m_pending_flx_sub_set) {
30,114✔
2040
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
272✔
2041
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
272✔
2042
            // Other messages may be waiting to be sent
2043
            return enlist_to_send(); // Throws
272✔
2044
        }
272✔
2045
    }
30,114✔
2046
    else {
29,514✔
2047
        m_last_version_selected_for_upload = uploadable_changesets.back().progress.client_version;
29,514✔
2048
    }
29,514✔
2049

2050
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
59,356✔
2051
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
596✔
2052
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
596✔
2053
    }
596✔
2054

2055
    version_type progress_client_version = m_upload_progress.client_version;
59,356✔
2056
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
59,356✔
2057

2058
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
59,356✔
2059
                 "locked_server_version=%3, num_changesets=%4)",
59,356✔
2060
                 progress_client_version, progress_server_version, locked_server_version,
59,356✔
2061
                 uploadable_changesets.size()); // Throws
59,356✔
2062

2063
    ClientProtocol& protocol = m_conn.get_client_protocol();
59,356✔
2064
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
59,356✔
2065

2066
    for (const UploadChangeset& uc : uploadable_changesets) {
59,356✔
2067
        logger.debug(util::LogCategory::changeset,
43,122✔
2068
                     "Fetching changeset for upload (client_version=%1, server_version=%2, "
43,122✔
2069
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
43,122✔
2070
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
43,122✔
2071
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
43,122✔
2072
        if (logger.would_log(util::Logger::Level::trace)) {
43,122✔
2073
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2074
            if (changeset_data.size() < 1024) {
×
2075
                logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
2076
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2077
            }
×
2078
            else {
×
2079
                logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
2080
                             protocol.compressed_hex_dump(changeset_data));
×
2081
            }
×
2082

2083
#if REALM_DEBUG
×
2084
            ChunkedBinaryInputStream in{changeset_data};
×
2085
            Changeset log;
×
2086
            try {
×
2087
                parse_changeset(in, log);
×
2088
                std::stringstream ss;
×
2089
                log.print(ss);
×
2090
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
2091
            }
×
2092
            catch (const BadChangesetError& err) {
×
2093
                logger.error(util::LogCategory::changeset, "Unable to parse changeset: %1", err.what());
×
2094
            }
×
2095
#endif
×
2096
        }
×
2097

2098
#if 0 // Upload log compaction is currently not implemented
2099
        if (!get_client().m_disable_upload_compaction) {
2100
            ChangesetEncoder::Buffer encode_buffer;
2101

2102
            {
2103
                // Upload compaction only takes place within single changesets to
2104
                // avoid another client seeing inconsistent snapshots.
2105
                ChunkedBinaryInputStream stream{uc.changeset};
2106
                Changeset changeset;
2107
                parse_changeset(stream, changeset); // Throws
2108
                // FIXME: What is the point of setting these? How can compaction care about them?
2109
                changeset.version = uc.progress.client_version;
2110
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2111
                changeset.origin_timestamp = uc.origin_timestamp;
2112
                changeset.origin_file_ident = uc.origin_file_ident;
2113

2114
                compact_changesets(&changeset, 1);
2115
                encode_changeset(changeset, encode_buffer);
2116

2117
                logger.debug(util::LogCategory::changeset, "Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2118
                             encode_buffer.size()); // Throws
2119
            }
2120

2121
            upload_message_builder.add_changeset(
2122
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
2123
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
2124
        }
2125
        else
2126
#endif
2127
        {
43,122✔
2128
            upload_message_builder.add_changeset(uc.progress.client_version,
43,122✔
2129
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
43,122✔
2130
                                                 uc.origin_file_ident,
43,122✔
2131
                                                 uc.changeset); // Throws
43,122✔
2132
        }
43,122✔
2133
    }
43,122✔
2134

2135
    int protocol_version = m_conn.get_negotiated_protocol_version();
59,356✔
2136
    OutputBuffer& out = m_conn.get_output_buffer();
59,356✔
2137
    session_ident_type session_ident = get_ident();
59,356✔
2138
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
59,356✔
2139
                                               progress_server_version,
59,356✔
2140
                                               locked_server_version); // Throws
59,356✔
2141
    m_conn.initiate_write_message(out, this);                          // Throws
59,356✔
2142

2143
    // Other messages may be waiting to be sent
2144
    enlist_to_send(); // Throws
59,356✔
2145
}
59,356✔
2146

2147

2148
void Session::send_mark_message()
2149
{
17,350✔
2150
    REALM_ASSERT_EX(m_state == Active, m_state);
17,350✔
2151
    REALM_ASSERT(m_ident_message_sent);
17,350✔
2152
    REALM_ASSERT(!m_unbind_message_sent);
17,350✔
2153
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
17,350✔
2154

2155
    request_ident_type request_ident = m_target_download_mark;
17,350✔
2156
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
17,350✔
2157

2158
    ClientProtocol& protocol = m_conn.get_client_protocol();
17,350✔
2159
    OutputBuffer& out = m_conn.get_output_buffer();
17,350✔
2160
    session_ident_type session_ident = get_ident();
17,350✔
2161
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
17,350✔
2162
    m_conn.initiate_write_message(out, this);                      // Throws
17,350✔
2163

2164
    m_last_download_mark_sent = request_ident;
17,350✔
2165

2166
    // Other messages may be waiting to be sent
2167
    enlist_to_send(); // Throws
17,350✔
2168
}
17,350✔
2169

2170

2171
void Session::send_unbind_message()
2172
{
6,406✔
2173
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,406✔
2174
    REALM_ASSERT(m_bind_message_sent);
6,406✔
2175
    REALM_ASSERT(!m_unbind_message_sent);
6,406✔
2176

2177
    logger.debug("Sending: UNBIND"); // Throws
6,406✔
2178

2179
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,406✔
2180
    OutputBuffer& out = m_conn.get_output_buffer();
6,406✔
2181
    session_ident_type session_ident = get_ident();
6,406✔
2182
    protocol.make_unbind_message(out, session_ident); // Throws
6,406✔
2183
    m_conn.initiate_write_message(out, this);         // Throws
6,406✔
2184

2185
    m_unbind_message_sent = true;
6,406✔
2186
}
6,406✔
2187

2188

2189
void Session::send_json_error_message()
2190
{
32✔
2191
    REALM_ASSERT_EX(m_state == Active, m_state);
32✔
2192
    REALM_ASSERT(m_ident_message_sent);
32✔
2193
    REALM_ASSERT(!m_unbind_message_sent);
32✔
2194
    REALM_ASSERT(m_error_to_send);
32✔
2195
    REALM_ASSERT(m_client_error);
32✔
2196

2197
    ClientProtocol& protocol = m_conn.get_client_protocol();
32✔
2198
    OutputBuffer& out = m_conn.get_output_buffer();
32✔
2199
    session_ident_type session_ident = get_ident();
32✔
2200
    auto protocol_error = m_client_error->error_for_server;
32✔
2201

2202
    auto message = util::format("%1", m_client_error->to_status());
32✔
2203
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
32✔
2204
                session_ident); // Throws
32✔
2205

2206
    nlohmann::json error_body_json;
32✔
2207
    error_body_json["message"] = std::move(message);
32✔
2208
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
32✔
2209
                                     error_body_json.dump()); // Throws
32✔
2210
    m_conn.initiate_write_message(out, this);                 // Throws
32✔
2211

2212
    m_error_to_send = false;
32✔
2213
    enlist_to_send(); // Throws
32✔
2214
}
32✔
2215

2216

2217
void Session::send_test_command_message()
2218
{
52✔
2219
    REALM_ASSERT_EX(m_state == Active, m_state);
52✔
2220

2221
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
52✔
2222
                           [](const PendingTestCommand& command) {
52✔
2223
                               return command.pending;
52✔
2224
                           });
52✔
2225
    REALM_ASSERT(it != m_pending_test_commands.end());
52✔
2226

2227
    ClientProtocol& protocol = m_conn.get_client_protocol();
52✔
2228
    OutputBuffer& out = m_conn.get_output_buffer();
52✔
2229
    auto session_ident = get_ident();
52✔
2230

2231
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
52✔
2232
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
52✔
2233

2234
    m_conn.initiate_write_message(out, this); // Throws;
52✔
2235
    it->pending = false;
52✔
2236

2237
    enlist_to_send();
52✔
2238
}
52✔
2239

2240
bool Session::client_reset_if_needed()
2241
{
3,654✔
2242
    // Regardless of what happens, once we return from this function we will
2243
    // no longer be in the middle of a client reset
2244
    m_performing_client_reset = false;
3,654✔
2245

2246
    // Even if we end up not actually performing a client reset, consume the
2247
    // config to ensure that the resources it holds are released
2248
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
3,654✔
2249
    if (!client_reset_config) {
3,654✔
2250
        return false;
3,286✔
2251
    }
3,286✔
2252

2253
    auto on_flx_version_complete = [this](int64_t version) {
368✔
2254
        this->on_flx_sync_version_complete(version);
292✔
2255
    };
292✔
2256
    bool did_reset = client_reset::perform_client_reset(
368✔
2257
        logger, *get_db(), *client_reset_config->fresh_copy, client_reset_config->mode,
368✔
2258
        std::move(client_reset_config->notify_before_client_reset),
368✔
2259
        std::move(client_reset_config->notify_after_client_reset), m_client_file_ident, get_flx_subscription_store(),
368✔
2260
        on_flx_version_complete, client_reset_config->recovery_is_allowed);
368✔
2261
    if (!did_reset) {
368✔
2262
        return false;
×
2263
    }
×
2264

2265
    // The fresh Realm has been used to reset the state
2266
    logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
368✔
2267

2268
    SaltedFileIdent client_file_ident;
368✔
2269
    bool has_pending_client_reset = false;
368✔
2270
    get_history().get_status(m_last_version_available, client_file_ident, m_progress,
368✔
2271
                             &has_pending_client_reset); // Throws
368✔
2272
    REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
368✔
2273
    REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
368✔
2274
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
368✔
2275
                    m_progress.download.last_integrated_client_version);
368✔
2276
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
368✔
2277
    logger.trace("last_version_available  = %1", m_last_version_available); // Throws
368✔
2278

2279
    m_upload_progress = m_progress.upload;
368✔
2280
    m_download_progress = m_progress.download;
368✔
2281
    init_progress_handler();
368✔
2282
    // In recovery mode, there may be new changesets to upload and nothing left to download.
2283
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
2284
    // For both, we want to allow uploads again without needing external changes to download first.
2285
    m_allow_upload = true;
368✔
2286
    REALM_ASSERT_EX(m_last_version_selected_for_upload == 0, m_last_version_selected_for_upload);
368✔
2287

2288
    if (has_pending_client_reset) {
368✔
2289
        handle_pending_client_reset_acknowledgement();
288✔
2290
    }
288✔
2291

2292
    update_subscription_version_info();
368✔
2293

2294
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
2295
    if (auto migration_store = get_migration_store()) {
368✔
2296
        migration_store->complete_migration_or_rollback();
260✔
2297
    }
260✔
2298

2299
    return true;
368✔
2300
}
368✔
2301

2302
// Handles an IDENT message from the server, which assigns this session its
// client file identifier.
//
// Returns Status::OK() when the message was accepted, or deliberately ignored
// because the session is no longer Active. Returns
// ErrorCodes::SyncProtocolInvariantFailed when the message arrives at an
// illegal time or carries an invalid identifier or salt.
//
// When accepted, a client reset is performed if one is configured. Otherwise
// the identifier is stored in the history and the session's progress counters
// are zeroed. An exception thrown by the client reset does not propagate: it
// is logged, the session is suspended with a fatal AutoClientResetFailed
// error, and OK is returned.
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
{
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
                 client_file_ident.salt); // Throws

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active)
        return Status::OK(); // Success

    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
                               !m_unbound_message_received);
    if (REALM_UNLIKELY(!legal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
    }
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
    }
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
    }

    m_client_file_ident = client_file_ident;

    if (REALM_UNLIKELY(get_client().is_dry_run())) {
        // Ready to send the IDENT message
        ensure_enlisted_to_send(); // Throws
        return Status::OK();       // Success
    }

    // if a client reset happens, it will take care of setting the file ident
    // and if not, we do it here
    bool did_client_reset = false;
    try {
        did_client_reset = client_reset_if_needed();
    }
    catch (const std::exception& e) {
        auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
        // The text comes from an arbitrary exception, so it must be logged as
        // an argument. Used as the format string itself, any '%' in it would
        // be treated as a placeholder.
        logger.error("%1", err_msg);
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
        suspend(err_info);
        return Status::OK();
    }
    if (!did_client_reset) {
        get_history().set_client_file_ident(client_file_ident,
                                            m_fix_up_object_ids); // Throws
        m_progress.download.last_integrated_client_version = 0;
        m_progress.upload.client_version = 0;
        m_last_version_selected_for_upload = 0;
    }

    // Ready to send the IDENT message
    ensure_enlisted_to_send(); // Throws
    return Status::OK();       // Success
}
3,654✔
2358

2359
Status Session::receive_download_message(const DownloadMessage& message)
2360
{
47,960✔
2361
    // Ignore the message if the deactivation process has been initiated,
2362
    // because in that case, the associated Realm and SessionWrapper must
2363
    // not be accessed any longer.
2364
    if (m_state != Active)
47,960✔
2365
        return Status::OK();
510✔
2366

2367
    bool is_flx = m_conn.is_flx_sync_connection();
47,450✔
2368
    int64_t query_version = is_flx ? *message.query_version : 0;
47,450✔
2369

2370
    if (!is_flx || query_version > 0)
47,450✔
2371
        enable_progress_notifications();
45,836✔
2372

2373
    // If this is a PBS connection, then every download message is its own complete batch.
2374
    bool last_in_batch = is_flx ? *message.last_in_batch : true;
47,450✔
2375
    auto batch_state = last_in_batch ? sync::DownloadBatchState::LastInBatch : sync::DownloadBatchState::MoreToCome;
47,450✔
2376
    if (is_steady_state_download_message(batch_state, query_version))
47,450✔
2377
        batch_state = DownloadBatchState::SteadyState;
45,274✔
2378

2379
    auto&& progress = message.progress;
47,450✔
2380
    if (is_flx) {
47,450✔
2381
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
3,576✔
2382
                     "latest_server_version=%3, latest_server_version_salt=%4, "
3,576✔
2383
                     "upload_client_version=%5, upload_server_version=%6, progress_estimate=%7, "
3,576✔
2384
                     "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
3,576✔
2385
                     progress.download.server_version, progress.download.last_integrated_client_version,
3,576✔
2386
                     progress.latest_server_version.version, progress.latest_server_version.salt,
3,576✔
2387
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
3,576✔
2388
                     message.progress_estimate, last_in_batch, query_version, message.changesets.size()); // Throws
3,576✔
2389
    }
3,576✔
2390
    else {
43,874✔
2391
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
43,874✔
2392
                     "latest_server_version=%3, latest_server_version_salt=%4, "
43,874✔
2393
                     "upload_client_version=%5, upload_server_version=%6, "
43,874✔
2394
                     "downloadable_bytes=%7, num_changesets=%8, ...)",
43,874✔
2395
                     progress.download.server_version, progress.download.last_integrated_client_version,
43,874✔
2396
                     progress.latest_server_version.version, progress.latest_server_version.salt,
43,874✔
2397
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
43,874✔
2398
                     message.downloadable_bytes, message.changesets.size()); // Throws
43,874✔
2399
    }
43,874✔
2400

2401
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
2402
    // changeset over and over again.
2403
    if (m_client_error) {
47,450✔
2404
        logger.debug("Ignoring download message because the client detected an integration error");
×
2405
        return Status::OK();
×
2406
    }
×
2407

2408
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
47,454✔
2409
    if (REALM_UNLIKELY(!legal_at_this_time)) {
47,450✔
2410
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
2✔
2411
    }
2✔
2412
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
47,448✔
2413
        logger.error("Bad sync progress received (%1)", status);
×
2414
        return status;
×
2415
    }
×
2416

2417
    version_type server_version = m_progress.download.server_version;
47,448✔
2418
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
47,448✔
2419
    for (const RemoteChangeset& changeset : message.changesets) {
48,738✔
2420
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
2421
        // version must be increasing, but can stay the same during bootstraps.
2422
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
45,112✔
2423
                                                         : (changeset.remote_version > server_version);
45,112✔
2424
        // Each server version cannot be greater than the one in the header of the download message.
2425
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
45,112✔
2426
        if (!good_server_version) {
45,112✔
2427
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2428
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2429
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2430
        }
×
2431
        server_version = changeset.remote_version;
45,112✔
2432
        // Check that per-changeset last integrated client version is "weakly"
2433
        // increasing.
2434
        bool good_client_version =
45,112✔
2435
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
45,112✔
2436
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
45,112✔
2437
        if (!good_client_version) {
45,112✔
2438
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2439
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2440
                                 "(%1, %2, %3)",
×
2441
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2442
                                 progress.download.last_integrated_client_version)};
×
2443
        }
×
2444
        last_integrated_client_version = changeset.last_integrated_local_version;
45,112✔
2445
        // Server shouldn't send our own changes, and zero is not a valid client
2446
        // file identifier.
2447
        bool good_file_ident =
45,112✔
2448
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
45,112✔
2449
        if (!good_file_ident) {
45,112✔
2450
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2451
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2452
                                 changeset.origin_file_ident)};
×
2453
        }
×
2454
    }
45,112✔
2455

2456
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
47,448✔
2457
                                       batch_state, message.changesets.size());
47,448✔
2458
    if (hook_action == SyncClientHookAction::EarlyReturn) {
47,448✔
2459
        return Status::OK();
16✔
2460
    }
16✔
2461
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
47,432✔
2462

2463
    if (is_flx)
47,432✔
2464
        update_download_estimate(message.progress_estimate);
3,562✔
2465

2466
    if (process_flx_bootstrap_message(progress, batch_state, query_version, message.changesets)) {
47,432✔
2467
        clear_resumption_delay_state();
2,168✔
2468
        return Status::OK();
2,168✔
2469
    }
2,168✔
2470

2471
    uint64_t downloadable_bytes = is_flx ? 0 : message.downloadable_bytes;
45,264✔
2472
    initiate_integrate_changesets(downloadable_bytes, batch_state, progress, message.changesets); // Throws
45,264✔
2473

2474
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
45,264✔
2475
                                  batch_state, message.changesets.size());
45,264✔
2476
    if (hook_action == SyncClientHookAction::EarlyReturn) {
45,264✔
2477
        return Status::OK();
×
2478
    }
×
2479
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
45,264✔
2480

2481
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
2482
    // after a retryable session error.
2483
    clear_resumption_delay_state();
45,264✔
2484
    return Status::OK();
45,264✔
2485
}
45,264✔
2486

2487
// Handles a MARK message from the server.
//
// Returns Status::OK() when the mark was accepted, or ignored because the
// session is no longer Active. Returns ErrorCodes::SyncProtocolInvariantFailed
// when the message is illegal at this point, or when `request_ident` is not in
// the range (last mark received, last mark sent].
//
// On acceptance the current download server version and the mark are
// recorded, and download completion is re-checked.
Status Session::receive_mark_message(request_ident_type request_ident)
{
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

    // Once deactivation has started the associated Realm and SessionWrapper
    // must not be touched any more, so the message is dropped.
    if (m_state != Active) {
        return Status::OK(); // Success
    }

    const bool illegal_now = !m_ident_message_sent || m_error_message_received || m_unbound_message_received;
    if (REALM_UNLIKELY(illegal_now)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
    }

    // A mark is only valid if it has been sent and not yet answered.
    const bool bad_request_ident =
        request_ident > m_last_download_mark_sent || request_ident <= m_last_download_mark_received;
    if (REALM_UNLIKELY(bad_request_ident)) {
        auto reason = util::format(
            "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
            m_last_download_mark_sent, m_last_download_mark_received);
        return {ErrorCodes::SyncProtocolInvariantFailed, reason};
    }

    m_server_version_at_last_download_mark = m_progress.download.server_version;
    m_last_download_mark_received = request_ident;
    check_for_download_completion(); // Throws

    return Status::OK(); // Success
}
16,466✔
2517

2518

2519
// The caller (Connection) must discard the session if the session has become
2520
// deactivated upon return.
2521
Status Session::receive_unbound_message()
2522
{
4,272✔
2523
    logger.debug("Received: UNBOUND");
4,272✔
2524

2525
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
4,272✔
2526
    if (REALM_UNLIKELY(!legal_at_this_time)) {
4,272✔
2527
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2528
    }
×
2529

2530
    // The fact that the UNBIND message has been sent, but an ERROR message has
2531
    // not been received, implies that the deactivation process must have been
2532
    // initiated, so this session must be in the Deactivating state or the session
2533
    // has been suspended because of a client side error.
2534
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
4,272✔
2535

2536
    m_unbound_message_received = true;
4,272✔
2537

2538
    // Detect completion of the unbinding process
2539
    if (m_unbind_message_send_complete && m_state == Deactivating) {
4,272✔
2540
        // The deactivation process completes when the unbinding process
2541
        // completes.
2542
        complete_deactivation(); // Throws
4,270✔
2543
        // Life cycle state is now Deactivated
2544
    }
4,270✔
2545

2546
    return Status::OK(); // Success
4,272✔
2547
}
4,272✔
2548

2549

2550
// Handles a QUERY_ERROR message from the server.
//
// The message is always logged. It is forwarded to on_flx_sync_error() only
// while the session is Active. Always returns Status::OK().
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
{
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);

    // Once deactivation has started the associated Realm and SessionWrapper
    // must not be touched any more, so the error is not forwarded.
    if (m_state != Active)
        return Status::OK();

    on_flx_sync_error(query_version, message); // throws
    return Status::OK();
}
20✔
2561

2562
// The caller (Connection) must discard the session if the session has become
2563
// deactivated upon return.
2564
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2565
{
702✔
2566
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
702✔
2567
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
702✔
2568

2569
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
702✔
2570
    if (REALM_UNLIKELY(!legal_at_this_time)) {
702✔
2571
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2572
    }
×
2573

2574
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
702✔
2575
    auto status = protocol_error_to_status(protocol_error, info.message);
702✔
2576
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
702✔
2577
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2578
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2579
                             info.raw_error_code)};
×
2580
    }
×
2581

2582
    // Can't process debug hook actions once the Session is undergoing deactivation, since
2583
    // the SessionWrapper may not be available
2584
    if (m_state == Active) {
702✔
2585
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
700✔
2586
        if (debug_action == SyncClientHookAction::EarlyReturn) {
700✔
2587
            return Status::OK();
8✔
2588
        }
8✔
2589
    }
700✔
2590

2591
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
2592
    // containing the compensating write has appeared in a download message.
2593
    if (status == ErrorCodes::SyncCompensatingWrite) {
694✔
2594
        // If the client is not active, the compensating writes will not be processed now, but will be
2595
        // sent again the next time the client connects
2596
        if (m_state == Active) {
44✔
2597
            REALM_ASSERT(info.compensating_write_server_version.has_value());
44✔
2598
            m_pending_compensating_write_errors.push_back(info);
44✔
2599
        }
44✔
2600
        return Status::OK();
44✔
2601
    }
44✔
2602

2603
    if (protocol_error == ProtocolError::schema_version_changed) {
650✔
2604
        // Enable upload immediately if the session is still active.
2605
        if (m_state == Active) {
68✔
2606
            auto wt = get_db()->start_write();
68✔
2607
            _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
68✔
2608
            wt->commit();
68✔
2609
            // Notify SyncSession a schema migration is required.
2610
            on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
68✔
2611
        }
68✔
2612
        // Keep the session active to upload any unsynced changes.
2613
        return Status::OK();
68✔
2614
    }
68✔
2615

2616
    m_error_message_received = true;
582✔
2617
    suspend(SessionErrorInfo{info, std::move(status)});
582✔
2618
    return Status::OK();
582✔
2619
}
650✔
2620

2621
void Session::suspend(const SessionErrorInfo& info)
2622
{
662✔
2623
    REALM_ASSERT(!m_suspended);
662✔
2624
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
662✔
2625
    logger.debug("Suspended"); // Throws
662✔
2626

2627
    m_suspended = true;
662✔
2628

2629
    // Detect completion of the unbinding process
2630
    if (m_unbind_message_send_complete && m_error_message_received) {
662✔
2631
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2632
        // we received an ERROR message implies that the deactivation process must
2633
        // have been initiated, so this session must be in the Deactivating state.
2634
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
2✔
2635

2636
        // The deactivation process completes when the unbinding process
2637
        // completes.
2638
        complete_deactivation(); // Throws
2✔
2639
        // Life cycle state is now Deactivated
2640
    }
2✔
2641

2642
    // Notify the application of the suspension of the session if the session is
2643
    // still in the Active state
2644
    if (m_state == Active) {
662✔
2645
        call_debug_hook(SyncClientHookEvent::SessionSuspended, info);
660✔
2646
        m_conn.one_less_active_unsuspended_session(); // Throws
660✔
2647
        on_suspended(info);                           // Throws
660✔
2648
    }
660✔
2649

2650
    if (!info.is_fatal) {
662✔
2651
        begin_resumption_delay(info);
56✔
2652
    }
56✔
2653

2654
    // Ready to send the UNBIND message, if it has not been sent already
2655
    if (!m_unbind_message_sent)
662✔
2656
        ensure_enlisted_to_send(); // Throws
660✔
2657
}
662✔
2658

2659
// Handles the response to a previously issued test command.
//
// The pending command whose id equals `ident` has its promise fulfilled with
// a copy of `body` and is then removed from the list of pending test
// commands. If no pending command has that id, a SyncProtocolInvariantFailed
// status is returned instead.
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
{
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);

    const auto has_matching_ident = [&](const PendingTestCommand& command) {
        return command.id == ident;
    };
    auto pending =
        std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(), has_matching_ident);

    if (pending != m_pending_test_commands.end()) {
        pending->promise.emplace_value(std::string{body});
        m_pending_test_commands.erase(pending);
        return Status::OK();
    }

    return {ErrorCodes::SyncProtocolInvariantFailed,
            util::format("Received test command response for a non-existent ident %1", ident)};
}
52✔
2676

2677
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2678
{
56✔
2679
    REALM_ASSERT(!m_try_again_activation_timer);
56✔
2680

2681
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
56✔
2682
                                  error_info.resumption_delay_interval);
56✔
2683
    auto try_again_interval = m_try_again_delay_info.delay_interval();
56✔
2684
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
56✔
2685
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
2686
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
2687
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
2688
        try_again_interval = std::chrono::milliseconds{1000};
24✔
2689
    }
24✔
2690
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
56✔
2691
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
56✔
2692
        if (status == ErrorCodes::OperationAborted)
56✔
2693
            return;
12✔
2694
        else if (!status.is_ok())
44✔
2695
            throw Exception(status);
×
2696

2697
        m_try_again_activation_timer.reset();
44✔
2698
        cancel_resumption_delay();
44✔
2699
    });
44✔
2700
}
56✔
2701

2702
void Session::clear_resumption_delay_state()
2703
{
47,436✔
2704
    if (m_try_again_activation_timer) {
47,436✔
2705
        logger.debug("Clearing resumption delay state after successful download");
×
2706
        m_try_again_delay_info.reset();
×
2707
    }
×
2708
}
47,436✔
2709

2710
// Validates the sync progress received from the server against the progress
// currently recorded for this session (m_progress) and against
// m_last_version_available.
//
// Returns Status::OK() when every invariant holds, and otherwise a
// SyncProtocolInvariantFailed status describing a violation. All invariants
// are always evaluated; if more than one is violated, the message of the
// last failing check (in the order below) is the one that is reported.
//
// NOTE(review): the function is declared noexcept but builds std::string
// messages on the failure paths - confirm that terminating on allocation
// failure is acceptable here.
Status Session::check_received_sync_progress(const SyncProgress& progress) noexcept
{
    const SyncProgress& current = m_progress;
    const SyncProgress& received = progress;
    std::string violation;

    // Latest server version: must not decrease
    if (current.latest_server_version.version > received.latest_server_version.version) {
        violation = util::format("Latest server version in download messages must be weakly increasing throughout a "
                                 "session (current: %1, received: %2)",
                                 current.latest_server_version.version, received.latest_server_version.version);
    }

    // Upload cursor (client version): must not decrease ...
    if (current.upload.client_version > received.upload.client_version) {
        violation = util::format("Last integrated client version in download messages must be weakly increasing "
                                 "throughout a session (current: %1, received: %2)",
                                 current.upload.client_version, received.upload.client_version);
    }
    // ... and must not exceed the last client version available
    if (m_last_version_available < received.upload.client_version) {
        violation = util::format("Last integrated client version on server cannot be greater than the latest client "
                                 "version in existence (current: %1, received: %2)",
                                 m_last_version_available, received.upload.client_version);
    }

    // Download cursor (server version): must not decrease ...
    if (current.download.server_version > received.download.server_version) {
        violation =
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
                         current.download.server_version, received.download.server_version);
    }
    // ... and must not exceed the received latest server version
    if (received.latest_server_version.version < received.download.server_version) {
        violation = util::format(
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
            received.download.server_version, received.latest_server_version.version);
    }

    // Last integrated client version at the download cursor: must not decrease ...
    if (current.download.last_integrated_client_version > received.download.last_integrated_client_version) {
        violation = util::format(
            "Last integrated client version on the server at the position in the server's history of the download "
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
            current.download.last_integrated_client_version, received.download.last_integrated_client_version);
    }
    // ... and must not exceed the received upload cursor
    if (received.upload.client_version < received.download.last_integrated_client_version) {
        violation = util::format("Last integrated client version on the server in the position at the server's history "
                                 "of the download cursor cannot be greater than the latest client version integrated "
                                 "on the server (download: %1, upload: %2)",
                                 received.download.last_integrated_client_version, received.upload.client_version);
    }

    // The download cursor must not be behind the server version integrated by the upload cursor
    if (received.upload.last_integrated_server_version > received.download.server_version) {
        violation = util::format(
            "The server version of the download cursor cannot be less than the server version integrated in the "
            "latest client version acknowledged by the server (download: %1, upload: %2)",
            received.download.server_version, received.upload.last_integrated_server_version);
    }

    if (!violation.empty()) {
        return {ErrorCodes::SyncProtocolInvariantFailed, std::move(violation)};
    }
    return Status::OK();
}
47,452✔
2764

2765

2766
void Session::check_for_upload_completion()
2767
{
79,868✔
2768
    REALM_ASSERT_EX(m_state == Active, m_state);
79,868✔
2769
    if (!m_upload_completion_notification_requested) {
79,868✔
2770
        return;
47,810✔
2771
    }
47,810✔
2772

2773
    // during an ongoing client reset operation, we never upload anything
2774
    if (m_performing_client_reset)
32,058✔
2775
        return;
260✔
2776

2777
    // Upload process must have reached end of history
2778
    REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available);
31,798✔
2779
    bool scan_complete = (m_upload_progress.client_version == m_last_version_available);
31,798✔
2780
    if (!scan_complete)
31,798✔
2781
        return;
5,264✔
2782

2783
    // All uploaded changesets must have been acknowledged by the server
2784
    REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload);
26,534✔
2785
    bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload);
26,534✔
2786
    if (!all_uploads_accepted)
26,534✔
2787
        return;
11,566✔
2788

2789
    m_upload_completion_notification_requested = false;
14,968✔
2790
    on_upload_completion(); // Throws
14,968✔
2791
}
14,968✔
2792

2793

2794
void Session::check_for_download_completion()
2795
{
63,628✔
2796
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
63,628✔
2797
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
63,628✔
2798
    if (m_last_download_mark_received == m_last_triggering_download_mark)
63,628✔
2799
        return;
46,908✔
2800
    if (m_last_download_mark_received < m_target_download_mark)
16,720✔
2801
        return;
418✔
2802
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
16,302✔
2803
        return;
×
2804
    m_last_triggering_download_mark = m_target_download_mark;
16,302✔
2805
    if (REALM_UNLIKELY(!m_allow_upload)) {
16,302✔
2806
        // Activate the upload process now, and enable immediate reactivation
2807
        // after a subsequent fast reconnect.
2808
        m_allow_upload = true;
4,646✔
2809
        ensure_enlisted_to_send(); // Throws
4,646✔
2810
    }
4,646✔
2811
    on_download_completion(); // Throws
16,302✔
2812
}
16,302✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc