• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2085

01 Mar 2024 12:26PM UTC coverage: 90.926% (-0.001%) from 90.927%
2085

push

Evergreen

jedelbo
Avoid doing unneeded logger work in Replication

Most of the replication log statements do some work including memory
allocations which are then thrown away if the log level it too high, so always
check the log level first. A few places don't actually benefit from this, but
it's easier to consistently check the log level every time.

93986 of 173116 branches covered (54.29%)

63 of 100 new or added lines in 2 files covered. (63.0%)

114 existing lines in 17 files now uncovered.

238379 of 262169 relevant lines covered (90.93%)

6007877.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.34
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/compact_changesets.hpp>
10
#include <realm/sync/noinst/client_reset_operation.hpp>
11
#include <realm/sync/noinst/sync_schema_migration.hpp>
12
#include <realm/sync/protocol.hpp>
13
#include <realm/util/assert.hpp>
14
#include <realm/util/basic_system_errors.hpp>
15
#include <realm/util/memory_stream.hpp>
16
#include <realm/util/platform_info.hpp>
17
#include <realm/util/random.hpp>
18
#include <realm/util/safe_int_ops.hpp>
19
#include <realm/util/scope_exit.hpp>
20
#include <realm/util/to_string.hpp>
21
#include <realm/util/uri.hpp>
22
#include <realm/version.hpp>
23

24
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
25

26
#include <system_error>
27
#include <sstream>
28

29
// NOTE: The protocol specification is in `/doc/protocol.md`
30

31
using namespace realm;
32
using namespace _impl;
33
using namespace realm::util;
34
using namespace realm::sync;
35
using namespace realm::sync::websocket;
36

37
// clang-format off
38
using Connection      = ClientImpl::Connection;
39
using Session         = ClientImpl::Session;
40
using UploadChangeset = ClientHistory::UploadChangeset;
41

42
// These are a work-around for a bug in MSVC. It cannot find in-class types
43
// mentioned in signature of out-of-line member function definitions.
44
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
45
using OutputBuffer                = ClientImpl::OutputBuffer;
46
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
47
// clang-format on
48

49
void ClientImpl::ReconnectInfo::reset() noexcept
50
{
1,896✔
51
    m_backoff_state.reset();
1,896✔
52
    scheduled_reset = false;
1,896✔
53
}
1,896✔
54

55

56
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
57
                                       std::optional<ResumptionDelayInfo> new_delay_info)
58
{
3,554✔
59
    m_backoff_state.update(new_reason, new_delay_info);
3,554✔
60
}
3,554✔
61

62

63
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
64
{
5,768✔
65
    if (scheduled_reset) {
5,768✔
66
        reset();
4✔
67
    }
4✔
68

3,056✔
69
    if (!m_backoff_state.triggering_error) {
5,768✔
70
        return std::chrono::milliseconds::zero();
4,446✔
71
    }
4,446✔
72

740✔
73
    switch (*m_backoff_state.triggering_error) {
1,322✔
74
        case ConnectionTerminationReason::closed_voluntarily:
80✔
75
            return std::chrono::milliseconds::zero();
80✔
76
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
77
            return std::chrono::milliseconds::max();
18✔
78
        default:
1,218✔
79
            if (m_reconnect_mode == ReconnectMode::testing) {
1,218✔
80
                return std::chrono::milliseconds::max();
978✔
81
            }
978✔
82

122✔
83
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
240✔
84
            return m_backoff_state.delay_interval();
240✔
85
    }
1,322✔
86
}
1,322✔
87

88

89
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
90
                                      port_type& port, std::string& path) const
91
{
3,776✔
92
    util::Uri uri(url); // Throws
3,776✔
93
    uri.canonicalize(); // Throws
3,776✔
94
    std::string userinfo, address_2, port_2;
3,776✔
95
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
3,776✔
96
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
3,776✔
97
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
3,776✔
98
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
3,776✔
99
    if (REALM_UNLIKELY(!good))
3,776✔
100
        return false;
1,714✔
101
    ProtocolEnvelope protocol_2;
3,776✔
102
    port_type port_3;
3,776✔
103
    if (realm_scheme) {
3,776✔
104
        if (uri.get_scheme() == "realm:") {
×
105
            protocol_2 = ProtocolEnvelope::realm;
×
106
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
107
        }
×
108
        else {
×
109
            protocol_2 = ProtocolEnvelope::realms;
×
110
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
111
        }
×
112
    }
×
113
    else {
3,776✔
114
        REALM_ASSERT(ws_scheme);
3,776✔
115
        if (uri.get_scheme() == "ws:") {
3,776✔
116
            protocol_2 = ProtocolEnvelope::ws;
3,764✔
117
            port_3 = 80;
3,764✔
118
        }
3,764✔
119
        else {
12✔
120
            protocol_2 = ProtocolEnvelope::wss;
12✔
121
            port_3 = 443;
12✔
122
        }
12✔
123
    }
3,776✔
124
    if (!port_2.empty()) {
3,776✔
125
        std::istringstream in(port_2);    // Throws
3,776✔
126
        in.imbue(std::locale::classic()); // Throws
3,776✔
127
        in >> port_3;
3,776✔
128
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,776✔
129
            return false;
1,714✔
130
    }
3,776✔
131
    std::string path_2 = uri.get_path(); // Throws (copy)
3,776✔
132

1,714✔
133
    protocol = protocol_2;
3,776✔
134
    address = std::move(address_2);
3,776✔
135
    port = port_3;
3,776✔
136
    path = std::move(path_2);
3,776✔
137
    return true;
3,776✔
138
}
3,776✔
139

140
ClientImpl::ClientImpl(ClientConfig config)
141
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::session, std::move(config.logger))}
142
    , logger{*logger_ptr}
143
    , m_reconnect_mode{config.reconnect_mode}
144
    , m_connect_timeout{config.connect_timeout}
145
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
146
    , m_ping_keepalive_period{config.ping_keepalive_period}
147
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
148
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
149
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
150
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
151
    , m_dry_run{config.dry_run}
152
    , m_enable_default_port_hack{config.enable_default_port_hack}
153
    , m_disable_upload_compaction{config.disable_upload_compaction}
154
    , m_fix_up_object_ids{config.fix_up_object_ids}
155
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
156
    , m_socket_provider{std::move(config.socket_provider)}
157
    , m_client_protocol{} // Throws
158
    , m_one_connection_per_session{config.one_connection_per_session}
159
    , m_random{}
160
{
9,598✔
161
    // FIXME: Would be better if seeding was up to the application.
4,730✔
162
    util::seed_prng_nondeterministically(m_random); // Throws
9,598✔
163

4,730✔
164
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,598✔
165
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,598✔
166
                 get_current_protocol_version()); // Throws
9,598✔
167
    logger.info("Platform: %1", util::get_platform_info());
9,598✔
168
    const char* build_mode;
9,598✔
169
#if REALM_DEBUG
9,598✔
170
    build_mode = "Debug";
9,598✔
171
#else
172
    build_mode = "Release";
173
#endif
174
    logger.debug("Build mode: %1", build_mode);
9,598✔
175
    logger.debug("Config param: one_connection_per_session = %1",
9,598✔
176
                 config.one_connection_per_session); // Throws
9,598✔
177
    logger.debug("Config param: connect_timeout = %1 ms",
9,598✔
178
                 config.connect_timeout); // Throws
9,598✔
179
    logger.debug("Config param: connection_linger_time = %1 ms",
9,598✔
180
                 config.connection_linger_time); // Throws
9,598✔
181
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,598✔
182
                 config.ping_keepalive_period); // Throws
9,598✔
183
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,598✔
184
                 config.pong_keepalive_timeout); // Throws
9,598✔
185
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,598✔
186
                 config.fast_reconnect_limit); // Throws
9,598✔
187
    logger.debug("Config param: disable_upload_compaction = %1",
9,598✔
188
                 config.disable_upload_compaction); // Throws
9,598✔
189
    logger.debug("Config param: disable_sync_to_disk = %1",
9,598✔
190
                 config.disable_sync_to_disk); // Throws
9,598✔
191
    logger.debug("Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3",
9,598✔
192
                 m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,598✔
193
                 m_reconnect_backoff_info.resumption_delay_interval.count(),
9,598✔
194
                 m_reconnect_backoff_info.resumption_delay_backoff_multiplier);
9,598✔
195

4,730✔
196
    if (config.reconnect_mode != ReconnectMode::normal) {
9,598✔
197
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
768✔
198
                    "never do this in production!");
768✔
199
    }
768✔
200

4,730✔
201
    if (config.dry_run) {
9,598✔
202
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
203
                    "never do this in production!");
×
204
    }
×
205

4,730✔
206
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,598✔
207

4,730✔
208
    if (m_one_connection_per_session) {
9,598✔
209
        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
4✔
210
                    "never do this in production");
4✔
211
    }
4✔
212

4,730✔
213
    if (config.disable_upload_activation_delay) {
9,598✔
214
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
215
                    "never do this in production");
×
216
    }
×
217

4,730✔
218
    if (config.disable_sync_to_disk) {
9,598✔
219
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
220
                    "never do this in production");
×
221
    }
×
222

4,730✔
223
    m_actualize_and_finalize = create_trigger([this](Status status) {
15,964✔
224
        if (status == ErrorCodes::OperationAborted)
15,964✔
225
            return;
×
226
        else if (!status.is_ok())
15,964✔
227
            throw Exception(status);
×
228
        actualize_and_finalize_session_wrappers(); // Throws
15,964✔
229
    });
15,964✔
230
}
9,598✔
231

232

233
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
234
{
176,670✔
235
    REALM_ASSERT(m_socket_provider);
176,670✔
236
    {
176,670✔
237
        std::lock_guard lock(m_drain_mutex);
176,670✔
238
        ++m_outstanding_posts;
176,670✔
239
        m_drained = false;
176,670✔
240
    }
176,670✔
241
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
176,668✔
242
        auto decr_guard = util::make_scope_exit([&]() noexcept {
176,670✔
243
            std::lock_guard lock(m_drain_mutex);
176,670✔
244
            REALM_ASSERT(m_outstanding_posts);
176,670✔
245
            --m_outstanding_posts;
176,670✔
246
            m_drain_cv.notify_all();
176,670✔
247
        });
176,670✔
248
        handler(status);
176,666✔
249
    });
176,666✔
250
}
176,670✔
251

252

253
void ClientImpl::drain_connections()
254
{
9,598✔
255
    logger.debug("Draining connections during sync client shutdown");
9,598✔
256
    for (auto& server_slot_pair : m_server_slots) {
6,062✔
257
        auto& server_slot = server_slot_pair.second;
2,530✔
258

1,198✔
259
        if (server_slot.connection) {
2,530✔
260
            auto& conn = server_slot.connection;
2,362✔
261
            conn->force_close();
2,362✔
262
        }
2,362✔
263
        else {
168✔
264
            for (auto& conn_pair : server_slot.alt_connections) {
84✔
UNCOV
265
                conn_pair.second->force_close();
×
UNCOV
266
            }
×
267
        }
168✔
268
    }
2,530✔
269
}
9,598✔
270

271

272
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
273
                                                       SyncSocketProvider::FunctionHandler&& handler)
274
{
16,838✔
275
    REALM_ASSERT(m_socket_provider);
16,838✔
276
    {
16,838✔
277
        std::lock_guard lock(m_drain_mutex);
16,838✔
278
        ++m_outstanding_posts;
16,838✔
279
        m_drained = false;
16,838✔
280
    }
16,838✔
281
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
16,840✔
282
        handler(status);
16,834✔
283

8,358✔
284
        std::lock_guard lock(m_drain_mutex);
16,834✔
285
        REALM_ASSERT(m_outstanding_posts);
16,834✔
286
        --m_outstanding_posts;
16,834✔
287
        m_drain_cv.notify_all();
16,834✔
288
    });
16,834✔
289
}
16,838✔
290

291

292
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
293
{
12,244✔
294
    REALM_ASSERT(m_socket_provider);
12,244✔
295
    return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
12,244✔
296
}
12,244✔
297

298
Connection::~Connection()
299
{
2,650✔
300
    if (m_websocket_sentinel) {
2,650✔
301
        m_websocket_sentinel->destroyed = true;
×
302
        m_websocket_sentinel.reset();
×
303
    }
×
304
}
2,650✔
305

306
void Connection::activate()
307
{
2,644✔
308
    REALM_ASSERT(m_on_idle);
2,644✔
309
    m_activated = true;
2,644✔
310
    if (m_num_active_sessions == 0)
2,644✔
311
        m_on_idle->trigger();
×
312
    // We cannot in general connect immediately, because a prior failure to
1,254✔
313
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
1,254✔
314
    initiate_reconnect_wait(); // Throws
2,644✔
315
}
2,644✔
316

317

318
void Connection::activate_session(std::unique_ptr<Session> sess)
319
{
10,038✔
320
    REALM_ASSERT(sess);
10,038✔
321
    REALM_ASSERT(&sess->m_conn == this);
10,038✔
322
    REALM_ASSERT(!m_force_closed);
10,038✔
323
    Session& sess_2 = *sess;
10,038✔
324
    session_ident_type ident = sess->m_ident;
10,038✔
325
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
10,038✔
326
    bool was_inserted = p.second;
10,038✔
327
    REALM_ASSERT(was_inserted);
10,038✔
328
    // Save the session ident to the historical list of session idents
4,842✔
329
    m_session_history.insert(ident);
10,038✔
330
    sess_2.activate(); // Throws
10,038✔
331
    if (m_state == ConnectionState::connected) {
10,038✔
332
        bool fast_reconnect = false;
6,972✔
333
        sess_2.connection_established(fast_reconnect); // Throws
6,972✔
334
    }
6,972✔
335
    ++m_num_active_sessions;
10,038✔
336
}
10,038✔
337

338

339
void Connection::initiate_session_deactivation(Session* sess)
340
{
10,040✔
341
    REALM_ASSERT(sess);
10,040✔
342
    REALM_ASSERT(&sess->m_conn == this);
10,040✔
343
    REALM_ASSERT(m_num_active_sessions);
10,040✔
344
    // Since the client may be waiting for m_num_active_sessions to reach 0
4,844✔
345
    // in stop_and_wait() (on a separate thread), deactivate Session before
4,844✔
346
    // decrementing the num active sessions value.
4,844✔
347
    sess->initiate_deactivation(); // Throws
10,040✔
348
    if (sess->m_state == Session::Deactivated) {
10,040✔
349
        finish_session_deactivation(sess);
1,006✔
350
    }
1,006✔
351
    if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
10,040✔
352
        if (m_activated && m_state == ConnectionState::disconnected)
4,366✔
353
            m_on_idle->trigger();
330✔
354
    }
4,366✔
355
}
10,040✔
356

357

358
void Connection::cancel_reconnect_delay()
359
{
2,108✔
360
    REALM_ASSERT(m_activated);
2,108✔
361

1,214✔
362
    if (m_reconnect_delay_in_progress) {
2,108✔
363
        if (m_nonzero_reconnect_delay)
1,888✔
364
            logger.detail("Canceling reconnect delay"); // Throws
948✔
365

1,104✔
366
        // Cancel the in-progress wait operation by destroying the timer
1,104✔
367
        // object. Destruction is needed in this case, because a new wait
1,104✔
368
        // operation might have to be initiated before the previous one
1,104✔
369
        // completes (its completion handler starts to execute), so the new wait
1,104✔
370
        // operation must be done on a new timer object.
1,104✔
371
        m_reconnect_disconnect_timer.reset();
1,888✔
372
        m_reconnect_delay_in_progress = false;
1,888✔
373
        m_reconnect_info.reset();
1,888✔
374
        initiate_reconnect_wait(); // Throws
1,888✔
375
        return;
1,888✔
376
    }
1,888✔
377

110✔
378
    // If we are not disconnected, then we need to make sure the next time we get disconnected
110✔
379
    // that we are allowed to re-connect as quickly as possible.
110✔
380
    //
110✔
381
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
110✔
382
    // backoff/delay state before calculating the next delay, unless a PONG message is received
110✔
383
    // for the urgent PING message we send below.
110✔
384
    //
110✔
385
    // If we get a PONG message for the urgent PING message sent below, then the connection is
110✔
386
    // healthy and we can calculate the next delay normally.
110✔
387
    if (m_state != ConnectionState::disconnected) {
220✔
388
        m_reconnect_info.scheduled_reset = true;
220✔
389
        m_ping_after_scheduled_reset_of_reconnect_info = false;
220✔
390

110✔
391
        schedule_urgent_ping(); // Throws
220✔
392
        return;
220✔
393
    }
220✔
394
    // Nothing to do in this case. The next reconnect attemp will be made as
110✔
395
    // soon as there are any sessions that are both active and unsuspended.
110✔
396
}
220✔
397

398
void ClientImpl::Connection::finish_session_deactivation(Session* sess)
399
{
8,284✔
400
    REALM_ASSERT(sess->m_state == Session::Deactivated);
8,284✔
401
    auto ident = sess->m_ident;
8,284✔
402
    m_sessions.erase(ident);
8,284✔
403
    m_session_history.erase(ident);
8,284✔
404
}
8,284✔
405

406
void Connection::force_close()
407
{
2,362✔
408
    if (m_force_closed) {
2,362✔
409
        return;
×
410
    }
×
411

1,114✔
412
    m_force_closed = true;
2,362✔
413

1,114✔
414
    if (m_state != ConnectionState::disconnected) {
2,362✔
415
        voluntary_disconnect();
2,324✔
416
    }
2,324✔
417

1,114✔
418
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,362✔
419
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,362✔
420
        m_reconnect_disconnect_timer.reset();
40✔
421
        m_reconnect_delay_in_progress = false;
40✔
422
        m_disconnect_delay_in_progress = false;
40✔
423
    }
40✔
424

1,114✔
425
    // We must copy any session pointers we want to close to a vector because force_closing
1,114✔
426
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
1,114✔
427
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
1,114✔
428
    std::vector<Session*> to_close;
2,362✔
429
    for (auto& session_pair : m_sessions) {
1,166✔
430
        if (session_pair.second->m_state == Session::State::Active) {
106✔
431
            to_close.push_back(session_pair.second.get());
106✔
432
        }
106✔
433
    }
106✔
434

1,114✔
435
    for (auto& sess : to_close) {
1,166✔
436
        sess->force_close();
106✔
437
    }
106✔
438

1,114✔
439
    logger.debug("Force closed idle connection");
2,362✔
440
}
2,362✔
441

442

443
void Connection::websocket_connected_handler(const std::string& protocol)
444
{
3,442✔
445
    if (!protocol.empty()) {
3,442✔
446
        std::string_view expected_prefix =
3,442✔
447
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,108✔
448
        // FIXME: Use std::string_view::begins_with() in C++20.
1,736✔
449
        auto prefix_matches = [&](std::string_view other) {
3,442✔
450
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,442✔
451
        };
3,442✔
452
        if (prefix_matches(expected_prefix)) {
3,442✔
453
            util::MemoryInputStream in;
3,442✔
454
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,442✔
455
            in.imbue(std::locale::classic());
3,442✔
456
            in.unsetf(std::ios_base::skipws);
3,442✔
457
            int value_2 = 0;
3,442✔
458
            in >> value_2;
3,442✔
459
            if (in && in.eof() && value_2 >= 0) {
3,442✔
460
                bool good_version =
3,442✔
461
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,442✔
462
                if (good_version) {
3,442✔
463
                    logger.detail("Negotiated protocol version: %1", value_2);
3,442✔
464
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
1,736✔
465
                    // will provide the appservices connection ID via a log message.
1,736✔
466
                    // TODO: Remove once the server starts sending the connection ID
1,736✔
467
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,442✔
468
                    m_negotiated_protocol_version = value_2;
3,442✔
469
                    handle_connection_established(); // Throws
3,442✔
470
                    return;
3,442✔
471
                }
3,442✔
472
            }
×
473
        }
3,442✔
474
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
475
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
476
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
477
    }
×
478
    else {
×
479
        close_due_to_client_side_error(
×
480
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
481
            ConnectionTerminationReason::bad_headers_in_http_response);
×
482
    }
×
483
}
3,442✔
484

485

486
bool Connection::websocket_binary_message_received(util::Span<const char> data)
487
{
82,216✔
488
    if (m_force_closed) {
82,216✔
489
        logger.debug("Received binary message after connection was force closed");
×
490
        return false;
×
491
    }
×
492

38,546✔
493
    using sf = SimulatedFailure;
82,216✔
494
    if (sf::check_trigger(sf::sync_client__read_head)) {
82,216✔
495
        close_due_to_client_side_error(
478✔
496
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
478✔
497
            ConnectionTerminationReason::read_or_write_error);
478✔
498
        return bool(m_websocket);
478✔
499
    }
478✔
500

38,242✔
501
    handle_message_received(data);
81,738✔
502
    return bool(m_websocket);
81,738✔
503
}
81,738✔
504

505

506
void Connection::websocket_error_handler()
507
{
556✔
508
    m_websocket_error_received = true;
556✔
509
}
556✔
510

511
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
512
{
668✔
513
    if (m_force_closed) {
668✔
514
        logger.debug("Received websocket close message after connection was force closed");
×
515
        return false;
×
516
    }
×
517
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
668✔
518

346✔
519
    switch (error_code) {
668✔
520
        case WebSocketError::websocket_ok:
60✔
521
            break;
60✔
522
        case WebSocketError::websocket_resolve_failed:
4✔
523
            [[fallthrough]];
4✔
524
        case WebSocketError::websocket_connection_failed: {
20✔
525
            SessionErrorInfo error_info(
20✔
526
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
20✔
527
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
20✔
528
            break;
20✔
529
        }
4✔
530
        case WebSocketError::websocket_read_error:
526✔
531
            [[fallthrough]];
526✔
532
        case WebSocketError::websocket_write_error: {
526✔
533
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
526✔
534
                                         ConnectionTerminationReason::read_or_write_error);
526✔
535
            break;
526✔
536
        }
526✔
537
        case WebSocketError::websocket_going_away:
276✔
538
            [[fallthrough]];
×
539
        case WebSocketError::websocket_protocol_error:
✔
540
            [[fallthrough]];
×
541
        case WebSocketError::websocket_unsupported_data:
✔
542
            [[fallthrough]];
×
543
        case WebSocketError::websocket_invalid_payload_data:
✔
544
            [[fallthrough]];
×
545
        case WebSocketError::websocket_policy_violation:
✔
546
            [[fallthrough]];
×
547
        case WebSocketError::websocket_reserved:
✔
548
            [[fallthrough]];
×
549
        case WebSocketError::websocket_no_status_received:
✔
550
            [[fallthrough]];
×
551
        case WebSocketError::websocket_invalid_extension: {
✔
552
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
553
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
554
            break;
×
555
        }
×
556
        case WebSocketError::websocket_message_too_big: {
4✔
557
            auto message = util::format(
4✔
558
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
559
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
560
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
561
            involuntary_disconnect(std::move(error_info),
4✔
562
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
563
            break;
4✔
564
        }
×
565
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
566
            close_due_to_client_side_error(
10✔
567
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
568
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
569
            break;
10✔
570
        }
×
571
        case WebSocketError::websocket_client_too_old:
✔
572
            [[fallthrough]];
×
573
        case WebSocketError::websocket_client_too_new:
✔
574
            [[fallthrough]];
×
575
        case WebSocketError::websocket_protocol_mismatch: {
✔
576
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
577
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
578
            break;
×
579
        }
×
580
        case WebSocketError::websocket_fatal_error: {
✔
581
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{true}),
×
582
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
583
            break;
×
584
        }
×
585
        case WebSocketError::websocket_forbidden: {
✔
586
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
587
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
588
            involuntary_disconnect(std::move(error_info),
×
589
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
590
            break;
×
591
        }
×
592
        case WebSocketError::websocket_unauthorized: {
36✔
593
            SessionErrorInfo error_info(
36✔
594
                {ErrorCodes::AuthError,
36✔
595
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
36✔
596
                IsFatal{false});
36✔
597
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
36✔
598
            involuntary_disconnect(std::move(error_info),
36✔
599
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
36✔
600
            break;
36✔
601
        }
×
602
        case WebSocketError::websocket_moved_permanently: {
12✔
603
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
604
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
605
            involuntary_disconnect(std::move(error_info),
12✔
606
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
607
            break;
12✔
608
        }
×
609
        case WebSocketError::websocket_abnormal_closure: {
✔
610
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
611
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
612
            involuntary_disconnect(std::move(error_info),
×
613
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
614
            break;
×
615
        }
×
616
        case WebSocketError::websocket_internal_server_error:
✔
617
            [[fallthrough]];
×
618
        case WebSocketError::websocket_retry_error: {
✔
619
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
620
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
621
            break;
×
622
        }
668✔
623
    }
668✔
624

346✔
625
    return bool(m_websocket);
668✔
626
}
668✔
627

628
// Guarantees that handle_reconnect_wait() is never called from within the
629
// execution of initiate_reconnect_wait() (no callback reentrance).
630
void Connection::initiate_reconnect_wait()
631
{
8,088✔
632
    REALM_ASSERT(m_activated);
8,088✔
633
    REALM_ASSERT(!m_reconnect_delay_in_progress);
8,088✔
634
    REALM_ASSERT(!m_disconnect_delay_in_progress);
8,088✔
635

4,146✔
636
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
4,146✔
637
    if (m_force_closed) {
8,088✔
638
        return;
2,318✔
639
    }
2,318✔
640

3,056✔
641
    m_reconnect_delay_in_progress = true;
5,770✔
642
    auto delay = m_reconnect_info.delay_interval();
5,770✔
643
    if (delay == std::chrono::milliseconds::max()) {
5,770✔
644
        logger.detail("Reconnection delayed indefinitely"); // Throws
996✔
645
        // Not actually starting a timer corresponds to an infinite wait
578✔
646
        m_nonzero_reconnect_delay = true;
996✔
647
        return;
996✔
648
    }
996✔
649

2,478✔
650
    if (delay == std::chrono::milliseconds::zero()) {
4,774✔
651
        m_nonzero_reconnect_delay = false;
4,528✔
652
    }
4,528✔
653
    else {
246✔
654
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
246✔
655
        m_nonzero_reconnect_delay = true;
246✔
656
    }
246✔
657

2,478✔
658
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
2,478✔
659
    // we need it to be cancelable in case the connection is terminated before the timer
2,478✔
660
    // callback is run.
2,478✔
661
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
4,772✔
662
        // If the operation is aborted, the connection object may have been
2,478✔
663
        // destroyed.
2,478✔
664
        if (status != ErrorCodes::OperationAborted)
4,772✔
665
            handle_reconnect_wait(status); // Throws
3,552✔
666
    });                                    // Throws
4,772✔
667
}
4,774✔
668

669

670
void Connection::handle_reconnect_wait(Status status)
671
{
3,554✔
672
    if (!status.is_ok()) {
3,554✔
673
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
674
        throw Exception(status);
×
675
    }
×
676

1,790✔
677
    REALM_ASSERT(m_reconnect_delay_in_progress);
3,554✔
678
    m_reconnect_delay_in_progress = false;
3,554✔
679

1,790✔
680
    if (m_num_active_unsuspended_sessions > 0)
3,554✔
681
        initiate_reconnect(); // Throws
3,546✔
682
}
3,554✔
683

684
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
685
    explicit WebSocketObserverShim(Connection* conn)
686
        : conn(conn)
687
        , sentinel(conn->m_websocket_sentinel)
688
    {
3,554✔
689
    }
3,554✔
690

691
    Connection* conn;
692
    util::bind_ptr<LifecycleSentinel> sentinel;
693

694
    void websocket_connected_handler(const std::string& protocol) override
695
    {
3,442✔
696
        if (sentinel->destroyed) {
3,442✔
697
            return;
×
698
        }
×
699

1,736✔
700
        return conn->websocket_connected_handler(protocol);
3,442✔
701
    }
3,442✔
702

703
    void websocket_error_handler() override
704
    {
556✔
705
        if (sentinel->destroyed) {
556✔
706
            return;
×
707
        }
×
708

292✔
709
        conn->websocket_error_handler();
556✔
710
    }
556✔
711

712
    bool websocket_binary_message_received(util::Span<const char> data) override
713
    {
82,214✔
714
        if (sentinel->destroyed) {
82,214✔
715
            return false;
×
716
        }
×
717

38,544✔
718
        return conn->websocket_binary_message_received(data);
82,214✔
719
    }
82,214✔
720

721
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
722
    {
668✔
723
        if (sentinel->destroyed) {
668✔
724
            return true;
×
725
        }
×
726

346✔
727
        return conn->websocket_closed_handler(was_clean, error_code, msg);
668✔
728
    }
668✔
729
};
730

731
void Connection::initiate_reconnect()
732
{
3,554✔
733
    REALM_ASSERT(m_activated);
3,554✔
734

1,790✔
735
    m_state = ConnectionState::connecting;
3,554✔
736
    report_connection_state_change(ConnectionState::connecting); // Throws
3,554✔
737
    if (m_websocket_sentinel) {
3,554✔
738
        m_websocket_sentinel->destroyed = true;
×
739
    }
×
740
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,554✔
741
    m_websocket.reset();
3,554✔
742

1,790✔
743
    // Watchdog
1,790✔
744
    initiate_connect_wait(); // Throws
3,554✔
745

1,790✔
746
    std::vector<std::string> sec_websocket_protocol;
3,554✔
747
    {
3,554✔
748
        auto protocol_prefix =
3,554✔
749
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,204✔
750
        int min = get_oldest_supported_protocol_version();
3,554✔
751
        int max = get_current_protocol_version();
3,554✔
752
        REALM_ASSERT_3(min, <=, max);
3,554✔
753
        // List protocol version in descending order to ensure that the server
1,790✔
754
        // selects the highest possible version.
1,790✔
755
        for (int version = max; version >= min; --version) {
39,078✔
756
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
35,524✔
757
        }
35,524✔
758
    }
3,554✔
759

1,790✔
760
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,554✔
761
                m_server_endpoint.port, m_http_request_path_prefix);
3,554✔
762

1,790✔
763
    m_websocket_error_received = false;
3,554✔
764
    m_websocket =
3,554✔
765
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,554✔
766
                                            WebSocketEndpoint{
3,554✔
767
                                                m_server_endpoint.address,
3,554✔
768
                                                m_server_endpoint.port,
3,554✔
769
                                                get_http_request_path(),
3,554✔
770
                                                std::move(sec_websocket_protocol),
3,554✔
771
                                                is_ssl(m_server_endpoint.envelope),
3,554✔
772
                                                /// DEPRECATED - The following will be removed in a future release
1,790✔
773
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,554✔
774
                                                m_verify_servers_ssl_certificate,
3,554✔
775
                                                m_ssl_trust_certificate_path,
3,554✔
776
                                                m_ssl_verify_callback,
3,554✔
777
                                                m_proxy_config,
3,554✔
778
                                            });
3,554✔
779
}
3,554✔
780

781

782
void Connection::initiate_connect_wait()
783
{
3,554✔
784
    // Deploy a watchdog to enforce an upper bound on the time it can take to
1,790✔
785
    // fully establish the connection (including SSL and WebSocket
1,790✔
786
    // handshakes). Without such a watchdog, connect operations could take very
1,790✔
787
    // long, or even indefinite time.
1,790✔
788
    milliseconds_type time = m_client.m_connect_timeout;
3,554✔
789

1,790✔
790
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,554✔
791
        // If the operation is aborted, the connection object may have been
1,790✔
792
        // destroyed.
1,790✔
793
        if (status != ErrorCodes::OperationAborted)
3,554✔
794
            handle_connect_wait(status); // Throws
×
795
    });                                  // Throws
3,554✔
796
}
3,554✔
797

798

799
void Connection::handle_connect_wait(Status status)
800
{
×
801
    if (!status.is_ok()) {
×
802
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
803
        throw Exception(status);
×
804
    }
×
805

806
    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
×
807
    logger.info("Connect timeout"); // Throws
×
808
    involuntary_disconnect(
×
809
        SessionErrorInfo{Status{ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
×
810
                         IsFatal{false}},
×
811
        ConnectionTerminationReason::sync_connect_timeout); // Throws
×
812
}
×
813

814

815
void Connection::handle_connection_established()
816
{
3,442✔
817
    // Cancel connect timeout watchdog
1,736✔
818
    m_connect_timer.reset();
3,442✔
819

1,736✔
820
    m_state = ConnectionState::connected;
3,442✔
821

1,736✔
822
    milliseconds_type now = monotonic_clock_now();
3,442✔
823
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,442✔
824
    initiate_ping_delay(now);     // Throws
3,442✔
825

1,736✔
826
    bool fast_reconnect = false;
3,442✔
827
    if (m_disconnect_has_occurred) {
3,442✔
828
        milliseconds_type time = now - m_disconnect_time;
1,018✔
829
        if (time <= m_client.m_fast_reconnect_limit)
1,018✔
830
            fast_reconnect = true;
1,018✔
831
    }
1,018✔
832

1,736✔
833
    for (auto& p : m_sessions) {
4,540✔
834
        Session& sess = *p.second;
4,540✔
835
        sess.connection_established(fast_reconnect); // Throws
4,540✔
836
    }
4,540✔
837

1,736✔
838
    report_connection_state_change(ConnectionState::connected); // Throws
3,442✔
839
}
3,442✔
840

841

842
void Connection::schedule_urgent_ping()
843
{
220✔
844
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
220✔
845
    if (m_ping_delay_in_progress) {
220✔
846
        m_heartbeat_timer.reset();
212✔
847
        m_ping_delay_in_progress = false;
212✔
848
        m_minimize_next_ping_delay = true;
212✔
849
        milliseconds_type now = monotonic_clock_now();
212✔
850
        initiate_ping_delay(now); // Throws
212✔
851
        return;
212✔
852
    }
212✔
853
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
8!
854
    if (!m_send_ping)
8!
855
        m_minimize_next_ping_delay = true;
8✔
856
}
8✔
857

858

859
void Connection::initiate_ping_delay(milliseconds_type now)
860
{
3,842✔
861
    REALM_ASSERT(!m_ping_delay_in_progress);
3,842✔
862
    REALM_ASSERT(!m_waiting_for_pong);
3,842✔
863
    REALM_ASSERT(!m_send_ping);
3,842✔
864

1,914✔
865
    milliseconds_type delay = 0;
3,842✔
866
    if (!m_minimize_next_ping_delay) {
3,842✔
867
        delay = m_client.m_ping_keepalive_period;
3,628✔
868
        // Make a randomized deduction of up to 10%, or up to 100% if this is
1,810✔
869
        // the first PING message to be sent since the connection was
1,810✔
870
        // established. The purpose of this randomized deduction is to reduce
1,810✔
871
        // the risk of many connections sending PING messages simultaneously to
1,810✔
872
        // the server.
1,810✔
873
        milliseconds_type max_deduction = (m_ping_sent ? delay / 10 : delay);
3,516✔
874
        auto distr = std::uniform_int_distribution<milliseconds_type>(0, max_deduction);
3,628✔
875
        milliseconds_type randomized_deduction = distr(m_client.get_random());
3,628✔
876
        delay -= randomized_deduction;
3,628✔
877
        // Deduct the time spent waiting for PONG
1,810✔
878
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
3,628✔
879
        milliseconds_type spent_time = now - m_pong_wait_started_at;
3,628✔
880
        if (spent_time < delay) {
3,628✔
881
            delay -= spent_time;
3,620✔
882
        }
3,620✔
883
        else {
8✔
884
            delay = 0;
8✔
885
        }
8✔
886
    }
3,628✔
887
    else {
214✔
888
        m_minimize_next_ping_delay = false;
214✔
889
    }
214✔
890

1,914✔
891

1,914✔
892
    m_ping_delay_in_progress = true;
3,842✔
893

1,914✔
894
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
3,842✔
895
        if (status == ErrorCodes::OperationAborted)
3,840✔
896
            return;
3,632✔
897
        else if (!status.is_ok())
208✔
898
            throw Exception(status);
×
899

82✔
900
        handle_ping_delay();                                    // Throws
208✔
901
    });                                                         // Throws
208✔
902
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
3,842✔
903
}
3,842✔
904

905

906
void Connection::handle_ping_delay()
907
{
208✔
908
    REALM_ASSERT(m_ping_delay_in_progress);
208✔
909
    m_ping_delay_in_progress = false;
208✔
910
    m_send_ping = true;
208✔
911

82✔
912
    initiate_pong_timeout(); // Throws
208✔
913

82✔
914
    if (m_state == ConnectionState::connected && !m_sending)
208✔
915
        send_next_message(); // Throws
172✔
916
}
208✔
917

918

919
void Connection::initiate_pong_timeout()
920
{
208✔
921
    REALM_ASSERT(!m_ping_delay_in_progress);
208✔
922
    REALM_ASSERT(!m_waiting_for_pong);
208✔
923
    REALM_ASSERT(m_send_ping);
208✔
924

82✔
925
    m_waiting_for_pong = true;
208✔
926
    m_pong_wait_started_at = monotonic_clock_now();
208✔
927

82✔
928
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
208✔
929
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
208✔
930
        if (status == ErrorCodes::OperationAborted)
208✔
931
            return;
196✔
932
        else if (!status.is_ok())
12✔
933
            throw Exception(status);
×
934

6✔
935
        handle_pong_timeout(); // Throws
12✔
936
    });                        // Throws
12✔
937
}
208✔
938

939

940
void Connection::handle_pong_timeout()
941
{
12✔
942
    REALM_ASSERT(m_waiting_for_pong);
12✔
943
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
944
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
945
                                 ConnectionTerminationReason::pong_timeout);
12✔
946
}
12✔
947

948

949
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
950
{
105,292✔
951
    // Stop sending messages if an websocket error was received.
48,580✔
952
    if (m_websocket_error_received)
105,292✔
953
        return;
×
954

48,580✔
955
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
105,292✔
956
        if (sentinel->destroyed) {
105,204✔
957
            return;
1,412✔
958
        }
1,412✔
959
        if (!status.is_ok()) {
103,792✔
960
            if (status != ErrorCodes::Error::OperationAborted) {
×
961
                // Write errors will be handled by the websocket_write_error_handler() callback
962
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
963
            }
×
964
            return;
×
965
        }
×
966
        handle_write_message(); // Throws
103,792✔
967
    });                         // Throws
103,792✔
968
    m_sending_session = sess;
105,292✔
969
    m_sending = true;
105,292✔
970
}
105,292✔
971

972

973
void Connection::handle_write_message()
974
{
103,792✔
975
    m_sending_session->message_sent(); // Throws
103,792✔
976
    if (m_sending_session->m_state == Session::Deactivated) {
103,792✔
977
        finish_session_deactivation(m_sending_session);
142✔
978
    }
142✔
979
    m_sending_session = nullptr;
103,792✔
980
    m_sending = false;
103,792✔
981
    send_next_message(); // Throws
103,792✔
982
}
103,792✔
983

984

985
void Connection::send_next_message()
986
{
166,502✔
987
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
166,502✔
988
    REALM_ASSERT(!m_sending_session);
166,502✔
989
    REALM_ASSERT(!m_sending);
166,502✔
990
    if (m_send_ping) {
166,502✔
991
        send_ping(); // Throws
196✔
992
        return;
196✔
993
    }
196✔
994
    while (!m_sessions_enlisted_to_send.empty()) {
230,274✔
995
        // The state of being connected is not supposed to be able to change
78,768✔
996
        // across this loop thanks to the "no callback reentrance" guarantee
78,768✔
997
        // provided by Websocket::async_write_text(), and friends.
78,768✔
998
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
169,464✔
999

78,768✔
1000
        Session& sess = *m_sessions_enlisted_to_send.front();
169,464✔
1001
        m_sessions_enlisted_to_send.pop_front();
169,464✔
1002
        sess.send_message(); // Throws
169,464✔
1003

78,768✔
1004
        if (sess.m_state == Session::Deactivated) {
169,464✔
1005
            finish_session_deactivation(&sess);
1,458✔
1006
        }
1,458✔
1007

78,768✔
1008
        // An enlisted session may choose to not send a message. In that case,
78,768✔
1009
        // we should pass the opportunity to the next enlisted session.
78,768✔
1010
        if (m_sending)
169,464✔
1011
            break;
105,496✔
1012
    }
169,464✔
1013
}
166,306✔
1014

1015

1016
void Connection::send_ping()
1017
{
196✔
1018
    REALM_ASSERT(!m_ping_delay_in_progress);
196✔
1019
    REALM_ASSERT(m_waiting_for_pong);
196✔
1020
    REALM_ASSERT(m_send_ping);
196✔
1021

76✔
1022
    m_send_ping = false;
196✔
1023
    if (m_reconnect_info.scheduled_reset)
196✔
1024
        m_ping_after_scheduled_reset_of_reconnect_info = true;
180✔
1025

76✔
1026
    m_last_ping_sent_at = monotonic_clock_now();
196✔
1027
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
196✔
1028
                 m_previous_ping_rtt); // Throws
196✔
1029

76✔
1030
    ClientProtocol& protocol = get_client_protocol();
196✔
1031
    OutputBuffer& out = get_output_buffer();
196✔
1032
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
196✔
1033
    initiate_write_ping(out);                                          // Throws
196✔
1034
    m_ping_sent = true;
196✔
1035
}
196✔
1036

1037

1038
void Connection::initiate_write_ping(const OutputBuffer& out)
1039
{
196✔
1040
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
196✔
1041
        if (sentinel->destroyed) {
196✔
1042
            return;
×
1043
        }
×
1044
        if (!status.is_ok()) {
196✔
1045
            if (status != ErrorCodes::Error::OperationAborted) {
×
1046
                // Write errors will be handled by the websocket_write_error_handler() callback
1047
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1048
            }
×
1049
            return;
×
1050
        }
×
1051
        handle_write_ping(); // Throws
196✔
1052
    });                      // Throws
196✔
1053
    m_sending = true;
196✔
1054
}
196✔
1055

1056

1057
void Connection::handle_write_ping()
1058
{
196✔
1059
    REALM_ASSERT(m_sending);
196✔
1060
    REALM_ASSERT(!m_sending_session);
196✔
1061
    m_sending = false;
196✔
1062
    send_next_message(); // Throws
196✔
1063
}
196✔
1064

1065

1066
void Connection::handle_message_received(util::Span<const char> data)
1067
{
81,736✔
1068
    // parse_message_received() parses the message and calls the proper handler
38,242✔
1069
    // on the Connection object (this).
38,242✔
1070
    get_client_protocol().parse_message_received<Connection>(*this, std::string_view(data.data(), data.size()));
81,736✔
1071
}
81,736✔
1072

1073

1074
void Connection::initiate_disconnect_wait()
1075
{
4,400✔
1076
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,400✔
1077

2,072✔
1078
    if (m_disconnect_delay_in_progress) {
4,400✔
1079
        m_reconnect_disconnect_timer.reset();
1,990✔
1080
        m_disconnect_delay_in_progress = false;
1,990✔
1081
    }
1,990✔
1082

2,072✔
1083
    milliseconds_type time = m_client.m_connection_linger_time;
4,400✔
1084

2,072✔
1085
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,402✔
1086
        // If the operation is aborted, the connection object may have been
2,068✔
1087
        // destroyed.
2,068✔
1088
        if (status != ErrorCodes::OperationAborted)
4,398✔
1089
            handle_disconnect_wait(status); // Throws
12✔
1090
    });                                     // Throws
4,398✔
1091
    m_disconnect_delay_in_progress = true;
4,400✔
1092
}
4,400✔
1093

1094

1095
void Connection::handle_disconnect_wait(Status status)
1096
{
12✔
1097
    if (!status.is_ok()) {
12✔
1098
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
1099
        throw Exception(status);
×
1100
    }
×
1101

6✔
1102
    m_disconnect_delay_in_progress = false;
12✔
1103

6✔
1104
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
12✔
1105
    if (m_num_active_unsuspended_sessions == 0) {
12✔
1106
        if (m_client.m_connection_linger_time > 0)
12✔
UNCOV
1107
            logger.detail("Linger time expired"); // Throws
×
1108
        voluntary_disconnect();                   // Throws
12✔
1109
        logger.info("Disconnected");              // Throws
12✔
1110
    }
12✔
1111
}
12✔
1112

1113

1114
void Connection::close_due_to_protocol_error(Status status)
1115
{
20✔
1116
    SessionErrorInfo error_info(std::move(status), IsFatal{true});
20✔
1117
    error_info.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;
20✔
1118
    involuntary_disconnect(std::move(error_info),
20✔
1119
                           ConnectionTerminationReason::sync_protocol_violation); // Throws
20✔
1120
}
20✔
1121

1122

1123
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
1124
{
488✔
1125
    logger.info("Connection closed due to error: %1", status); // Throws
488✔
1126

310✔
1127
    involuntary_disconnect(SessionErrorInfo{std::move(status), is_fatal}, reason); // Throw
488✔
1128
}
488✔
1129

1130

1131
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
1132
{
538✔
1133
    logger.info("Connection closed due to transient error: %1", status); // Throws
538✔
1134
    SessionErrorInfo error_info{std::move(status), IsFatal{false}};
538✔
1135
    error_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
538✔
1136

282✔
1137
    involuntary_disconnect(std::move(error_info), reason); // Throw
538✔
1138
}
538✔
1139

1140

1141
// Close connection due to error discovered on the server-side, and then
1142
// reported to the client by way of a connection-level ERROR message.
1143
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1144
{
64✔
1145
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
64✔
1146
                int(error_code)); // Throws
64✔
1147

34✔
1148
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
44✔
1149
                                      : ConnectionTerminationReason::server_said_try_again_later;
54✔
1150
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
64✔
1151
                           reason); // Throws
64✔
1152
}
64✔
1153

1154

1155
void Connection::disconnect(const SessionErrorInfo& info)
1156
{
3,554✔
1157
    // Cancel connect timeout watchdog
1,790✔
1158
    m_connect_timer.reset();
3,554✔
1159

1,790✔
1160
    if (m_state == ConnectionState::connected) {
3,554✔
1161
        m_disconnect_time = monotonic_clock_now();
3,438✔
1162
        m_disconnect_has_occurred = true;
3,438✔
1163

1,736✔
1164
        // Sessions that are in the Deactivating state at this time can be
1,736✔
1165
        // immediately discarded, in part because they are no longer enlisted to
1,736✔
1166
        // send. Such sessions will be taken to the Deactivated state by
1,736✔
1167
        // Session::connection_lost(), and then they will be removed from
1,736✔
1168
        // `m_sessions`.
1,736✔
1169
        auto i = m_sessions.begin(), end = m_sessions.end();
3,438✔
1170
        while (i != end) {
7,328✔
1171
            // Prevent invalidation of the main iterator when erasing elements
2,074✔
1172
            auto j = i++;
3,890✔
1173
            Session& sess = *j->second;
3,890✔
1174
            sess.connection_lost(); // Throws
3,890✔
1175
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
3,892✔
1176
                m_sessions.erase(j);
1,756✔
1177
        }
3,890✔
1178
    }
3,438✔
1179

1,790✔
1180
    change_state_to_disconnected();
3,554✔
1181

1,790✔
1182
    m_ping_delay_in_progress = false;
3,554✔
1183
    m_waiting_for_pong = false;
3,554✔
1184
    m_send_ping = false;
3,554✔
1185
    m_minimize_next_ping_delay = false;
3,554✔
1186
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,554✔
1187
    m_ping_sent = false;
3,554✔
1188
    m_heartbeat_timer.reset();
3,554✔
1189
    m_previous_ping_rtt = 0;
3,554✔
1190

1,790✔
1191
    m_websocket_sentinel->destroyed = true;
3,554✔
1192
    m_websocket_sentinel.reset();
3,554✔
1193
    m_websocket.reset();
3,554✔
1194
    m_input_body_buffer.reset();
3,554✔
1195
    m_sending_session = nullptr;
3,554✔
1196
    m_sessions_enlisted_to_send.clear();
3,554✔
1197
    m_sending = false;
3,554✔
1198

1,790✔
1199
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,554✔
1200
    initiate_reconnect_wait();                                           // Throws
3,554✔
1201
}
3,554✔
1202

1203
bool Connection::is_flx_sync_connection() const noexcept
1204
{
108,292✔
1205
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
108,292✔
1206
}
108,292✔
1207

1208
void Connection::receive_pong(milliseconds_type timestamp)
1209
{
188✔
1210
    logger.debug("Received: PONG(timestamp=%1)", timestamp);
188✔
1211

76✔
1212
    bool legal_at_this_time = (m_waiting_for_pong && !m_send_ping);
188✔
1213
    if (REALM_UNLIKELY(!legal_at_this_time)) {
188✔
1214
        close_due_to_protocol_error(
×
1215
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
×
1216
        return;
×
1217
    }
×
1218

76✔
1219
    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
188✔
1220
        close_due_to_protocol_error(
×
1221
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1222
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
×
1223
                          m_last_ping_sent_at, timestamp)}); // Throws
×
1224
        return;
×
1225
    }
×
1226

76✔
1227
    milliseconds_type now = monotonic_clock_now();
188✔
1228
    milliseconds_type round_trip_time = now - timestamp;
188✔
1229
    logger.debug("Round trip time was %1 milliseconds", round_trip_time);
188✔
1230
    m_previous_ping_rtt = round_trip_time;
188✔
1231

76✔
1232
    // If this PONG message is a response to a PING mesage that was sent after
76✔
1233
    // the last invocation of cancel_reconnect_delay(), then the connection is
76✔
1234
    // still good, and we do not have to skip the next reconnect delay.
76✔
1235
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
188✔
1236
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
176✔
1237
        m_ping_after_scheduled_reset_of_reconnect_info = false;
176✔
1238
        m_reconnect_info.scheduled_reset = false;
176✔
1239
    }
176✔
1240

76✔
1241
    m_heartbeat_timer.reset();
188✔
1242
    m_waiting_for_pong = false;
188✔
1243

76✔
1244
    initiate_ping_delay(now); // Throws
188✔
1245

76✔
1246
    if (m_client.m_roundtrip_time_handler)
188✔
1247
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
×
1248
}
188✔
1249

1250
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
1251
{
73,696✔
1252
    if (session_ident == 0) {
73,696✔
1253
        return nullptr;
×
1254
    }
×
1255

35,140✔
1256
    auto* sess = get_session(session_ident);
73,696✔
1257
    if (REALM_LIKELY(sess)) {
73,696✔
1258
        return sess;
73,692✔
1259
    }
73,692✔
1260
    // Check the history to see if the message received was for a previous session
2✔
1261
    if (auto it = m_session_history.find(session_ident); it == m_session_history.end()) {
4✔
1262
        logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
×
1263
        close_due_to_protocol_error(
×
1264
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1265
             util::format("Received message %1 for session iden %2 when that session never existed", message,
×
1266
                          session_ident)});
×
1267
    }
×
1268
    else {
4✔
1269
        logger.error("Received %1 message for closed session, session_ident = %2", message,
4✔
1270
                     session_ident); // Throws
4✔
1271
    }
4✔
1272
    return nullptr;
4✔
1273
}
4✔
1274

1275
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1276
{
758✔
1277
    Session* sess = nullptr;
758✔
1278
    if (session_ident != 0) {
758✔
1279
        sess = find_and_validate_session(session_ident, "ERROR");
690✔
1280
        if (REALM_UNLIKELY(!sess)) {
690✔
1281
            return;
×
1282
        }
×
1283
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
690✔
1284
            close_due_to_protocol_error(std::move(status)); // Throws
×
1285
            return;
×
1286
        }
×
1287

338✔
1288
        if (sess->m_state == Session::Deactivated) {
690✔
UNCOV
1289
            finish_session_deactivation(sess);
×
UNCOV
1290
        }
×
1291
        return;
690✔
1292
    }
690✔
1293

36✔
1294
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
68✔
1295
                info.message, info.raw_error_code, info.is_fatal, session_ident,
68✔
1296
                info.server_requests_action); // Throws
68✔
1297

36✔
1298
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
68✔
1299
    if (REALM_LIKELY(known_error_code)) {
68✔
1300
        ProtocolError error_code = ProtocolError(info.raw_error_code);
64✔
1301
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
64✔
1302
            close_due_to_server_side_error(error_code, info); // Throws
64✔
1303
            return;
64✔
1304
        }
64✔
1305
        close_due_to_protocol_error(
×
1306
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1307
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1308
                          info.raw_error_code)});
×
1309
    }
×
1310
    else {
4✔
1311
        close_due_to_protocol_error(
4✔
1312
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1313
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1314
    }
4✔
1315
}
68✔
1316

1317

1318
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1319
                                             session_ident_type session_ident)
1320
{
20✔
1321
    if (session_ident == 0) {
20✔
1322
        return close_due_to_protocol_error(
×
1323
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1324
    }
×
1325

10✔
1326
    if (!is_flx_sync_connection()) {
20✔
1327
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1328
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1329
    }
×
1330

10✔
1331
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
20✔
1332
    if (REALM_UNLIKELY(!sess)) {
20✔
1333
        return;
×
1334
    }
×
1335

10✔
1336
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
20✔
1337
        close_due_to_protocol_error(std::move(status));
×
1338
    }
×
1339
}
20✔
1340

1341

1342
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
1343
{
3,502✔
1344
    Session* sess = find_and_validate_session(session_ident, "IDENT");
3,502✔
1345
    if (REALM_UNLIKELY(!sess)) {
3,502✔
1346
        return;
×
1347
    }
×
1348

1,696✔
1349
    if (auto status = sess->receive_ident_message(client_file_ident); !status.is_ok())
3,502✔
1350
        close_due_to_protocol_error(std::move(status)); // Throws
×
1351
}
3,502✔
1352

1353
void Connection::receive_download_message(session_ident_type session_ident, const SyncProgress& progress,
1354
                                          std::uint_fast64_t downloadable_bytes, int64_t query_version,
1355
                                          DownloadBatchState batch_state,
1356
                                          const ReceivedChangesets& received_changesets)
1357
{
45,624✔
1358
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
45,624✔
1359
    if (REALM_UNLIKELY(!sess)) {
45,624✔
1360
        return;
×
1361
    }
×
1362

23,014✔
1363
    if (auto status = sess->receive_download_message(progress, downloadable_bytes, batch_state, query_version,
45,624✔
1364
                                                     received_changesets);
45,624✔
1365
        !status.is_ok()) {
45,624✔
1366
        close_due_to_protocol_error(std::move(status));
×
1367
    }
×
1368
}
45,624✔
1369

1370
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
1371
{
18,128✔
1372
    Session* sess = find_and_validate_session(session_ident, "MARK");
18,128✔
1373
    if (REALM_UNLIKELY(!sess)) {
18,128✔
1374
        return;
×
1375
    }
×
1376

7,994✔
1377
    if (auto status = sess->receive_mark_message(request_ident); !status.is_ok())
18,128✔
1378
        close_due_to_protocol_error(std::move(status)); // Throws
16✔
1379
}
18,128✔
1380

1381

1382
void Connection::receive_unbound_message(session_ident_type session_ident)
1383
{
5,678✔
1384
    Session* sess = find_and_validate_session(session_ident, "UNBOUND");
5,678✔
1385
    if (REALM_UNLIKELY(!sess)) {
5,678✔
1386
        return;
×
1387
    }
×
1388

2,062✔
1389
    if (auto status = sess->receive_unbound_message(); !status.is_ok()) {
5,678✔
1390
        close_due_to_protocol_error(std::move(status)); // Throws
×
1391
        return;
×
1392
    }
×
1393

2,062✔
1394
    if (sess->m_state == Session::Deactivated) {
5,678✔
1395
        finish_session_deactivation(sess);
5,678✔
1396
    }
5,678✔
1397
}
5,678✔
1398

1399

1400
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1401
                                               std::string_view body)
1402
{
52✔
1403
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
52✔
1404
    if (REALM_UNLIKELY(!sess)) {
52✔
1405
        return;
×
1406
    }
×
1407

26✔
1408
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
52✔
1409
        close_due_to_protocol_error(std::move(status));
×
1410
    }
×
1411
}
52✔
1412

1413

1414
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1415
                                            std::string_view message)
1416
{
7,790✔
1417
    std::string prefix;
7,790✔
1418
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
7,790✔
1419
        prefix = util::format("Server[%1]", m_appservices_coid);
7,790✔
1420
    }
7,790✔
1421
    else {
×
1422
        prefix = "Server";
×
1423
    }
×
1424

2,994✔
1425
    if (session_ident != 0) {
7,790✔
1426
        if (auto sess = get_session(session_ident)) {
5,872✔
1427
            sess->logger.log(LogCategory::session, level, "%1 log: %2", prefix, message);
5,872✔
1428
            return;
5,872✔
1429
        }
5,872✔
1430

1431
        logger.log(util::LogCategory::session, level, "%1 log for unknown session %2: %3", prefix, session_ident,
×
1432
                   message);
×
1433
        return;
×
1434
    }
×
1435

984✔
1436
    logger.log(level, "%1 log: %2", prefix, message);
1,918✔
1437
}
1,918✔
1438

1439

1440
void Connection::receive_appservices_request_id(std::string_view coid)
1441
{
5,360✔
1442
    // Only set once per connection
2,718✔
1443
    if (!coid.empty() && m_appservices_coid.empty()) {
5,360✔
1444
        m_appservices_coid = coid;
2,406✔
1445
        logger.log(util::LogCategory::session, util::LogCategory::Level::info,
2,406✔
1446
                   "Connected to app services with request id: \"%1\"", m_appservices_coid);
2,406✔
1447
    }
2,406✔
1448
}
5,360✔
1449

1450

1451
void Connection::handle_protocol_error(Status status)
1452
{
×
1453
    close_due_to_protocol_error(std::move(status));
×
1454
}
×
1455

1456

1457
// Sessions are guaranteed to be granted the opportunity to send a message in
1458
// the order that they enlist. Note that this is important to ensure
1459
// nonoverlapping communication with the server for consecutive sessions
1460
// associated with the same Realm file.
1461
//
1462
// CAUTION: The specified session may get destroyed before this function
1463
// returns, but only if its Session::send_message() puts it into the Deactivated
1464
// state.
1465
void Connection::enlist_to_send(Session* sess)
1466
{
171,066✔
1467
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
171,066✔
1468
    m_sessions_enlisted_to_send.push_back(sess); // Throws
171,066✔
1469
    if (!m_sending)
171,066✔
1470
        send_next_message(); // Throws
62,342✔
1471
}
171,066✔
1472

1473

1474
std::string Connection::get_active_appservices_connection_id()
1475
{
72✔
1476
    return m_appservices_coid;
72✔
1477
}
72✔
1478

1479
void Session::cancel_resumption_delay()
1480
{
4,034✔
1481
    REALM_ASSERT_EX(m_state == Active, m_state);
4,034✔
1482

2,334✔
1483
    if (!m_suspended)
4,034✔
1484
        return;
3,968✔
1485

30✔
1486
    m_suspended = false;
66✔
1487

30✔
1488
    logger.debug("Resumed"); // Throws
66✔
1489

30✔
1490
    if (unbind_process_complete())
66✔
1491
        initiate_rebind(); // Throws
34✔
1492

30✔
1493
    m_conn.one_more_active_unsuspended_session(); // Throws
66✔
1494
    if (m_try_again_activation_timer) {
66✔
1495
        m_try_again_activation_timer.reset();
8✔
1496
    }
8✔
1497

30✔
1498
    on_resumed(); // Throws
66✔
1499
}
66✔
1500

1501

1502
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1503
                                                 std::vector<ProtocolErrorInfo>* out)
1504
{
20,640✔
1505
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
20,640✔
1506
        return;
20,596✔
1507
    }
20,596✔
1508

22✔
1509
#ifdef REALM_DEBUG
44✔
1510
    REALM_ASSERT_DEBUG(
44✔
1511
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
44✔
1512
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
44✔
1513
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
44✔
1514
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
44✔
1515
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
44✔
1516
                       }));
44✔
1517
#endif
44✔
1518

22✔
1519
    while (!m_pending_compensating_write_errors.empty() &&
88✔
1520
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
66✔
1521
               changesets.back().version) {
44✔
1522
        auto& cur_error = m_pending_compensating_write_errors.front();
44✔
1523
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
44✔
1524
        out->push_back(std::move(cur_error));
44✔
1525
        m_pending_compensating_write_errors.pop_front();
44✔
1526
    }
44✔
1527
}
44✔
1528

1529

1530
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1531
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1532
                                   DownloadBatchState download_batch_state)
1533
{
43,210✔
1534
    auto& history = get_history();
43,210✔
1535
    if (received_changesets.empty()) {
43,210✔
1536
        if (download_batch_state == DownloadBatchState::MoreToCome) {
22,548✔
1537
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1538
                                       "received empty download message that was not the last in batch",
×
1539
                                       ProtocolError::bad_progress);
×
1540
        }
×
1541
        history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws
22,548✔
1542
        return;
22,548✔
1543
    }
22,548✔
1544

10,916✔
1545
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
20,662✔
1546
    auto transact = get_db()->start_read();
20,662✔
1547
    history.integrate_server_changesets(
20,662✔
1548
        progress, &downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
20,662✔
1549
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
20,652✔
1550
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
20,640✔
1551
        }); // Throws
20,640✔
1552
    if (received_changesets.size() == 1) {
20,662✔
1553
        logger.debug("1 remote changeset integrated, producing client version %1",
14,764✔
1554
                     version_info.sync_version.version); // Throws
14,764✔
1555
    }
14,764✔
1556
    else {
5,898✔
1557
        logger.debug("%2 remote changesets integrated, producing client version %1",
5,898✔
1558
                     version_info.sync_version.version, received_changesets.size()); // Throws
5,898✔
1559
    }
5,898✔
1560

10,916✔
1561
    for (const auto& pending_error : pending_compensating_write_errors) {
10,938✔
1562
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
44✔
1563
                    pending_error.compensating_write_rejected_client_version,
44✔
1564
                    *pending_error.compensating_write_server_version, pending_error.message);
44✔
1565
        try {
44✔
1566
            on_connection_state_changed(
44✔
1567
                m_conn.get_state(),
44✔
1568
                SessionErrorInfo{pending_error,
44✔
1569
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
44✔
1570
                                                          pending_error.message)});
44✔
1571
        }
44✔
1572
        catch (...) {
22✔
1573
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1574
        }
×
1575
    }
44✔
1576
}
20,662✔
1577

1578

1579
void Session::on_integration_failure(const IntegrationException& error)
1580
{
48✔
1581
    REALM_ASSERT_EX(m_state == Active, m_state);
48✔
1582
    REALM_ASSERT(!m_client_error && !m_error_to_send);
48✔
1583
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
48✔
1584

24✔
1585
    m_client_error = util::make_optional<IntegrationException>(error);
48✔
1586
    m_error_to_send = true;
48✔
1587
    SessionErrorInfo error_info{error.to_status(), IsFatal{false}};
48✔
1588
    error_info.server_requests_action = ProtocolErrorInfo::Action::Warning;
48✔
1589
    // Surface the error to the user otherwise is lost.
24✔
1590
    on_connection_state_changed(m_conn.get_state(), std::move(error_info));
48✔
1591

24✔
1592
    // Since the deactivation process has not been initiated, the UNBIND
24✔
1593
    // message cannot have been sent unless an ERROR message was received.
24✔
1594
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
48✔
1595
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
48✔
1596
        ensure_enlisted_to_send(); // Throws
44✔
1597
    }
44✔
1598
}
48✔
1599

1600
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
1601
{
44,942✔
1602
    REALM_ASSERT_EX(m_state == Active, m_state);
44,942✔
1603
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
44,942✔
1604
    m_download_progress = progress.download;
44,942✔
1605
    bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version);
44,942✔
1606
    m_progress = progress;
44,942✔
1607
    if (upload_progressed) {
44,942✔
1608
        if (progress.upload.client_version > m_last_version_selected_for_upload) {
33,384✔
1609
            if (progress.upload.client_version > m_upload_progress.client_version)
14,998✔
1610
                m_upload_progress = progress.upload;
1,914✔
1611
            m_last_version_selected_for_upload = progress.upload.client_version;
14,998✔
1612
        }
14,998✔
1613

16,690✔
1614
        check_for_upload_completion();
33,384✔
1615
    }
33,384✔
1616

22,588✔
1617
    do_recognize_sync_version(client_version); // Allows upload process to resume
44,942✔
1618
    check_for_download_completion();           // Throws
44,942✔
1619

22,588✔
1620
    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
22,588✔
1621
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
44,942✔
1622
        auto& flx_subscription_store = *get_flx_subscription_store();
2,782✔
1623
        get_migration_store()->create_subscriptions(flx_subscription_store);
2,782✔
1624
    }
2,782✔
1625

22,588✔
1626
    // Since the deactivation process has not been initiated, the UNBIND
22,588✔
1627
    // message cannot have been sent unless an ERROR message was received.
22,588✔
1628
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
44,942✔
1629
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
44,942✔
1630
        ensure_enlisted_to_send(); // Throws
44,934✔
1631
    }
44,934✔
1632
}
44,942✔
1633

1634

1635
Session::~Session()
1636
{
10,038✔
1637
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
4,844✔
1638
}
10,038✔
1639

1640

1641
std::string Session::make_logger_prefix(session_ident_type ident)
1642
{
10,040✔
1643
    std::ostringstream out;
10,040✔
1644
    out.imbue(std::locale::classic());
10,040✔
1645
    out << "Session[" << ident << "]: "; // Throws
10,040✔
1646
    return out.str();                    // Throws
10,040✔
1647
}
10,040✔
1648

1649

1650
void Session::activate()
1651
{
10,040✔
1652
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
10,040✔
1653

4,844✔
1654
    logger.debug("Activating"); // Throws
10,040✔
1655

4,844✔
1656
    bool has_pending_client_reset = false;
10,040✔
1657
    if (REALM_LIKELY(!get_client().is_dry_run())) {
10,040✔
1658
        bool file_exists = util::File::exists(get_realm_path());
10,036✔
1659
        m_performing_client_reset = get_client_reset_config().has_value();
10,036✔
1660

4,842✔
1661
        logger.info("client_reset_config = %1, Realm exists = %2 ", m_performing_client_reset, file_exists);
10,036✔
1662
        if (!m_performing_client_reset) {
10,036✔
1663
            get_history().get_status(m_last_version_available, m_client_file_ident, m_progress,
9,674✔
1664
                                     &has_pending_client_reset); // Throws
9,674✔
1665
        }
9,674✔
1666
    }
10,036✔
1667
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
10,040✔
1668
                 m_client_file_ident.salt); // Throws
10,040✔
1669
    m_upload_progress = m_progress.upload;
10,040✔
1670
    m_last_version_selected_for_upload = m_upload_progress.client_version;
10,040✔
1671
    m_download_progress = m_progress.download;
10,040✔
1672
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
10,040✔
1673

4,844✔
1674
    logger.debug("last_version_available  = %1", m_last_version_available);                    // Throws
10,040✔
1675
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
10,040✔
1676
    logger.debug("progress_download_client_version = %1",
10,040✔
1677
                 m_progress.download.last_integrated_client_version);                                      // Throws
10,040✔
1678
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
10,040✔
1679
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
10,040✔
1680

4,844✔
1681
    reset_protocol_state();
10,040✔
1682
    m_state = Active;
10,040✔
1683

4,844✔
1684
    call_debug_hook(SyncClientHookEvent::SessionActivating, m_progress, m_last_sent_flx_query_version,
10,040✔
1685
                    DownloadBatchState::SteadyState, 0);
10,040✔
1686

4,844✔
1687
    REALM_ASSERT(!m_suspended);
10,040✔
1688
    m_conn.one_more_active_unsuspended_session(); // Throws
10,040✔
1689

4,844✔
1690
    try {
10,040✔
1691
        process_pending_flx_bootstrap();
10,040✔
1692
    }
10,040✔
1693
    catch (const IntegrationException& error) {
4,844✔
1694
        on_integration_failure(error);
×
1695
    }
×
1696
    catch (...) {
4,846✔
1697
        on_integration_failure(IntegrationException(exception_to_status()));
4✔
1698
    }
4✔
1699

4,844✔
1700
    if (has_pending_client_reset) {
10,038✔
1701
        handle_pending_client_reset_acknowledgement();
18✔
1702
    }
18✔
1703
}
10,038✔
1704

1705

1706
// The caller (Connection) must discard the session if the session has become
1707
// deactivated upon return.
1708
void Session::initiate_deactivation()
1709
{
10,040✔
1710
    REALM_ASSERT_EX(m_state == Active, m_state);
10,040✔
1711

4,844✔
1712
    logger.debug("Initiating deactivation"); // Throws
10,040✔
1713

4,844✔
1714
    m_state = Deactivating;
10,040✔
1715

4,844✔
1716
    if (!m_suspended)
10,040✔
1717
        m_conn.one_less_active_unsuspended_session(); // Throws
9,430✔
1718

4,844✔
1719
    if (m_enlisted_to_send) {
10,040✔
1720
        REALM_ASSERT(!unbind_process_complete());
4,888✔
1721
        return;
4,888✔
1722
    }
4,888✔
1723

2,276✔
1724
    // Deactivate immediately if the BIND message has not yet been sent and the
2,276✔
1725
    // session is not enlisted to send, or if the unbinding process has already
2,276✔
1726
    // completed.
2,276✔
1727
    if (!m_bind_message_sent || unbind_process_complete()) {
5,152✔
1728
        complete_deactivation(); // Throws
1,006✔
1729
        // Life cycle state is now Deactivated
424✔
1730
        return;
1,006✔
1731
    }
1,006✔
1732

1,852✔
1733
    // Ready to send the UNBIND message, if it has not already been sent
1,852✔
1734
    if (!m_unbind_message_sent) {
4,146✔
1735
        enlist_to_send(); // Throws
3,920✔
1736
        return;
3,920✔
1737
    }
3,920✔
1738
}
4,146✔
1739

1740

1741
void Session::complete_deactivation()
1742
{
10,038✔
1743
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
10,038✔
1744
    m_state = Deactivated;
10,038✔
1745

4,844✔
1746
    logger.debug("Deactivation completed"); // Throws
10,038✔
1747
}
10,038✔
1748

1749

1750
// Called by the associated Connection object when this session is granted an
1751
// opportunity to send a message.
1752
//
1753
// The caller (Connection) must discard the session if the session has become
1754
// deactivated upon return.
1755
void Session::send_message()
1756
{
169,458✔
1757
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
169,458✔
1758
    REALM_ASSERT(m_enlisted_to_send);
169,458✔
1759
    m_enlisted_to_send = false;
169,458✔
1760
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
169,458✔
1761
        // Deactivation has been initiated. If the UNBIND message has not been
4,466✔
1762
        // sent yet, there is no point in sending it. Instead, we can let the
4,466✔
1763
        // deactivation process complete.
4,466✔
1764
        if (!m_bind_message_sent) {
9,138✔
1765
            return complete_deactivation(); // Throws
1,458✔
1766
            // Life cycle state is now Deactivated
1,448✔
1767
        }
1,458✔
1768

3,018✔
1769
        // Session life cycle state is Deactivating or the unbinding process has
3,018✔
1770
        // been initiated by a session specific ERROR message
3,018✔
1771
        if (!m_unbind_message_sent)
7,680✔
1772
            send_unbind_message(); // Throws
7,680✔
1773
        return;
7,680✔
1774
    }
7,680✔
1775

74,298✔
1776
    // Session life cycle state is Active and the unbinding process has
74,298✔
1777
    // not been initiated
74,298✔
1778
    REALM_ASSERT(!m_unbind_message_sent);
160,320✔
1779

74,298✔
1780
    if (!m_bind_message_sent)
160,320✔
1781
        return send_bind_message(); // Throws
10,070✔
1782

69,934✔
1783
    if (!m_ident_message_sent) {
150,250✔
1784
        if (have_client_file_ident())
9,086✔
1785
            send_ident_message(); // Throws
9,086✔
1786
        return;
9,086✔
1787
    }
9,086✔
1788

66,312✔
1789
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
141,164✔
1790
                                                      [](const PendingTestCommand& command) {
66,366✔
1791
                                                          return command.pending;
108✔
1792
                                                      });
108✔
1793
    if (has_pending_test_command) {
141,164✔
1794
        return send_test_command_message();
52✔
1795
    }
52✔
1796

66,286✔
1797
    if (m_error_to_send)
141,112✔
1798
        return send_json_error_message(); // Throws
42✔
1799

66,266✔
1800
    // Stop sending upload, mark and query messages when the client detects an error.
66,266✔
1801
    if (m_client_error) {
141,070✔
1802
        return;
24✔
1803
    }
24✔
1804

66,254✔
1805
    if (m_target_download_mark > m_last_download_mark_sent)
141,046✔
1806
        return send_mark_message(); // Throws
18,890✔
1807

57,876✔
1808
    auto is_upload_allowed = [&]() -> bool {
122,172✔
1809
        if (!m_is_flx_sync_session) {
122,172✔
1810
            return true;
109,490✔
1811
        }
109,490✔
1812

6,356✔
1813
        auto migration_store = get_migration_store();
12,682✔
1814
        if (!migration_store) {
12,682✔
1815
            return true;
×
1816
        }
×
1817

6,356✔
1818
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
12,682✔
1819
        if (!sentinel_query_version) {
12,682✔
1820
            return true;
12,660✔
1821
        }
12,660✔
1822

10✔
1823
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
10✔
1824
        return m_last_sent_flx_query_version != *sentinel_query_version;
22✔
1825
    };
22✔
1826

57,876✔
1827
    if (!is_upload_allowed()) {
122,156✔
1828
        return;
12✔
1829
    }
12✔
1830

57,870✔
1831
    auto check_pending_flx_version = [&]() -> bool {
122,156✔
1832
        if (!m_is_flx_sync_session) {
122,156✔
1833
            return false;
109,486✔
1834
        }
109,486✔
1835

6,350✔
1836
        if (!m_allow_upload) {
12,670✔
1837
            return false;
2,302✔
1838
        }
2,302✔
1839

5,188✔
1840
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
10,368✔
1841

5,188✔
1842
        if (!m_pending_flx_sub_set) {
10,368✔
1843
            return false;
8,524✔
1844
        }
8,524✔
1845

920✔
1846
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
1,844✔
1847
    };
1,844✔
1848

57,870✔
1849
    if (check_pending_flx_version()) {
122,144✔
1850
        return send_query_change_message(); // throws
1,058✔
1851
    }
1,058✔
1852

57,342✔
1853
    if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
121,086✔
1854
        return send_upload_message(); // Throws
58,630✔
1855
    }
58,630✔
1856
}
121,086✔
1857

1858

1859
void Session::send_bind_message()
1860
{
10,070✔
1861
    REALM_ASSERT_EX(m_state == Active, m_state);
10,070✔
1862

4,364✔
1863
    session_ident_type session_ident = m_ident;
10,070✔
1864
    bool need_client_file_ident = !have_client_file_ident();
10,070✔
1865
    const bool is_subserver = false;
10,070✔
1866

4,364✔
1867

4,364✔
1868
    ClientProtocol& protocol = m_conn.get_client_protocol();
10,070✔
1869
    int protocol_version = m_conn.get_negotiated_protocol_version();
10,070✔
1870
    OutputBuffer& out = m_conn.get_output_buffer();
10,070✔
1871
    // Discard the token since it's ignored by the server.
4,364✔
1872
    std::string empty_access_token;
10,070✔
1873
    if (m_is_flx_sync_session) {
10,070✔
1874
        nlohmann::json bind_json_data;
1,354✔
1875
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,354✔
1876
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1877
        }
60✔
1878
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,354✔
1879
        auto schema_version = get_schema_version();
1,354✔
1880
        // Send 0 if schema is not versioned.
676✔
1881
        bind_json_data["schemaVersion"] = schema_version != uint64_t(-1) ? schema_version : 0;
1,352✔
1882
        if (logger.would_log(util::Logger::Level::debug)) {
1,354✔
1883
            std::string json_data_dump;
1,354✔
1884
            if (!bind_json_data.empty()) {
1,354✔
1885
                json_data_dump = bind_json_data.dump();
1,354✔
1886
            }
1,354✔
1887
            logger.debug(
1,354✔
1888
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,354✔
1889
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,354✔
1890
        }
1,354✔
1891
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,354✔
1892
                                       need_client_file_ident, is_subserver); // Throws
1,354✔
1893
    }
1,354✔
1894
    else {
8,716✔
1895
        std::string server_path = get_virt_path();
8,716✔
1896
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
8,716✔
1897
                     session_ident, need_client_file_ident, is_subserver, server_path);
8,716✔
1898
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
8,716✔
1899
                                       need_client_file_ident, is_subserver); // Throws
8,716✔
1900
    }
8,716✔
1901
    m_conn.initiate_write_message(out, this); // Throws
10,070✔
1902

4,364✔
1903
    m_bind_message_sent = true;
10,070✔
1904
    call_debug_hook(SyncClientHookEvent::BindMessageSent, m_progress, m_last_sent_flx_query_version,
10,070✔
1905
                    DownloadBatchState::SteadyState, 0);
10,070✔
1906

4,364✔
1907
    // Ready to send the IDENT message if the file identifier pair is already
4,364✔
1908
    // available.
4,364✔
1909
    if (!need_client_file_ident)
10,070✔
1910
        enlist_to_send(); // Throws
5,768✔
1911
}
10,070✔
1912

1913

1914
void Session::send_ident_message()
1915
{
9,086✔
1916
    REALM_ASSERT_EX(m_state == Active, m_state);
9,086✔
1917
    REALM_ASSERT(m_bind_message_sent);
9,086✔
1918
    REALM_ASSERT(!m_unbind_message_sent);
9,086✔
1919
    REALM_ASSERT(have_client_file_ident());
9,086✔
1920

3,622✔
1921

3,622✔
1922
    ClientProtocol& protocol = m_conn.get_client_protocol();
9,086✔
1923
    OutputBuffer& out = m_conn.get_output_buffer();
9,086✔
1924
    session_ident_type session_ident = m_ident;
9,086✔
1925

3,622✔
1926
    if (m_is_flx_sync_session) {
9,086✔
1927
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,278✔
1928
        const auto active_query_body = active_query_set.to_ext_json();
1,278✔
1929
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,278✔
1930
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,278✔
1931
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,278✔
1932
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,278✔
1933
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,278✔
1934
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,278✔
1935
                     active_query_body); // Throws
1,278✔
1936
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,278✔
1937
                                        active_query_set.version(), active_query_body); // Throws
1,278✔
1938
        m_last_sent_flx_query_version = active_query_set.version();
1,278✔
1939
    }
1,278✔
1940
    else {
7,808✔
1941
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
7,808✔
1942
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
7,808✔
1943
                     "latest_server_version_salt=%6)",
7,808✔
1944
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
7,808✔
1945
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
7,808✔
1946
                     m_progress.latest_server_version.salt);                                  // Throws
7,808✔
1947
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
7,808✔
1948
    }
7,808✔
1949
    m_conn.initiate_write_message(out, this); // Throws
9,086✔
1950

3,622✔
1951
    m_ident_message_sent = true;
9,086✔
1952

3,622✔
1953
    // Other messages may be waiting to be sent
3,622✔
1954
    enlist_to_send(); // Throws
9,086✔
1955
}
9,086✔
1956

1957
void Session::send_query_change_message()
1958
{
1,058✔
1959
    REALM_ASSERT_EX(m_state == Active, m_state);
1,058✔
1960
    REALM_ASSERT(m_ident_message_sent);
1,058✔
1961
    REALM_ASSERT(!m_unbind_message_sent);
1,058✔
1962
    REALM_ASSERT(m_pending_flx_sub_set);
1,058✔
1963
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
1,058✔
1964

528✔
1965
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
1,058✔
1966
        return;
×
1967
    }
×
1968

528✔
1969
    auto sub_store = get_flx_subscription_store();
1,058✔
1970
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
1,058✔
1971
    auto latest_queries = latest_sub_set.to_ext_json();
1,058✔
1972
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
1,058✔
1973
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
1,058✔
1974

528✔
1975
    OutputBuffer& out = m_conn.get_output_buffer();
1,058✔
1976
    session_ident_type session_ident = get_ident();
1,058✔
1977
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,058✔
1978
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
1,058✔
1979
    m_conn.initiate_write_message(out, this);
1,058✔
1980

528✔
1981
    m_last_sent_flx_query_version = latest_sub_set.version();
1,058✔
1982

528✔
1983
    request_download_completion_notification();
1,058✔
1984
}
1,058✔
1985

1986
void Session::send_upload_message()
1987
{
58,632✔
1988
    REALM_ASSERT_EX(m_state == Active, m_state);
58,632✔
1989
    REALM_ASSERT(m_ident_message_sent);
58,632✔
1990
    REALM_ASSERT(!m_unbind_message_sent);
58,632✔
1991

28,734✔
1992
    if (REALM_UNLIKELY(get_client().is_dry_run()))
58,632✔
1993
        return;
28,734✔
1994

28,734✔
1995
    version_type target_upload_version = m_last_version_available;
58,632✔
1996
    if (m_pending_flx_sub_set) {
58,632✔
1997
        REALM_ASSERT(m_is_flx_sync_session);
788✔
1998
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
788✔
1999
    }
788✔
2000

28,734✔
2001
    std::vector<UploadChangeset> uploadable_changesets;
58,632✔
2002
    version_type locked_server_version = 0;
58,632✔
2003
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
58,632✔
2004
                                             locked_server_version); // Throws
58,632✔
2005

28,734✔
2006
    if (uploadable_changesets.empty()) {
58,632✔
2007
        // Nothing more to upload right now
14,208✔
2008
        check_for_upload_completion(); // Throws
30,450✔
2009
        // If we need to limit upload up to some version other than the last client version available and there are no
14,208✔
2010
        // changes to upload, then there is no need to send an empty message.
14,208✔
2011
        if (m_pending_flx_sub_set) {
30,450✔
2012
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
216✔
2013
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
216✔
2014
            // Other messages may be waiting to be sent
108✔
2015
            return enlist_to_send(); // Throws
216✔
2016
        }
216✔
2017
    }
28,182✔
2018
    else {
28,182✔
2019
        m_last_version_selected_for_upload = uploadable_changesets.back().progress.client_version;
28,182✔
2020
    }
28,182✔
2021

28,734✔
2022
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
58,524✔
2023
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
572✔
2024
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
572✔
2025
    }
572✔
2026

28,626✔
2027
    version_type progress_client_version = m_upload_progress.client_version;
58,416✔
2028
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
58,416✔
2029

28,626✔
2030
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
58,416✔
2031
                 "locked_server_version=%3, num_changesets=%4)",
58,416✔
2032
                 progress_client_version, progress_server_version, locked_server_version,
58,416✔
2033
                 uploadable_changesets.size()); // Throws
58,416✔
2034

28,626✔
2035
    ClientProtocol& protocol = m_conn.get_client_protocol();
58,416✔
2036
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
58,416✔
2037

28,626✔
2038
    for (const UploadChangeset& uc : uploadable_changesets) {
50,664✔
2039
        logger.debug(util::LogCategory::changeset,
42,810✔
2040
                     "Fetching changeset for upload (client_version=%1, server_version=%2, "
42,810✔
2041
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
42,810✔
2042
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
42,810✔
2043
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
42,810✔
2044
        if (logger.would_log(util::Logger::Level::trace)) {
42,810✔
2045
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2046
            if (changeset_data.size() < 1024) {
×
2047
                logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
2048
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2049
            }
×
2050
            else {
×
2051
                logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
2052
                             protocol.compressed_hex_dump(changeset_data));
×
2053
            }
×
2054

2055
#if REALM_DEBUG
×
2056
            ChunkedBinaryInputStream in{changeset_data};
×
2057
            Changeset log;
×
2058
            try {
×
2059
                parse_changeset(in, log);
×
2060
                std::stringstream ss;
×
2061
                log.print(ss);
×
2062
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
2063
            }
×
2064
            catch (const BadChangesetError& err) {
×
2065
                logger.error(util::LogCategory::changeset, "Unable to parse changeset: %1", err.what());
×
2066
            }
×
2067
#endif
×
2068
        }
×
2069

20,772✔
2070
#if 0 // Upload log compaction is currently not implemented
2071
        if (!get_client().m_disable_upload_compaction) {
2072
            ChangesetEncoder::Buffer encode_buffer;
2073

2074
            {
2075
                // Upload compaction only takes place within single changesets to
2076
                // avoid another client seeing inconsistent snapshots.
2077
                ChunkedBinaryInputStream stream{uc.changeset};
2078
                Changeset changeset;
2079
                parse_changeset(stream, changeset); // Throws
2080
                // FIXME: What is the point of setting these? How can compaction care about them?
2081
                changeset.version = uc.progress.client_version;
2082
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2083
                changeset.origin_timestamp = uc.origin_timestamp;
2084
                changeset.origin_file_ident = uc.origin_file_ident;
2085

2086
                compact_changesets(&changeset, 1);
2087
                encode_changeset(changeset, encode_buffer);
2088

2089
                logger.debug(util::LogCategory::changeset, "Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2090
                             encode_buffer.size()); // Throws
2091
            }
2092

2093
            upload_message_builder.add_changeset(
2094
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
2095
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
2096
        }
2097
        else
2098
#endif
2099
        {
42,810✔
2100
            upload_message_builder.add_changeset(uc.progress.client_version,
42,810✔
2101
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
42,810✔
2102
                                                 uc.origin_file_ident,
42,810✔
2103
                                                 uc.changeset); // Throws
42,810✔
2104
        }
42,810✔
2105
    }
42,810✔
2106

28,626✔
2107
    int protocol_version = m_conn.get_negotiated_protocol_version();
58,416✔
2108
    OutputBuffer& out = m_conn.get_output_buffer();
58,416✔
2109
    session_ident_type session_ident = get_ident();
58,416✔
2110
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
58,416✔
2111
                                               progress_server_version,
58,416✔
2112
                                               locked_server_version); // Throws
58,416✔
2113
    m_conn.initiate_write_message(out, this);                          // Throws
58,416✔
2114

28,626✔
2115
    // Other messages may be waiting to be sent
28,626✔
2116
    enlist_to_send(); // Throws
58,416✔
2117
}
58,416✔
2118

2119

2120
void Session::send_mark_message()
2121
{
18,888✔
2122
    REALM_ASSERT_EX(m_state == Active, m_state);
18,888✔
2123
    REALM_ASSERT(m_ident_message_sent);
18,888✔
2124
    REALM_ASSERT(!m_unbind_message_sent);
18,888✔
2125
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
18,888✔
2126

8,378✔
2127
    request_ident_type request_ident = m_target_download_mark;
18,888✔
2128
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
18,888✔
2129

8,378✔
2130
    ClientProtocol& protocol = m_conn.get_client_protocol();
18,888✔
2131
    OutputBuffer& out = m_conn.get_output_buffer();
18,888✔
2132
    session_ident_type session_ident = get_ident();
18,888✔
2133
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
18,888✔
2134
    m_conn.initiate_write_message(out, this);                      // Throws
18,888✔
2135

8,378✔
2136
    m_last_download_mark_sent = request_ident;
18,888✔
2137

8,378✔
2138
    // Other messages may be waiting to be sent
8,378✔
2139
    enlist_to_send(); // Throws
18,888✔
2140
}
18,888✔
2141

2142

2143
void Session::send_unbind_message()
2144
{
7,680✔
2145
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
7,680✔
2146
    REALM_ASSERT(m_bind_message_sent);
7,680✔
2147
    REALM_ASSERT(!m_unbind_message_sent);
7,680✔
2148

3,018✔
2149
    logger.debug("Sending: UNBIND"); // Throws
7,680✔
2150

3,018✔
2151
    ClientProtocol& protocol = m_conn.get_client_protocol();
7,680✔
2152
    OutputBuffer& out = m_conn.get_output_buffer();
7,680✔
2153
    session_ident_type session_ident = get_ident();
7,680✔
2154
    protocol.make_unbind_message(out, session_ident); // Throws
7,680✔
2155
    m_conn.initiate_write_message(out, this);         // Throws
7,680✔
2156

3,018✔
2157
    m_unbind_message_sent = true;
7,680✔
2158
}
7,680✔
2159

2160

2161
void Session::send_json_error_message()
2162
{
42✔
2163
    REALM_ASSERT_EX(m_state == Active, m_state);
42✔
2164
    REALM_ASSERT(m_ident_message_sent);
42✔
2165
    REALM_ASSERT(!m_unbind_message_sent);
42✔
2166
    REALM_ASSERT(m_error_to_send);
42✔
2167
    REALM_ASSERT(m_client_error);
42✔
2168

20✔
2169
    ClientProtocol& protocol = m_conn.get_client_protocol();
42✔
2170
    OutputBuffer& out = m_conn.get_output_buffer();
42✔
2171
    session_ident_type session_ident = get_ident();
42✔
2172
    auto protocol_error = m_client_error->error_for_server;
42✔
2173

20✔
2174
    auto message = util::format("%1", m_client_error->to_status());
42✔
2175
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
42✔
2176
                session_ident); // Throws
42✔
2177

20✔
2178
    nlohmann::json error_body_json;
42✔
2179
    error_body_json["message"] = std::move(message);
42✔
2180
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
42✔
2181
                                     error_body_json.dump()); // Throws
42✔
2182
    m_conn.initiate_write_message(out, this);                 // Throws
42✔
2183

20✔
2184
    m_error_to_send = false;
42✔
2185
    enlist_to_send(); // Throws
42✔
2186
}
42✔
2187

2188

2189
void Session::send_test_command_message()
2190
{
52✔
2191
    REALM_ASSERT_EX(m_state == Active, m_state);
52✔
2192

26✔
2193
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
52✔
2194
                           [](const PendingTestCommand& command) {
52✔
2195
                               return command.pending;
52✔
2196
                           });
52✔
2197
    REALM_ASSERT(it != m_pending_test_commands.end());
52✔
2198

26✔
2199
    ClientProtocol& protocol = m_conn.get_client_protocol();
52✔
2200
    OutputBuffer& out = m_conn.get_output_buffer();
52✔
2201
    auto session_ident = get_ident();
52✔
2202

26✔
2203
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
52✔
2204
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
52✔
2205

26✔
2206
    m_conn.initiate_write_message(out, this); // Throws;
52✔
2207
    it->pending = false;
52✔
2208

26✔
2209
    enlist_to_send();
52✔
2210
}
52✔
2211

2212
bool Session::client_reset_if_needed()
2213
{
3,444✔
2214
    // Regardless of what happens, once we return from this function we will
1,640✔
2215
    // no longer be in the middle of a client reset
1,640✔
2216
    m_performing_client_reset = false;
3,444✔
2217

1,640✔
2218
    // Even if we end up not actually performing a client reset, consume the
1,640✔
2219
    // config to ensure that the resources it holds are released
1,640✔
2220
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
3,444✔
2221
    if (!client_reset_config) {
3,444✔
2222
        return false;
3,080✔
2223
    }
3,080✔
2224

182✔
2225
    auto on_flx_version_complete = [this](int64_t version) {
364✔
2226
        this->on_flx_sync_version_complete(version);
288✔
2227
    };
288✔
2228
    bool did_reset = client_reset::perform_client_reset(
364✔
2229
        logger, *get_db(), *client_reset_config->fresh_copy, client_reset_config->mode,
364✔
2230
        std::move(client_reset_config->notify_before_client_reset),
364✔
2231
        std::move(client_reset_config->notify_after_client_reset), m_client_file_ident, get_flx_subscription_store(),
364✔
2232
        on_flx_version_complete, client_reset_config->recovery_is_allowed);
364✔
2233
    if (!did_reset) {
364✔
2234
        return false;
×
2235
    }
×
2236

182✔
2237
    // The fresh Realm has been used to reset the state
182✔
2238
    logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
364✔
2239

182✔
2240
    SaltedFileIdent client_file_ident;
364✔
2241
    bool has_pending_client_reset = false;
364✔
2242
    get_history().get_status(m_last_version_available, client_file_ident, m_progress,
364✔
2243
                             &has_pending_client_reset); // Throws
364✔
2244
    REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
364✔
2245
    REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
364✔
2246
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
364✔
2247
                    m_progress.download.last_integrated_client_version);
364✔
2248
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
364✔
2249
    logger.trace("last_version_available  = %1", m_last_version_available); // Throws
364✔
2250

182✔
2251
    m_upload_progress = m_progress.upload;
364✔
2252
    m_download_progress = m_progress.download;
364✔
2253
    // In recovery mode, there may be new changesets to upload and nothing left to download.
182✔
2254
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
182✔
2255
    // For both, we want to allow uploads again without needing external changes to download first.
182✔
2256
    m_allow_upload = true;
364✔
2257
    REALM_ASSERT_EX(m_last_version_selected_for_upload == 0, m_last_version_selected_for_upload);
364✔
2258

182✔
2259
    if (has_pending_client_reset) {
364✔
2260
        handle_pending_client_reset_acknowledgement();
284✔
2261
    }
284✔
2262

182✔
2263
    update_subscription_version_info();
364✔
2264

182✔
2265
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
182✔
2266
    if (auto migration_store = get_migration_store()) {
364✔
2267
        migration_store->complete_migration_or_rollback();
256✔
2268
    }
256✔
2269

182✔
2270
    return true;
364✔
2271
}
364✔
2272

2273
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
2274
{
3,502✔
2275
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
3,502✔
2276
                 client_file_ident.salt); // Throws
3,502✔
2277

1,696✔
2278
    // Ignore the message if the deactivation process has been initiated,
1,696✔
2279
    // because in that case, the associated Realm and SessionWrapper must
1,696✔
2280
    // not be accessed any longer.
1,696✔
2281
    if (m_state != Active)
3,502✔
2282
        return Status::OK(); // Success
58✔
2283

1,640✔
2284
    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
3,444✔
2285
                               !m_unbound_message_received);
3,444✔
2286
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,444✔
2287
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
×
2288
    }
×
2289
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
3,444✔
2290
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
×
2291
    }
×
2292
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
3,444✔
2293
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
×
2294
    }
×
2295

1,640✔
2296
    m_client_file_ident = client_file_ident;
3,444✔
2297

1,640✔
2298
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
3,444✔
2299
        // Ready to send the IDENT message
2300
        ensure_enlisted_to_send(); // Throws
×
2301
        return Status::OK();       // Success
×
2302
    }
×
2303

1,640✔
2304
    // if a client reset happens, it will take care of setting the file ident
1,640✔
2305
    // and if not, we do it here
1,640✔
2306
    bool did_client_reset = false;
3,444✔
2307
    try {
3,444✔
2308
        did_client_reset = client_reset_if_needed();
3,444✔
2309
    }
3,444✔
2310
    catch (const std::exception& e) {
1,680✔
2311
        auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
80✔
2312
        logger.error(err_msg.c_str());
80✔
2313
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
80✔
2314
        suspend(err_info);
80✔
2315
        return Status::OK();
80✔
2316
    }
80✔
2317
    if (!did_client_reset) {
3,364✔
2318
        get_history().set_client_file_ident(client_file_ident,
3,080✔
2319
                                            m_fix_up_object_ids); // Throws
3,080✔
2320
        m_progress.download.last_integrated_client_version = 0;
3,080✔
2321
        m_progress.upload.client_version = 0;
3,080✔
2322
        m_last_version_selected_for_upload = 0;
3,080✔
2323
    }
3,080✔
2324

1,600✔
2325
    // Ready to send the IDENT message
1,600✔
2326
    ensure_enlisted_to_send(); // Throws
3,364✔
2327
    return Status::OK();       // Success
3,364✔
2328
}
3,364✔
2329

2330
Status Session::receive_download_message(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
2331
                                         DownloadBatchState batch_state, int64_t query_version,
2332
                                         const ReceivedChangesets& received_changesets)
2333
{
45,624✔
2334
    REALM_ASSERT_EX(query_version >= 0, query_version);
45,624✔
2335
    // Ignore the message if the deactivation process has been initiated,
23,012✔
2336
    // because in that case, the associated Realm and SessionWrapper must
23,012✔
2337
    // not be accessed any longer.
23,012✔
2338
    if (m_state != Active)
45,624✔
2339
        return Status::OK();
442✔
2340

22,708✔
2341
    if (is_steady_state_download_message(batch_state, query_version)) {
45,182✔
2342
        batch_state = DownloadBatchState::SteadyState;
43,222✔
2343
    }
43,222✔
2344

22,708✔
2345
    logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
45,182✔
2346
                 "latest_server_version=%3, latest_server_version_salt=%4, "
45,182✔
2347
                 "upload_client_version=%5, upload_server_version=%6, downloadable_bytes=%7, "
45,182✔
2348
                 "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
45,182✔
2349
                 progress.download.server_version, progress.download.last_integrated_client_version,
45,182✔
2350
                 progress.latest_server_version.version, progress.latest_server_version.salt,
45,182✔
2351
                 progress.upload.client_version, progress.upload.last_integrated_server_version, downloadable_bytes,
45,182✔
2352
                 batch_state != DownloadBatchState::MoreToCome, query_version, received_changesets.size()); // Throws
45,182✔
2353

22,708✔
2354
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
22,708✔
2355
    // changeset over and over again.
22,708✔
2356
    if (m_client_error) {
45,182✔
2357
        logger.debug("Ignoring download message because the client detected an integration error");
×
2358
        return Status::OK();
×
2359
    }
×
2360

22,708✔
2361
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
45,182✔
2362
    if (REALM_UNLIKELY(!legal_at_this_time)) {
45,182✔
2363
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
×
2364
    }
×
2365
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
45,182✔
2366
        logger.error("Bad sync progress received (%1)", status);
×
2367
        return status;
×
2368
    }
×
2369

22,708✔
2370
    version_type server_version = m_progress.download.server_version;
45,182✔
2371
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
45,182✔
2372
    for (const RemoteChangeset& changeset : received_changesets) {
43,568✔
2373
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
22,062✔
2374
        // version must be increasing, but can stay the same during bootstraps.
22,062✔
2375
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
23,078✔
2376
                                                         : (changeset.remote_version > server_version);
41,906✔
2377
        // Each server version cannot be greater than the one in the header of the download message.
22,062✔
2378
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
42,922✔
2379
        if (!good_server_version) {
42,922✔
2380
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2381
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2382
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2383
        }
×
2384
        server_version = changeset.remote_version;
42,922✔
2385
        // Check that per-changeset last integrated client version is "weakly"
22,062✔
2386
        // increasing.
22,062✔
2387
        bool good_client_version =
42,922✔
2388
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
42,922✔
2389
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
42,924✔
2390
        if (!good_client_version) {
42,922✔
2391
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2392
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2393
                                 "(%1, %2, %3)",
×
2394
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2395
                                 progress.download.last_integrated_client_version)};
×
2396
        }
×
2397
        last_integrated_client_version = changeset.last_integrated_local_version;
42,922✔
2398
        // Server shouldn't send our own changes, and zero is not a valid client
22,062✔
2399
        // file identifier.
22,062✔
2400
        bool good_file_ident =
42,922✔
2401
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
42,924✔
2402
        if (!good_file_ident) {
42,922✔
2403
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2404
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2405
                                 changeset.origin_file_ident)};
×
2406
        }
×
2407
    }
42,922✔
2408

22,708✔
2409
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
45,182✔
2410
                                       batch_state, received_changesets.size());
45,182✔
2411
    if (hook_action == SyncClientHookAction::EarlyReturn) {
45,182✔
2412
        return Status::OK();
16✔
2413
    }
16✔
2414
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
45,166✔
2415

22,700✔
2416
    if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) {
45,166✔
2417
        clear_resumption_delay_state();
1,948✔
2418
        return Status::OK();
1,948✔
2419
    }
1,948✔
2420

21,726✔
2421
    initiate_integrate_changesets(downloadable_bytes, batch_state, progress, received_changesets); // Throws
43,218✔
2422

21,726✔
2423
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
43,218✔
2424
                                  batch_state, received_changesets.size());
43,218✔
2425
    if (hook_action == SyncClientHookAction::EarlyReturn) {
43,218✔
2426
        return Status::OK();
×
2427
    }
×
2428
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
43,218✔
2429

21,726✔
2430
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
21,726✔
2431
    // after a retryable session error.
21,726✔
2432
    clear_resumption_delay_state();
43,218✔
2433
    return Status::OK();
43,218✔
2434
}
43,218✔
2435

2436
Status Session::receive_mark_message(request_ident_type request_ident)
2437
{
18,128✔
2438
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws
18,128✔
2439

7,994✔
2440
    // Ignore the message if the deactivation process has been initiated,
7,994✔
2441
    // because in that case, the associated Realm and SessionWrapper must
7,994✔
2442
    // not be accessed any longer.
7,994✔
2443
    if (m_state != Active)
18,128✔
2444
        return Status::OK(); // Success
632✔
2445

7,946✔
2446
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
17,496✔
2447
    if (REALM_UNLIKELY(!legal_at_this_time)) {
17,496✔
2448
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
16✔
2449
    }
16✔
2450
    bool good_request_ident =
17,480✔
2451
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
17,480✔
2452
    if (REALM_UNLIKELY(!good_request_ident)) {
17,480✔
2453
        return {
×
2454
            ErrorCodes::SyncProtocolInvariantFailed,
×
2455
            util::format(
×
2456
                "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
×
2457
                m_last_download_mark_sent, m_last_download_mark_received)};
×
2458
    }
×
2459

7,936✔
2460
    m_server_version_at_last_download_mark = m_progress.download.server_version;
17,480✔
2461
    m_last_download_mark_received = request_ident;
17,480✔
2462
    check_for_download_completion(); // Throws
17,480✔
2463

7,936✔
2464
    return Status::OK(); // Success
17,480✔
2465
}
17,480✔
2466

2467

2468
// The caller (Connection) must discard the session if the session has become
2469
// deactivated upon return.
2470
Status Session::receive_unbound_message()
2471
{
5,678✔
2472
    logger.debug("Received: UNBOUND");
5,678✔
2473

2,062✔
2474
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
5,678✔
2475
    if (REALM_UNLIKELY(!legal_at_this_time)) {
5,678✔
2476
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2477
    }
×
2478

2,062✔
2479
    // The fact that the UNBIND message has been sent, but an ERROR message has
2,062✔
2480
    // not been received, implies that the deactivation process must have been
2,062✔
2481
    // initiated, so this session must be in the Deactivating state or the session
2,062✔
2482
    // has been suspended because of a client side error.
2,062✔
2483
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
5,678!
2484

2,062✔
2485
    m_unbound_message_received = true;
5,678✔
2486

2,062✔
2487
    // Detect completion of the unbinding process
2,062✔
2488
    if (m_unbind_message_send_complete && m_state == Deactivating) {
5,678✔
2489
        // The deactivation process completes when the unbinding process
2,062✔
2490
        // completes.
2,062✔
2491
        complete_deactivation(); // Throws
5,678✔
2492
        // Life cycle state is now Deactivated
2,062✔
2493
    }
5,678✔
2494

2,062✔
2495
    return Status::OK(); // Success
5,678✔
2496
}
5,678✔
2497

2498

2499
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
2500
{
20✔
2501
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);
20✔
2502
    // Ignore the message if the deactivation process has been initiated,
10✔
2503
    // because in that case, the associated Realm and SessionWrapper must
10✔
2504
    // not be accessed any longer.
10✔
2505
    if (m_state == Active) {
20✔
2506
        on_flx_sync_error(query_version, message); // throws
20✔
2507
    }
20✔
2508
    return Status::OK();
20✔
2509
}
20✔
2510

2511
// The caller (Connection) must discard the session if the session has become
2512
// deactivated upon return.
2513
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2514
{
702✔
2515
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
702✔
2516
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
702✔
2517

344✔
2518
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
702✔
2519
    if (REALM_UNLIKELY(!legal_at_this_time)) {
702✔
2520
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2521
    }
×
2522

344✔
2523
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
702✔
2524
    auto status = protocol_error_to_status(protocol_error, info.message);
702✔
2525
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
702✔
2526
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2527
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2528
                             info.raw_error_code)};
×
2529
    }
×
2530

344✔
2531
    // Can't process debug hook actions once the Session is undergoing deactivation, since
344✔
2532
    // the SessionWrapper may not be available
344✔
2533
    if (m_state == Active) {
702✔
2534
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
700✔
2535
        if (debug_action == SyncClientHookAction::EarlyReturn) {
700✔
2536
            return Status::OK();
8✔
2537
        }
8✔
2538
    }
694✔
2539

340✔
2540
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
340✔
2541
    // containing the compensating write has appeared in a download message.
340✔
2542
    if (status == ErrorCodes::SyncCompensatingWrite) {
694✔
2543
        // If the client is not active, the compensating writes will not be processed now, but will be
22✔
2544
        // sent again the next time the client connects
22✔
2545
        if (m_state == Active) {
44✔
2546
            REALM_ASSERT(info.compensating_write_server_version.has_value());
44✔
2547
            m_pending_compensating_write_errors.push_back(info);
44✔
2548
        }
44✔
2549
        return Status::OK();
44✔
2550
    }
44✔
2551

318✔
2552
    if (protocol_error == ProtocolError::schema_version_changed) {
650✔
2553
        // Enable upload immediately if the session is still active.
26✔
2554
        if (m_state == Active) {
54✔
2555
            auto wt = get_db()->start_write();
54✔
2556
            _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
54✔
2557
            wt->commit();
54✔
2558
            // Notify SyncSession a schema migration is required.
26✔
2559
            on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
54✔
2560
        }
54✔
2561
        // Keep the session active to upload any unsynced changes.
26✔
2562
        return Status::OK();
54✔
2563
    }
54✔
2564

292✔
2565
    m_error_message_received = true;
596✔
2566
    suspend(SessionErrorInfo{info, std::move(status)});
596✔
2567
    return Status::OK();
596✔
2568
}
596✔
2569

2570
void Session::suspend(const SessionErrorInfo& info)
2571
{
676✔
2572
    REALM_ASSERT(!m_suspended);
676✔
2573
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
676✔
2574
    logger.debug("Suspended"); // Throws
676✔
2575

332✔
2576
    m_suspended = true;
676✔
2577

332✔
2578
    // Detect completion of the unbinding process
332✔
2579
    if (m_unbind_message_send_complete && m_error_message_received) {
676!
2580
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2581
        // we received an ERROR message implies that the deactivation process must
2582
        // have been initiated, so this session must be in the Deactivating state.
UNCOV
2583
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
×
2584

2585
        // The deactivation process completes when the unbinding process
2586
        // completes.
UNCOV
2587
        complete_deactivation(); // Throws
×
2588
        // Life cycle state is now Deactivated
UNCOV
2589
    }
×
2590

332✔
2591
    // Notify the application of the suspension of the session if the session is
332✔
2592
    // still in the Active state
332✔
2593
    if (m_state == Active) {
676✔
2594
        call_debug_hook(SyncClientHookEvent::SessionSuspended, info);
674✔
2595
        m_conn.one_less_active_unsuspended_session(); // Throws
674✔
2596
        on_suspended(info);                           // Throws
674✔
2597
    }
674✔
2598

332✔
2599
    if (!info.is_fatal) {
676✔
2600
        begin_resumption_delay(info);
62✔
2601
    }
62✔
2602

332✔
2603
    // Ready to send the UNBIND message, if it has not been sent already
332✔
2604
    if (!m_unbind_message_sent)
676✔
2605
        ensure_enlisted_to_send(); // Throws
674✔
2606
}
676✔
2607

2608
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
2609
{
52✔
2610
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);
52✔
2611
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
52✔
2612
                           [&](const PendingTestCommand& command) {
52✔
2613
                               return command.id == ident;
52✔
2614
                           });
52✔
2615
    if (it == m_pending_test_commands.end()) {
52✔
2616
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2617
                util::format("Received test command response for a non-existent ident %1", ident)};
×
2618
    }
×
2619

26✔
2620
    it->promise.emplace_value(std::string{body});
52✔
2621
    m_pending_test_commands.erase(it);
52✔
2622

26✔
2623
    return Status::OK();
52✔
2624
}
52✔
2625

2626
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2627
{
62✔
2628
    REALM_ASSERT(!m_try_again_activation_timer);
62✔
2629

28✔
2630
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
62✔
2631
                                  error_info.resumption_delay_interval);
62✔
2632
    auto try_again_interval = m_try_again_delay_info.delay_interval();
62✔
2633
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
62✔
2634
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
14✔
2635
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
14✔
2636
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
14✔
2637
        try_again_interval = std::chrono::milliseconds{1000};
32✔
2638
    }
32✔
2639
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
62✔
2640
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
62✔
2641
        if (status == ErrorCodes::OperationAborted)
62✔
2642
            return;
12✔
2643
        else if (!status.is_ok())
50✔
2644
            throw Exception(status);
×
2645

22✔
2646
        m_try_again_activation_timer.reset();
50✔
2647
        cancel_resumption_delay();
50✔
2648
    });
50✔
2649
}
62✔
2650

2651
void Session::clear_resumption_delay_state()
2652
{
45,164✔
2653
    if (m_try_again_activation_timer) {
45,164✔
2654
        logger.debug("Clearing resumption delay state after successful download");
×
2655
        m_try_again_delay_info.reset();
×
2656
    }
×
2657
}
45,164✔
2658

2659
Status ClientImpl::Session::check_received_sync_progress(const SyncProgress& progress) noexcept
2660
{
45,176✔
2661
    const SyncProgress& a = m_progress;
45,176✔
2662
    const SyncProgress& b = progress;
45,176✔
2663
    std::string message;
45,176✔
2664
    if (b.latest_server_version.version < a.latest_server_version.version) {
45,176✔
2665
        message = util::format("Latest server version in download messages must be weakly increasing throughout a "
×
2666
                               "session (current: %1, received: %2)",
×
2667
                               a.latest_server_version.version, b.latest_server_version.version);
×
2668
    }
×
2669
    if (b.upload.client_version < a.upload.client_version) {
45,176✔
2670
        message = util::format("Last integrated client version in download messages must be weakly increasing "
×
2671
                               "throughout a session (current: %1, received: %2)",
×
2672
                               a.upload.client_version, b.upload.client_version);
×
2673
    }
×
2674
    if (b.upload.client_version > m_last_version_available) {
45,176✔
2675
        message = util::format("Last integrated client version on server cannot be greater than the latest client "
×
2676
                               "version in existence (current: %1, received: %2)",
×
2677
                               m_last_version_available, b.upload.client_version);
×
2678
    }
×
2679
    if (b.download.server_version < a.download.server_version) {
45,176✔
2680
        message =
×
2681
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2682
                         a.download.server_version, b.download.server_version);
×
2683
    }
×
2684
    if (b.download.server_version > b.latest_server_version.version) {
45,176✔
2685
        message = util::format(
×
2686
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
×
2687
            b.download.server_version, b.latest_server_version.version);
×
2688
    }
×
2689
    if (b.download.last_integrated_client_version < a.download.last_integrated_client_version) {
45,176✔
2690
        message = util::format(
×
2691
            "Last integrated client version on the server at the position in the server's history of the download "
×
2692
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2693
            a.download.last_integrated_client_version, b.download.last_integrated_client_version);
×
2694
    }
×
2695
    if (b.download.last_integrated_client_version > b.upload.client_version) {
45,176✔
2696
        message = util::format("Last integrated client version on the server in the position at the server's history "
×
2697
                               "of the download cursor cannot be greater than the latest client version integrated "
×
2698
                               "on the server (download: %1, upload: %2)",
×
2699
                               b.download.last_integrated_client_version, b.upload.client_version);
×
2700
    }
×
2701
    if (b.download.server_version < b.upload.last_integrated_server_version) {
45,176✔
2702
        message = util::format(
×
2703
            "The server version of the download cursor cannot be less than the server version integrated in the "
×
2704
            "latest client version acknowledged by the server (download: %1, upload: %2)",
×
2705
            b.download.server_version, b.upload.last_integrated_server_version);
×
2706
    }
×
2707

22,702✔
2708
    if (message.empty()) {
45,176✔
2709
        return Status::OK();
45,174✔
2710
    }
45,174✔
2711
    return {ErrorCodes::SyncProtocolInvariantFailed, std::move(message)};
2✔
2712
}
2✔
2713

2714

2715
void Session::check_for_upload_completion()
2716
{
79,288✔
2717
    REALM_ASSERT_EX(m_state == Active, m_state);
79,288✔
2718
    if (!m_upload_completion_notification_requested) {
79,288✔
2719
        return;
46,308✔
2720
    }
46,308✔
2721

15,194✔
2722
    // during an ongoing client reset operation, we never upload anything
15,194✔
2723
    if (m_performing_client_reset)
32,980✔
2724
        return;
260✔
2725

15,064✔
2726
    // Upload process must have reached end of history
15,064✔
2727
    REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available);
32,720✔
2728
    bool scan_complete = (m_upload_progress.client_version == m_last_version_available);
32,720✔
2729
    if (!scan_complete)
32,720✔
2730
        return;
7,062✔
2731

12,498✔
2732
    // All uploaded changesets must have been acknowledged by the server
12,498✔
2733
    REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload);
25,658✔
2734
    bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload);
25,658✔
2735
    if (!all_uploads_accepted)
25,658✔
2736
        return;
10,854✔
2737

7,318✔
2738
    m_upload_completion_notification_requested = false;
14,804✔
2739
    on_upload_completion(); // Throws
14,804✔
2740
}
14,804✔
2741

2742

2743
void Session::check_for_download_completion()
2744
{
62,420✔
2745
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
62,420✔
2746
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
62,420✔
2747
    if (m_last_download_mark_received == m_last_triggering_download_mark)
62,420✔
2748
        return;
44,746✔
2749
    if (m_last_download_mark_received < m_target_download_mark)
17,674✔
2750
        return;
358✔
2751
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
17,316✔
2752
        return;
×
2753
    m_last_triggering_download_mark = m_target_download_mark;
17,316✔
2754
    if (REALM_UNLIKELY(!m_allow_upload)) {
17,316✔
2755
        // Activate the upload process now, and enable immediate reactivation
2,134✔
2756
        // after a subsequent fast reconnect.
2,134✔
2757
        m_allow_upload = true;
5,876✔
2758
        ensure_enlisted_to_send(); // Throws
5,876✔
2759
    }
5,876✔
2760
    on_download_completion(); // Throws
17,316✔
2761
}
17,316✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc