• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / github_pull_request_281750

30 Oct 2023 03:37PM UTC coverage: 90.528% (-1.0%) from 91.571%
github_pull_request_281750

Pull #6073

Evergreen

jedelbo
Log free space and history sizes when opening file
Pull Request #6073: Merge next-major

95488 of 175952 branches covered (0.0%)

8973 of 12277 new or added lines in 149 files covered. (73.09%)

622 existing lines in 51 files now uncovered.

233503 of 257934 relevant lines covered (90.53%)

6533720.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.31
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <system_error>
2
#include <sstream>
3

4
#include <realm/util/assert.hpp>
5
#include <realm/util/safe_int_ops.hpp>
6
#include <realm/util/random.hpp>
7
#include <realm/util/memory_stream.hpp>
8
#include <realm/util/basic_system_errors.hpp>
9
#include <realm/util/scope_exit.hpp>
10
#include <realm/util/to_string.hpp>
11
#include <realm/util/uri.hpp>
12
#include <realm/util/platform_info.hpp>
13
#include <realm/sync/impl/clock.hpp>
14
#include <realm/impl/simulated_failure.hpp>
15
#include <realm/sync/network/http.hpp>
16
#include <realm/sync/network/websocket.hpp>
17
#include <realm/sync/noinst/client_history_impl.hpp>
18
#include <realm/sync/noinst/client_impl_base.hpp>
19
#include <realm/sync/noinst/compact_changesets.hpp>
20
#include <realm/sync/protocol.hpp>
21
#include <realm/version.hpp>
22
#include <realm/sync/changeset_parser.hpp>
23

24
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
25

26
// NOTE: The protocol specification is in `/doc/protocol.md`
27

28
using namespace realm;
29
using namespace _impl;
30
using namespace realm::util;
31
using namespace realm::sync;
32
using namespace realm::sync::websocket;
33

34
// clang-format off
35
using Connection      = ClientImpl::Connection;
36
using Session         = ClientImpl::Session;
37
using UploadChangeset = ClientHistory::UploadChangeset;
38

39
// These are a work-around for a bug in MSVC. It cannot find in-class types
40
// mentioned in signature of out-of-line member function definitions.
41
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
42
using OutputBuffer                = ClientImpl::OutputBuffer;
43
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
44
// clang-format on
45

46
void ClientImpl::ReconnectInfo::reset() noexcept
47
{
1,744✔
48
    m_backoff_state.reset();
1,744✔
49
    scheduled_reset = false;
1,744✔
50
}
1,744✔
51

52

53
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
54
                                       std::optional<ResumptionDelayInfo> new_delay_info)
55
{
3,298✔
56
    m_backoff_state.update(new_reason, new_delay_info);
3,298✔
57
}
3,298✔
58

59

60
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
61
{
5,334✔
62
    if (scheduled_reset) {
5,334✔
63
        reset();
4✔
64
    }
4✔
65

2,800✔
66
    if (!m_backoff_state.triggering_error) {
5,334✔
67
        return std::chrono::milliseconds::zero();
4,110✔
68
    }
4,110✔
69

680✔
70
    switch (*m_backoff_state.triggering_error) {
1,224✔
71
        case ConnectionTerminationReason::closed_voluntarily:
80✔
72
            return std::chrono::milliseconds::zero();
80✔
73
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
74
            return std::chrono::milliseconds::max();
18✔
75
        default:
1,126✔
76
            if (m_reconnect_mode == ReconnectMode::testing) {
1,126✔
77
                return std::chrono::milliseconds::max();
902✔
78
            }
902✔
79

112✔
80
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
224✔
81
            return m_backoff_state.delay_interval();
224✔
82
    }
1,224✔
83
}
1,224✔
84

85

86
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
87
                                      port_type& port, std::string& path) const
88
{
3,332✔
89
    util::Uri uri(url); // Throws
3,332✔
90
    uri.canonicalize(); // Throws
3,332✔
91
    std::string userinfo, address_2, port_2;
3,332✔
92
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
3,332✔
93
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
3,332✔
94
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
3,332✔
95
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
3,332✔
96
    if (REALM_UNLIKELY(!good))
3,332✔
97
        return false;
1,492✔
98
    ProtocolEnvelope protocol_2;
3,332✔
99
    port_type port_3;
3,332✔
100
    if (realm_scheme) {
3,332✔
101
        if (uri.get_scheme() == "realm:") {
×
102
            protocol_2 = ProtocolEnvelope::realm;
×
103
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
104
        }
×
105
        else {
×
106
            protocol_2 = ProtocolEnvelope::realms;
×
107
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
108
        }
×
109
    }
×
110
    else {
3,332✔
111
        REALM_ASSERT(ws_scheme);
3,332✔
112
        if (uri.get_scheme() == "ws:") {
3,332✔
113
            protocol_2 = ProtocolEnvelope::ws;
3,328✔
114
            port_3 = 80;
3,328✔
115
        }
3,328✔
116
        else {
4✔
117
            protocol_2 = ProtocolEnvelope::wss;
4✔
118
            port_3 = 443;
4✔
119
        }
4✔
120
    }
3,332✔
121
    if (!port_2.empty()) {
3,332✔
122
        std::istringstream in(port_2);    // Throws
3,332✔
123
        in.imbue(std::locale::classic()); // Throws
3,332✔
124
        in >> port_3;
3,332✔
125
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,332✔
126
            return false;
1,492✔
127
    }
3,332✔
128
    std::string path_2 = uri.get_path(); // Throws (copy)
3,332✔
129

1,492✔
130
    protocol = protocol_2;
3,332✔
131
    address = std::move(address_2);
3,332✔
132
    port = port_3;
3,332✔
133
    path = std::move(path_2);
3,332✔
134
    return true;
3,332✔
135
}
3,332✔
136

137
ClientImpl::ClientImpl(ClientConfig config)
138
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::session, std::move(config.logger))}
139
    , logger{*logger_ptr}
140
    , m_reconnect_mode{config.reconnect_mode}
141
    , m_connect_timeout{config.connect_timeout}
142
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
143
    , m_ping_keepalive_period{config.ping_keepalive_period}
144
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
145
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
146
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
147
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
148
    , m_dry_run{config.dry_run}
149
    , m_enable_default_port_hack{config.enable_default_port_hack}
150
    , m_disable_upload_compaction{config.disable_upload_compaction}
151
    , m_fix_up_object_ids{config.fix_up_object_ids}
152
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
153
    , m_socket_provider{std::move(config.socket_provider)}
154
    , m_client_protocol{} // Throws
155
    , m_one_connection_per_session{config.one_connection_per_session}
156
    , m_random{}
157
{
8,970✔
158
    // FIXME: Would be better if seeding was up to the application.
4,416✔
159
    util::seed_prng_nondeterministically(m_random); // Throws
8,970✔
160

4,416✔
161
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
8,970✔
162
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
8,970✔
163
                 get_current_protocol_version()); // Throws
8,970✔
164
    logger.info("Platform: %1", util::get_platform_info());
8,970✔
165
    const char* build_mode;
8,970✔
166
#if REALM_DEBUG
8,970✔
167
    build_mode = "Debug";
8,970✔
168
#else
169
    build_mode = "Release";
170
#endif
171
    logger.debug("Build mode: %1", build_mode);
8,970✔
172
    logger.debug("Config param: one_connection_per_session = %1",
8,970✔
173
                 config.one_connection_per_session); // Throws
8,970✔
174
    logger.debug("Config param: connect_timeout = %1 ms",
8,970✔
175
                 config.connect_timeout); // Throws
8,970✔
176
    logger.debug("Config param: connection_linger_time = %1 ms",
8,970✔
177
                 config.connection_linger_time); // Throws
8,970✔
178
    logger.debug("Config param: ping_keepalive_period = %1 ms",
8,970✔
179
                 config.ping_keepalive_period); // Throws
8,970✔
180
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
8,970✔
181
                 config.pong_keepalive_timeout); // Throws
8,970✔
182
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
8,970✔
183
                 config.fast_reconnect_limit); // Throws
8,970✔
184
    logger.debug("Config param: disable_upload_compaction = %1",
8,970✔
185
                 config.disable_upload_compaction); // Throws
8,970✔
186
    logger.debug("Config param: disable_sync_to_disk = %1",
8,970✔
187
                 config.disable_sync_to_disk); // Throws
8,970✔
188
    logger.debug("Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3",
8,970✔
189
                 m_reconnect_backoff_info.max_resumption_delay_interval.count(),
8,970✔
190
                 m_reconnect_backoff_info.resumption_delay_interval.count(),
8,970✔
191
                 m_reconnect_backoff_info.resumption_delay_backoff_multiplier);
8,970✔
192

4,416✔
193
    if (config.reconnect_mode != ReconnectMode::normal) {
8,970✔
194
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
768✔
195
                    "never do this in production!");
768✔
196
    }
768✔
197

4,416✔
198
    if (config.dry_run) {
8,970✔
199
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
200
                    "never do this in production!");
×
201
    }
×
202

4,416✔
203
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
8,970✔
204

4,416✔
205
    if (m_one_connection_per_session) {
8,970✔
206
        // FIXME: Re-enable this warning when the load balancer is able to handle
2✔
207
        // multiplexing.
2✔
208
        //        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
2✔
209
        //            "never do this in production");
2✔
210
    }
4✔
211

4,416✔
212
    if (config.disable_upload_activation_delay) {
8,970✔
213
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
214
                    "never do this in production");
×
215
    }
×
216

4,416✔
217
    if (config.disable_sync_to_disk) {
8,970✔
218
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
219
                    "never do this in production");
×
220
    }
×
221

4,416✔
222
    m_actualize_and_finalize = create_trigger([this](Status status) {
13,844✔
223
        if (status == ErrorCodes::OperationAborted)
13,844✔
224
            return;
×
225
        else if (!status.is_ok())
13,844✔
226
            throw Exception(status);
×
227
        actualize_and_finalize_session_wrappers(); // Throws
13,844✔
228
    });
13,844✔
229
}
8,970✔
230

231

232
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
233
{
158,824✔
234
    REALM_ASSERT(m_socket_provider);
158,824✔
235
    {
158,824✔
236
        std::lock_guard lock(m_drain_mutex);
158,824✔
237
        ++m_outstanding_posts;
158,824✔
238
        m_drained = false;
158,824✔
239
    }
158,824✔
240
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
158,824✔
241
        auto decr_guard = util::make_scope_exit([&]() noexcept {
158,822✔
242
            std::lock_guard lock(m_drain_mutex);
158,822✔
243
            REALM_ASSERT(m_outstanding_posts);
158,822✔
244
            --m_outstanding_posts;
158,822✔
245
            m_drain_cv.notify_all();
158,822✔
246
        });
158,822✔
247
        handler(status);
158,822✔
248
    });
158,822✔
249
}
158,824✔
250

251

252
void ClientImpl::drain_connections()
253
{
8,970✔
254
    logger.debug("Draining connections during sync client shutdown");
8,970✔
255
    for (auto& server_slot_pair : m_server_slots) {
5,656✔
256
        auto& server_slot = server_slot_pair.second;
2,342✔
257

1,102✔
258
        if (server_slot.connection) {
2,342✔
259
            auto& conn = server_slot.connection;
2,246✔
260
            conn->force_close();
2,246✔
261
        }
2,246✔
262
        else {
96✔
263
            for (auto& conn_pair : server_slot.alt_connections) {
48✔
264
                conn_pair.second->force_close();
4✔
265
            }
4✔
266
        }
96✔
267
    }
2,342✔
268
}
8,970✔
269

270

271
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
272
                                                       SyncSocketProvider::FunctionHandler&& handler)
273
{
16,164✔
274
    REALM_ASSERT(m_socket_provider);
16,164✔
275
    {
16,164✔
276
        std::lock_guard lock(m_drain_mutex);
16,164✔
277
        ++m_outstanding_posts;
16,164✔
278
        m_drained = false;
16,164✔
279
    }
16,164✔
280
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
16,168✔
281
        handler(status);
16,168✔
282

7,998✔
283
        std::lock_guard lock(m_drain_mutex);
16,168✔
284
        REALM_ASSERT(m_outstanding_posts);
16,168✔
285
        --m_outstanding_posts;
16,168✔
286
        m_drain_cv.notify_all();
16,168✔
287
    });
16,168✔
288
}
16,164✔
289

290

291
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
292
{
11,428✔
293
    REALM_ASSERT(m_socket_provider);
11,428✔
294
    return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
11,428✔
295
}
11,428✔
296

297
Connection::~Connection()
298
{
2,458✔
299
    if (m_websocket_sentinel) {
2,458✔
300
        m_websocket_sentinel->destroyed = true;
×
301
        m_websocket_sentinel.reset();
×
302
    }
×
303
}
2,458✔
304

305
void Connection::activate()
306
{
2,458✔
307
    REALM_ASSERT(m_on_idle);
2,458✔
308
    m_activated = true;
2,458✔
309
    if (m_num_active_sessions == 0)
2,458✔
310
        m_on_idle->trigger();
×
311
    // We cannot in general connect immediately, because a prior failure to
1,158✔
312
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
1,158✔
313
    initiate_reconnect_wait(); // Throws
2,458✔
314
}
2,458✔
315

316

317
void Connection::activate_session(std::unique_ptr<Session> sess)
318
{
9,606✔
319
    REALM_ASSERT(sess);
9,606✔
320
    REALM_ASSERT(&sess->m_conn == this);
9,606✔
321
    REALM_ASSERT(!m_force_closed);
9,606✔
322
    Session& sess_2 = *sess;
9,606✔
323
    session_ident_type ident = sess->m_ident;
9,606✔
324
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
9,606✔
325
    bool was_inserted = p.second;
9,606✔
326
    REALM_ASSERT(was_inserted);
9,606✔
327
    // Save the session ident to the historical list of session idents
4,626✔
328
    m_session_history.insert(ident);
9,606✔
329
    sess_2.activate(); // Throws
9,606✔
330
    if (m_state == ConnectionState::connected) {
9,606✔
331
        bool fast_reconnect = false;
6,720✔
332
        sess_2.connection_established(fast_reconnect); // Throws
6,720✔
333
    }
6,720✔
334
    ++m_num_active_sessions;
9,606✔
335
}
9,606✔
336

337

338
void Connection::initiate_session_deactivation(Session* sess)
339
{
9,606✔
340
    REALM_ASSERT(sess);
9,606✔
341
    REALM_ASSERT(&sess->m_conn == this);
9,606✔
342
    REALM_ASSERT(m_num_active_sessions);
9,606✔
343
    // Since the client may be waiting for m_num_active_sessions to reach 0
4,626✔
344
    // in stop_and_wait() (on a separate thread), deactivate Session before
4,626✔
345
    // decrementing the num active sessions value.
4,626✔
346
    sess->initiate_deactivation(); // Throws
9,606✔
347
    if (sess->m_state == Session::Deactivated) {
9,606✔
348
        finish_session_deactivation(sess);
970✔
349
    }
970✔
350
    if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
9,606✔
351
        if (m_activated && m_state == ConnectionState::disconnected)
3,954✔
352
            m_on_idle->trigger();
338✔
353
    }
3,954✔
354
}
9,606✔
355

356

357
void Connection::cancel_reconnect_delay()
358
{
1,948✔
359
    REALM_ASSERT(m_activated);
1,948✔
360

1,110✔
361
    if (m_reconnect_delay_in_progress) {
1,948✔
362
        if (m_nonzero_reconnect_delay)
1,736✔
363
            logger.detail("Canceling reconnect delay"); // Throws
872✔
364

1,004✔
365
        // Cancel the in-progress wait operation by destroying the timer
1,004✔
366
        // object. Destruction is needed in this case, because a new wait
1,004✔
367
        // operation might have to be initiated before the previous one
1,004✔
368
        // completes (its completion handler starts to execute), so the new wait
1,004✔
369
        // operation must be done on a new timer object.
1,004✔
370
        m_reconnect_disconnect_timer.reset();
1,736✔
371
        m_reconnect_delay_in_progress = false;
1,736✔
372
        m_reconnect_info.reset();
1,736✔
373
        initiate_reconnect_wait(); // Throws
1,736✔
374
        return;
1,736✔
375
    }
1,736✔
376

106✔
377
    // If we are not disconnected, then we need to make sure the next time we get disconnected
106✔
378
    // that we are allowed to re-connect as quickly as possible.
106✔
379
    //
106✔
380
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
106✔
381
    // backoff/delay state before calculating the next delay, unless a PONG message is received
106✔
382
    // for the urgent PING message we send below.
106✔
383
    //
106✔
384
    // If we get a PONG message for the urgent PING message sent below, then the connection is
106✔
385
    // healthy and we can calculate the next delay normally.
106✔
386
    if (m_state != ConnectionState::disconnected) {
212✔
387
        m_reconnect_info.scheduled_reset = true;
212✔
388
        m_ping_after_scheduled_reset_of_reconnect_info = false;
212✔
389

106✔
390
        schedule_urgent_ping(); // Throws
212✔
391
        return;
212✔
392
    }
212✔
393
    // Nothing to do in this case. The next reconnect attemp will be made as
106✔
394
    // soon as there are any sessions that are both active and unsuspended.
106✔
395
}
212✔
396

397
void ClientImpl::Connection::finish_session_deactivation(Session* sess)
398
{
7,842✔
399
    REALM_ASSERT(sess->m_state == Session::Deactivated);
7,842✔
400
    auto ident = sess->m_ident;
7,842✔
401
    m_sessions.erase(ident);
7,842✔
402
    m_session_history.erase(ident);
7,842✔
403
}
7,842✔
404

405
void Connection::force_close()
406
{
2,250✔
407
    if (m_force_closed) {
2,250✔
408
        return;
×
409
    }
×
410

1,058✔
411
    m_force_closed = true;
2,250✔
412

1,058✔
413
    if (m_state != ConnectionState::disconnected) {
2,250✔
414
        voluntary_disconnect();
2,158✔
415
    }
2,158✔
416

1,058✔
417
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,250✔
418
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,250✔
419
        m_reconnect_disconnect_timer.reset();
92✔
420
        m_reconnect_delay_in_progress = false;
92✔
421
        m_disconnect_delay_in_progress = false;
92✔
422
    }
92✔
423

1,058✔
424
    // We must copy any session pointers we want to close to a vector because force_closing
1,058✔
425
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
1,058✔
426
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
1,058✔
427
    std::vector<Session*> to_close;
2,250✔
428
    for (auto& session_pair : m_sessions) {
1,136✔
429
        if (session_pair.second->m_state == Session::State::Active) {
154✔
430
            to_close.push_back(session_pair.second.get());
154✔
431
        }
154✔
432
    }
154✔
433

1,058✔
434
    for (auto& sess : to_close) {
1,136✔
435
        sess->force_close();
154✔
436
    }
154✔
437

1,058✔
438
    logger.debug("Force closed idle connection");
2,250✔
439
}
2,250✔
440

441

442
void Connection::websocket_connected_handler(const std::string& protocol)
443
{
3,178✔
444
    if (!protocol.empty()) {
3,178✔
445
        std::string_view expected_prefix =
3,178✔
446
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
2,912✔
447
        // FIXME: Use std::string_view::begins_with() in C++20.
1,588✔
448
        auto prefix_matches = [&](std::string_view other) {
3,178✔
449
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,178✔
450
        };
3,178✔
451
        if (prefix_matches(expected_prefix)) {
3,178✔
452
            util::MemoryInputStream in;
3,178✔
453
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,178✔
454
            in.imbue(std::locale::classic());
3,178✔
455
            in.unsetf(std::ios_base::skipws);
3,178✔
456
            int value_2 = 0;
3,178✔
457
            in >> value_2;
3,178✔
458
            if (in && in.eof() && value_2 >= 0) {
3,178✔
459
                bool good_version =
3,178✔
460
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,178✔
461
                if (good_version) {
3,178✔
462
                    logger.detail("Negotiated protocol version: %1", value_2);
3,178✔
463
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
1,588✔
464
                    // will provide the appservices connection ID via a log message.
1,588✔
465
                    // TODO: Remove once the server starts sending the connection ID
1,588✔
466
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,178✔
467
                    m_negotiated_protocol_version = value_2;
3,178✔
468
                    handle_connection_established(); // Throws
3,178✔
469
                    return;
3,178✔
470
                }
3,178✔
471
            }
×
472
        }
3,178✔
473
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
474
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
475
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
476
    }
×
477
    else {
×
478
        close_due_to_client_side_error(
×
479
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
480
            ConnectionTerminationReason::bad_headers_in_http_response);
×
481
    }
×
482
}
3,178✔
483

484

485
bool Connection::websocket_binary_message_received(util::Span<const char> data)
486
{
74,388✔
487
    if (m_force_closed) {
74,388✔
488
        logger.debug("Received binary message after connection was force closed");
×
489
        return false;
×
490
    }
×
491

38,612✔
492
    using sf = SimulatedFailure;
74,388✔
493
    if (sf::check_trigger(sf::sync_client__read_head)) {
74,388✔
494
        close_due_to_client_side_error(
374✔
495
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
374✔
496
            ConnectionTerminationReason::read_or_write_error);
374✔
497
        return bool(m_websocket);
374✔
498
    }
374✔
499

38,384✔
500
    handle_message_received(data);
74,014✔
501
    return bool(m_websocket);
74,014✔
502
}
74,014✔
503

504

505
void Connection::websocket_error_handler()
506
{
584✔
507
    m_websocket_error_received = true;
584✔
508
}
584✔
509

510
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
511
{
712✔
512
    if (m_force_closed) {
712✔
513
        logger.debug("Received websocket close message after connection was force closed");
×
514
        return false;
×
515
    }
×
516
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
712✔
517

384✔
518
    switch (error_code) {
712✔
519
        case WebSocketError::websocket_ok:
76✔
520
            break;
76✔
521
        case WebSocketError::websocket_resolve_failed:
4✔
522
            [[fallthrough]];
4✔
523
        case WebSocketError::websocket_connection_failed: {
4✔
524
            SessionErrorInfo error_info(
4✔
525
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
4✔
526
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
4✔
527
            break;
4✔
528
        }
4✔
529
        case WebSocketError::websocket_read_error:
570✔
530
            [[fallthrough]];
570✔
531
        case WebSocketError::websocket_write_error: {
570✔
532
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
570✔
533
                                         ConnectionTerminationReason::read_or_write_error);
570✔
534
            break;
570✔
535
        }
570✔
536
        case WebSocketError::websocket_going_away:
312✔
537
            [[fallthrough]];
×
538
        case WebSocketError::websocket_protocol_error:
✔
539
            [[fallthrough]];
×
540
        case WebSocketError::websocket_unsupported_data:
✔
541
            [[fallthrough]];
×
542
        case WebSocketError::websocket_invalid_payload_data:
✔
543
            [[fallthrough]];
×
544
        case WebSocketError::websocket_policy_violation:
✔
545
            [[fallthrough]];
×
546
        case WebSocketError::websocket_reserved:
✔
547
            [[fallthrough]];
×
548
        case WebSocketError::websocket_no_status_received:
✔
549
            [[fallthrough]];
×
550
        case WebSocketError::websocket_invalid_extension: {
✔
551
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
552
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
553
            break;
×
554
        }
×
555
        case WebSocketError::websocket_message_too_big: {
4✔
556
            auto message = util::format(
4✔
557
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
558
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
559
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
560
            involuntary_disconnect(std::move(error_info),
4✔
561
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
562
            break;
4✔
563
        }
×
564
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
565
            close_due_to_client_side_error(
10✔
566
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
567
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
568
            break;
10✔
569
        }
×
570
        case WebSocketError::websocket_client_too_old:
✔
571
            [[fallthrough]];
×
572
        case WebSocketError::websocket_client_too_new:
✔
573
            [[fallthrough]];
×
574
        case WebSocketError::websocket_protocol_mismatch: {
✔
575
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
576
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
577
            break;
×
578
        }
×
579
        case WebSocketError::websocket_fatal_error: {
✔
580
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{true}),
×
581
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
582
            break;
×
583
        }
×
584
        case WebSocketError::websocket_forbidden: {
✔
585
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
586
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
587
            involuntary_disconnect(std::move(error_info),
×
588
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
589
            break;
×
590
        }
×
591
        case WebSocketError::websocket_unauthorized: {
36✔
592
            SessionErrorInfo error_info(
36✔
593
                {ErrorCodes::AuthError,
36✔
594
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
36✔
595
                IsFatal{false});
36✔
596
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
36✔
597
            involuntary_disconnect(std::move(error_info),
36✔
598
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
36✔
599
            break;
36✔
600
        }
×
601
        case WebSocketError::websocket_moved_permanently: {
12✔
602
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
603
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
604
            involuntary_disconnect(std::move(error_info),
12✔
605
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
606
            break;
12✔
607
        }
×
608
        case WebSocketError::websocket_abnormal_closure: {
✔
609
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
610
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
611
            involuntary_disconnect(std::move(error_info),
×
612
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
613
            break;
×
614
        }
×
615
        case WebSocketError::websocket_internal_server_error:
✔
616
            [[fallthrough]];
×
617
        case WebSocketError::websocket_retry_error: {
✔
618
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
619
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
620
            break;
×
621
        }
712✔
622
    }
712✔
623

384✔
624
    return bool(m_websocket);
712✔
625
}
712✔
626

627
// Guarantees that handle_reconnect_wait() is never called from within the
628
// execution of initiate_reconnect_wait() (no callback reentrance).
629
void Connection::initiate_reconnect_wait()
630
{
7,490✔
631
    REALM_ASSERT(m_activated);
7,490✔
632
    REALM_ASSERT(!m_reconnect_delay_in_progress);
7,490✔
633
    REALM_ASSERT(!m_disconnect_delay_in_progress);
7,490✔
634

3,810✔
635
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
3,810✔
636
    if (m_force_closed) {
7,490✔
637
        return;
2,158✔
638
    }
2,158✔
639

2,798✔
640
    m_reconnect_delay_in_progress = true;
5,332✔
641
    auto delay = m_reconnect_info.delay_interval();
5,332✔
642
    if (delay == std::chrono::milliseconds::max()) {
5,332✔
643
        logger.detail("Reconnection delayed indefinitely"); // Throws
920✔
644
        // Not actually starting a timer corresponds to an infinite wait
528✔
645
        m_nonzero_reconnect_delay = true;
920✔
646
        return;
920✔
647
    }
920✔
648

2,270✔
649
    if (delay == std::chrono::milliseconds::zero()) {
4,412✔
650
        m_nonzero_reconnect_delay = false;
4,190✔
651
    }
4,190✔
652
    else {
222✔
653
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
222✔
654
        m_nonzero_reconnect_delay = true;
222✔
655
    }
222✔
656

2,270✔
657
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
2,270✔
658
    // we need it to be cancelable in case the connection is terminated before the timer
2,270✔
659
    // callback is run.
2,270✔
660
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
4,414✔
661
        // If the operation is aborted, the connection object may have been
2,272✔
662
        // destroyed.
2,272✔
663
        if (status != ErrorCodes::OperationAborted)
4,414✔
664
            handle_reconnect_wait(status); // Throws
3,298✔
665
    });                                    // Throws
4,414✔
666
}
4,412✔
667

668

669
void Connection::handle_reconnect_wait(Status status)
670
{
3,298✔
671
    if (!status.is_ok()) {
3,298✔
672
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
673
        throw Exception(status);
×
674
    }
×
675

1,650✔
676
    REALM_ASSERT(m_reconnect_delay_in_progress);
3,298✔
677
    m_reconnect_delay_in_progress = false;
3,298✔
678

1,650✔
679
    if (m_num_active_unsuspended_sessions > 0)
3,298✔
680
        initiate_reconnect(); // Throws
3,298✔
681
}
3,298✔
682

683
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
684
    explicit WebSocketObserverShim(Connection* conn)
685
        : conn(conn)
686
        , sentinel(conn->m_websocket_sentinel)
687
    {
3,298✔
688
    }
3,298✔
689

690
    Connection* conn;
691
    util::bind_ptr<LifecycleSentinel> sentinel;
692

693
    void websocket_connected_handler(const std::string& protocol) override
694
    {
3,178✔
695
        if (sentinel->destroyed) {
3,178✔
696
            return;
×
697
        }
×
698

1,588✔
699
        return conn->websocket_connected_handler(protocol);
3,178✔
700
    }
3,178✔
701

702
    void websocket_error_handler() override
703
    {
584✔
704
        if (sentinel->destroyed) {
584✔
705
            return;
×
706
        }
×
707

320✔
708
        conn->websocket_error_handler();
584✔
709
    }
584✔
710

711
    bool websocket_binary_message_received(util::Span<const char> data) override
712
    {
74,382✔
713
        if (sentinel->destroyed) {
74,382✔
714
            return false;
×
715
        }
×
716

38,610✔
717
        return conn->websocket_binary_message_received(data);
74,382✔
718
    }
74,382✔
719

720
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
721
    {
712✔
722
        if (sentinel->destroyed) {
712✔
723
            return true;
×
724
        }
×
725

384✔
726
        return conn->websocket_closed_handler(was_clean, error_code, msg);
712✔
727
    }
712✔
728
};
729

730
void Connection::initiate_reconnect()
731
{
3,298✔
732
    REALM_ASSERT(m_activated);
3,298✔
733

1,650✔
734
    m_state = ConnectionState::connecting;
3,298✔
735
    report_connection_state_change(ConnectionState::connecting); // Throws
3,298✔
736
    if (m_websocket_sentinel) {
3,298✔
737
        m_websocket_sentinel->destroyed = true;
×
738
    }
×
739
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,298✔
740
    m_websocket.reset();
3,298✔
741

1,650✔
742
    // Watchdog
1,650✔
743
    initiate_connect_wait(); // Throws
3,298✔
744

1,650✔
745
    std::vector<std::string> sec_websocket_protocol;
3,298✔
746
    {
3,298✔
747
        auto protocol_prefix =
3,298✔
748
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,026✔
749
        int min = get_oldest_supported_protocol_version();
3,298✔
750
        int max = get_current_protocol_version();
3,298✔
751
        REALM_ASSERT_3(min, <=, max);
3,298✔
752
        // List protocol version in descending order to ensure that the server
1,650✔
753
        // selects the highest possible version.
1,650✔
754
        for (int version = max; version >= min; --version) {
32,968✔
755
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
29,670✔
756
        }
29,670✔
757
    }
3,298✔
758

1,650✔
759
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,298✔
760
                m_server_endpoint.port, m_http_request_path_prefix);
3,298✔
761

1,650✔
762
    m_websocket_error_received = false;
3,298✔
763
    m_websocket =
3,298✔
764
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,298✔
765
                                            WebSocketEndpoint{
3,298✔
766
                                                m_server_endpoint.address,
3,298✔
767
                                                m_server_endpoint.port,
3,298✔
768
                                                get_http_request_path(),
3,298✔
769
                                                std::move(sec_websocket_protocol),
3,298✔
770
                                                is_ssl(m_server_endpoint.envelope),
3,298✔
771
                                                /// DEPRECATED - The following will be removed in a future release
1,650✔
772
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,298✔
773
                                                m_verify_servers_ssl_certificate,
3,298✔
774
                                                m_ssl_trust_certificate_path,
3,298✔
775
                                                m_ssl_verify_callback,
3,298✔
776
                                                m_proxy_config,
3,298✔
777
                                            });
3,298✔
778
}
3,298✔
779

780

781
void Connection::initiate_connect_wait()
782
{
3,296✔
783
    // Deploy a watchdog to enforce an upper bound on the time it can take to
1,650✔
784
    // fully establish the connection (including SSL and WebSocket
1,650✔
785
    // handshakes). Without such a watchdog, connect operations could take very
1,650✔
786
    // long, or even indefinite time.
1,650✔
787
    milliseconds_type time = m_client.m_connect_timeout;
3,296✔
788

1,650✔
789
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,298✔
790
        // If the operation is aborted, the connection object may have been
1,650✔
791
        // destroyed.
1,650✔
792
        if (status != ErrorCodes::OperationAborted)
3,298✔
793
            handle_connect_wait(status); // Throws
×
794
    });                                  // Throws
3,298✔
795
}
3,296✔
796

797

798
void Connection::handle_connect_wait(Status status)
799
{
×
800
    if (!status.is_ok()) {
×
801
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
802
        throw Exception(status);
×
803
    }
×
804

805
    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
×
806
    logger.info("Connect timeout"); // Throws
×
807
    involuntary_disconnect(
×
808
        SessionErrorInfo{Status{ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
×
809
                         IsFatal{false}},
×
810
        ConnectionTerminationReason::sync_connect_timeout); // Throws
×
811
}
×
812

813

814
void Connection::handle_connection_established()
815
{
3,178✔
816
    // Cancel connect timeout watchdog
1,588✔
817
    m_connect_timer.reset();
3,178✔
818

1,588✔
819
    m_state = ConnectionState::connected;
3,178✔
820

1,588✔
821
    milliseconds_type now = monotonic_clock_now();
3,178✔
822
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,178✔
823
    initiate_ping_delay(now);     // Throws
3,178✔
824

1,588✔
825
    bool fast_reconnect = false;
3,178✔
826
    if (m_disconnect_has_occurred) {
3,178✔
827
        milliseconds_type time = now - m_disconnect_time;
928✔
828
        if (time <= m_client.m_fast_reconnect_limit)
928✔
829
            fast_reconnect = true;
928✔
830
    }
928✔
831

1,588✔
832
    for (auto& p : m_sessions) {
4,208✔
833
        Session& sess = *p.second;
4,208✔
834
        sess.connection_established(fast_reconnect); // Throws
4,208✔
835
    }
4,208✔
836

1,588✔
837
    report_connection_state_change(ConnectionState::connected); // Throws
3,178✔
838
}
3,178✔
839

840

841
void Connection::schedule_urgent_ping()
842
{
212✔
843
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
212✔
844
    if (m_ping_delay_in_progress) {
212✔
845
        m_heartbeat_timer.reset();
194✔
846
        m_ping_delay_in_progress = false;
194✔
847
        m_minimize_next_ping_delay = true;
194✔
848
        milliseconds_type now = monotonic_clock_now();
194✔
849
        initiate_ping_delay(now); // Throws
194✔
850
        return;
194✔
851
    }
194✔
852
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
18!
853
    if (!m_send_ping)
18!
854
        m_minimize_next_ping_delay = true;
18✔
855
}
18✔
856

857

858
void Connection::initiate_ping_delay(milliseconds_type now)
859
{
3,558✔
860
    REALM_ASSERT(!m_ping_delay_in_progress);
3,558✔
861
    REALM_ASSERT(!m_waiting_for_pong);
3,558✔
862
    REALM_ASSERT(!m_send_ping);
3,558✔
863

1,740✔
864
    milliseconds_type delay = 0;
3,558✔
865
    if (!m_minimize_next_ping_delay) {
3,558✔
866
        delay = m_client.m_ping_keepalive_period;
3,360✔
867
        // Make a randomized deduction of up to 10%, or up to 100% if this is
1,648✔
868
        // the first PING message to be sent since the connection was
1,648✔
869
        // established. The purpose of this randomized deduction is to reduce
1,648✔
870
        // the risk of many connections sending PING messages simultaneously to
1,648✔
871
        // the server.
1,648✔
872
        milliseconds_type max_deduction = (m_ping_sent ? delay / 10 : delay);
3,238✔
873
        auto distr = std::uniform_int_distribution<milliseconds_type>(0, max_deduction);
3,360✔
874
        milliseconds_type randomized_deduction = distr(m_client.get_random());
3,360✔
875
        delay -= randomized_deduction;
3,360✔
876
        // Deduct the time spent waiting for PONG
1,648✔
877
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
3,360✔
878
        milliseconds_type spent_time = now - m_pong_wait_started_at;
3,360✔
879
        if (spent_time < delay) {
3,360✔
880
            delay -= spent_time;
3,352✔
881
        }
3,352✔
882
        else {
8✔
883
            delay = 0;
8✔
884
        }
8✔
885
    }
3,360✔
886
    else {
198✔
887
        m_minimize_next_ping_delay = false;
198✔
888
    }
198✔
889

1,740✔
890

1,740✔
891
    m_ping_delay_in_progress = true;
3,558✔
892

1,740✔
893
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
3,558✔
894
        if (status == ErrorCodes::OperationAborted)
3,558✔
895
            return;
3,350✔
896
        else if (!status.is_ok())
208✔
897
            throw Exception(status);
×
898

76✔
899
        handle_ping_delay();                                    // Throws
208✔
900
    });                                                         // Throws
208✔
901
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
3,558✔
902
}
3,558✔
903

904

905
void Connection::handle_ping_delay()
906
{
208✔
907
    REALM_ASSERT(m_ping_delay_in_progress);
208✔
908
    m_ping_delay_in_progress = false;
208✔
909
    m_send_ping = true;
208✔
910

76✔
911
    initiate_pong_timeout(); // Throws
208✔
912

76✔
913
    if (m_state == ConnectionState::connected && !m_sending)
208✔
914
        send_next_message(); // Throws
174✔
915
}
208✔
916

917

918
void Connection::initiate_pong_timeout()
919
{
208✔
920
    REALM_ASSERT(!m_ping_delay_in_progress);
208✔
921
    REALM_ASSERT(!m_waiting_for_pong);
208✔
922
    REALM_ASSERT(m_send_ping);
208✔
923

76✔
924
    m_waiting_for_pong = true;
208✔
925
    m_pong_wait_started_at = monotonic_clock_now();
208✔
926

76✔
927
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
208✔
928
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
208✔
929
        if (status == ErrorCodes::OperationAborted)
208✔
930
            return;
196✔
931
        else if (!status.is_ok())
12✔
932
            throw Exception(status);
×
933

6✔
934
        handle_pong_timeout(); // Throws
12✔
935
    });                        // Throws
12✔
936
}
208✔
937

938

939
void Connection::handle_pong_timeout()
940
{
12✔
941
    REALM_ASSERT(m_waiting_for_pong);
12✔
942
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
943
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
944
                                 ConnectionTerminationReason::pong_timeout);
12✔
945
}
12✔
946

947

948
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
949
{
93,946✔
950
    // Stop sending messages if an websocket error was received.
46,940✔
951
    if (m_websocket_error_received)
93,946✔
952
        return;
×
953

46,940✔
954
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
93,946✔
955
        if (sentinel->destroyed) {
93,840✔
956
            return;
1,432✔
957
        }
1,432✔
958
        if (!status.is_ok()) {
92,408✔
959
            if (status != ErrorCodes::Error::OperationAborted) {
×
960
                // Write errors will be handled by the websocket_write_error_handler() callback
961
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
962
            }
×
963
            return;
×
964
        }
×
965
        handle_write_message(); // Throws
92,408✔
966
    });                         // Throws
92,408✔
967
    m_sending_session = sess;
93,946✔
968
    m_sending = true;
93,946✔
969
}
93,946✔
970

971

972
void Connection::handle_write_message()
973
{
92,412✔
974
    m_sending_session->message_sent(); // Throws
92,412✔
975
    if (m_sending_session->m_state == Session::Deactivated) {
92,412✔
976
        finish_session_deactivation(m_sending_session);
102✔
977
    }
102✔
978
    m_sending_session = nullptr;
92,412✔
979
    m_sending = false;
92,412✔
980
    send_next_message(); // Throws
92,412✔
981
}
92,412✔
982

983

984
void Connection::send_next_message()
985
{
151,072✔
986
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
151,072✔
987
    REALM_ASSERT(!m_sending_session);
151,072✔
988
    REALM_ASSERT(!m_sending);
151,072✔
989
    if (m_send_ping) {
151,072✔
990
        send_ping(); // Throws
196✔
991
        return;
196✔
992
    }
196✔
993
    while (!m_sessions_enlisted_to_send.empty()) {
211,956✔
994
        // The state of being connected is not supposed to be able to change
77,784✔
995
        // across this loop thanks to the "no callback reentrance" guarantee
77,784✔
996
        // provided by Websocket::async_write_text(), and friends.
77,784✔
997
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
155,230✔
998

77,784✔
999
        Session& sess = *m_sessions_enlisted_to_send.front();
155,230✔
1000
        m_sessions_enlisted_to_send.pop_front();
155,230✔
1001
        sess.send_message(); // Throws
155,230✔
1002

77,784✔
1003
        if (sess.m_state == Session::Deactivated) {
155,230✔
1004
            finish_session_deactivation(&sess);
2,660✔
1005
        }
2,660✔
1006

77,784✔
1007
        // An enlisted session may choose to not send a message. In that case,
77,784✔
1008
        // we should pass the opportunity to the next enlisted session.
77,784✔
1009
        if (m_sending)
155,230✔
1010
            break;
94,150✔
1011
    }
155,230✔
1012
}
150,876✔
1013

1014

1015
void Connection::send_ping()
1016
{
196✔
1017
    REALM_ASSERT(!m_ping_delay_in_progress);
196✔
1018
    REALM_ASSERT(m_waiting_for_pong);
196✔
1019
    REALM_ASSERT(m_send_ping);
196✔
1020

70✔
1021
    m_send_ping = false;
196✔
1022
    if (m_reconnect_info.scheduled_reset)
196✔
1023
        m_ping_after_scheduled_reset_of_reconnect_info = true;
156✔
1024

70✔
1025
    m_last_ping_sent_at = monotonic_clock_now();
196✔
1026
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
196✔
1027
                 m_previous_ping_rtt); // Throws
196✔
1028

70✔
1029
    ClientProtocol& protocol = get_client_protocol();
196✔
1030
    OutputBuffer& out = get_output_buffer();
196✔
1031
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
196✔
1032
    initiate_write_ping(out);                                          // Throws
196✔
1033
    m_ping_sent = true;
196✔
1034
}
196✔
1035

1036

1037
void Connection::initiate_write_ping(const OutputBuffer& out)
1038
{
196✔
1039
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
196✔
1040
        if (sentinel->destroyed) {
196✔
1041
            return;
2✔
1042
        }
2✔
1043
        if (!status.is_ok()) {
194✔
1044
            if (status != ErrorCodes::Error::OperationAborted) {
×
1045
                // Write errors will be handled by the websocket_write_error_handler() callback
1046
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1047
            }
×
1048
            return;
×
1049
        }
×
1050
        handle_write_ping(); // Throws
194✔
1051
    });                      // Throws
194✔
1052
    m_sending = true;
196✔
1053
}
196✔
1054

1055

1056
void Connection::handle_write_ping()
1057
{
194✔
1058
    REALM_ASSERT(m_sending);
194✔
1059
    REALM_ASSERT(!m_sending_session);
194✔
1060
    m_sending = false;
194✔
1061
    send_next_message(); // Throws
194✔
1062
}
194✔
1063

1064

1065
void Connection::handle_message_received(util::Span<const char> data)
1066
{
74,012✔
1067
    // parse_message_received() parses the message and calls the proper handler
38,382✔
1068
    // on the Connection object (this).
38,382✔
1069
    get_client_protocol().parse_message_received<Connection>(*this, std::string_view(data.data(), data.size()));
74,012✔
1070
}
74,012✔
1071

1072

1073
void Connection::initiate_disconnect_wait()
1074
{
4,290✔
1075
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,290✔
1076

2,040✔
1077
    if (m_disconnect_delay_in_progress) {
4,290✔
1078
        m_reconnect_disconnect_timer.reset();
2,074✔
1079
        m_disconnect_delay_in_progress = false;
2,074✔
1080
    }
2,074✔
1081

2,040✔
1082
    milliseconds_type time = m_client.m_connection_linger_time;
4,290✔
1083

2,040✔
1084
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,290✔
1085
        // If the operation is aborted, the connection object may have been
2,040✔
1086
        // destroyed.
2,040✔
1087
        if (status != ErrorCodes::OperationAborted)
4,290✔
1088
            handle_disconnect_wait(status); // Throws
12✔
1089
    });                                     // Throws
4,290✔
1090
    m_disconnect_delay_in_progress = true;
4,290✔
1091
}
4,290✔
1092

1093

1094
void Connection::handle_disconnect_wait(Status status)
1095
{
12✔
1096
    if (!status.is_ok()) {
12✔
1097
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
1098
        throw Exception(status);
×
1099
    }
×
1100

6✔
1101
    m_disconnect_delay_in_progress = false;
12✔
1102

6✔
1103
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
12✔
1104
    if (m_num_active_unsuspended_sessions == 0) {
12✔
1105
        if (m_client.m_connection_linger_time > 0)
12✔
1106
            logger.detail("Linger time expired"); // Throws
×
1107
        voluntary_disconnect();                   // Throws
12✔
1108
        logger.info("Disconnected");              // Throws
12✔
1109
    }
12✔
1110
}
12✔
1111

1112

1113
void Connection::close_due_to_protocol_error(Status status)
1114
{
4✔
1115
    SessionErrorInfo error_info(std::move(status), IsFatal{true});
4✔
1116
    error_info.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;
4✔
1117
    involuntary_disconnect(std::move(error_info),
4✔
1118
                           ConnectionTerminationReason::sync_protocol_violation); // Throws
4✔
1119
}
4✔
1120

1121

1122
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
1123
{
384✔
1124
    logger.info("Connection closed due to error: %1", status); // Throws
384✔
1125

234✔
1126
    involuntary_disconnect(SessionErrorInfo{std::move(status), is_fatal}, reason); // Throw
384✔
1127
}
384✔
1128

1129

1130
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
1131
{
582✔
1132
    logger.info("Connection closed due to transient error: %1", status); // Throws
582✔
1133
    SessionErrorInfo error_info{std::move(status), IsFatal{false}};
582✔
1134
    error_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
582✔
1135

318✔
1136
    involuntary_disconnect(std::move(error_info), reason); // Throw
582✔
1137
}
582✔
1138

1139

1140
// Close connection due to error discovered on the server-side, and then
1141
// reported to the client by way of a connection-level ERROR message.
1142
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1143
{
66✔
1144
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
66✔
1145
                int(error_code)); // Throws
66✔
1146

32✔
1147
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
42✔
1148
                                      : ConnectionTerminationReason::server_said_try_again_later;
56✔
1149
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
66✔
1150
                           reason); // Throws
66✔
1151
}
66✔
1152

1153

1154
void Connection::disconnect(const SessionErrorInfo& info)
1155
{
3,298✔
1156
    // Cancel connect timeout watchdog
1,650✔
1157
    m_connect_timer.reset();
3,298✔
1158

1,650✔
1159
    if (m_state == ConnectionState::connected) {
3,298✔
1160
        m_disconnect_time = monotonic_clock_now();
3,178✔
1161
        m_disconnect_has_occurred = true;
3,178✔
1162

1,588✔
1163
        // Sessions that are in the Deactivating state at this time can be
1,588✔
1164
        // immediately discarded, in part because they are no longer enlisted to
1,588✔
1165
        // send. Such sessions will be taken to the Deactivated state by
1,588✔
1166
        // Session::connection_lost(), and then they will be removed from
1,588✔
1167
        // `m_sessions`.
1,588✔
1168
        auto i = m_sessions.begin(), end = m_sessions.end();
3,178✔
1169
        while (i != end) {
6,920✔
1170
            // Prevent invalidation of the main iterator when erasing elements
2,082✔
1171
            auto j = i++;
3,742✔
1172
            Session& sess = *j->second;
3,742✔
1173
            sess.connection_lost(); // Throws
3,742✔
1174
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
3,742✔
1175
                m_sessions.erase(j);
1,764✔
1176
        }
3,742✔
1177
    }
3,178✔
1178

1,650✔
1179
    change_state_to_disconnected();
3,298✔
1180

1,650✔
1181
    m_ping_delay_in_progress = false;
3,298✔
1182
    m_waiting_for_pong = false;
3,298✔
1183
    m_send_ping = false;
3,298✔
1184
    m_minimize_next_ping_delay = false;
3,298✔
1185
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,298✔
1186
    m_ping_sent = false;
3,298✔
1187
    m_heartbeat_timer.reset();
3,298✔
1188
    m_previous_ping_rtt = 0;
3,298✔
1189

1,650✔
1190
    m_websocket_sentinel->destroyed = true;
3,298✔
1191
    m_websocket_sentinel.reset();
3,298✔
1192
    m_websocket.reset();
3,298✔
1193
    m_input_body_buffer.reset();
3,298✔
1194
    m_sending_session = nullptr;
3,298✔
1195
    m_sessions_enlisted_to_send.clear();
3,298✔
1196
    m_sending = false;
3,298✔
1197

1,650✔
1198
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,298✔
1199
    initiate_reconnect_wait();                                           // Throws
3,298✔
1200
}
3,298✔
1201

1202
bool Connection::is_flx_sync_connection() const noexcept
1203
{
104,204✔
1204
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
104,204✔
1205
}
104,204✔
1206

1207
void Connection::receive_pong(milliseconds_type timestamp)
1208
{
186✔
1209
    logger.debug("Received: PONG(timestamp=%1)", timestamp);
186✔
1210

64✔
1211
    bool legal_at_this_time = (m_waiting_for_pong && !m_send_ping);
186✔
1212
    if (REALM_UNLIKELY(!legal_at_this_time)) {
186✔
1213
        close_due_to_protocol_error(
×
1214
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
×
1215
        return;
×
1216
    }
×
1217

64✔
1218
    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
186✔
1219
        close_due_to_protocol_error(
×
1220
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1221
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
×
1222
                          m_last_ping_sent_at, timestamp)}); // Throws
×
1223
        return;
×
1224
    }
×
1225

64✔
1226
    milliseconds_type now = monotonic_clock_now();
186✔
1227
    milliseconds_type round_trip_time = now - timestamp;
186✔
1228
    logger.debug("Round trip time was %1 milliseconds", round_trip_time);
186✔
1229
    m_previous_ping_rtt = round_trip_time;
186✔
1230

64✔
1231
    // If this PONG message is a response to a PING mesage that was sent after
64✔
1232
    // the last invocation of cancel_reconnect_delay(), then the connection is
64✔
1233
    // still good, and we do not have to skip the next reconnect delay.
64✔
1234
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
186✔
1235
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
148✔
1236
        m_ping_after_scheduled_reset_of_reconnect_info = false;
148✔
1237
        m_reconnect_info.scheduled_reset = false;
148✔
1238
    }
148✔
1239

64✔
1240
    m_heartbeat_timer.reset();
186✔
1241
    m_waiting_for_pong = false;
186✔
1242

64✔
1243
    initiate_ping_delay(now); // Throws
186✔
1244

64✔
1245
    if (m_client.m_roundtrip_time_handler)
186✔
1246
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
×
1247
}
186✔
1248

1249
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
1250
{
68,024✔
1251
    if (session_ident == 0) {
68,024✔
1252
        return nullptr;
×
1253
    }
×
1254

35,362✔
1255
    auto* sess = get_session(session_ident);
68,024✔
1256
    if (REALM_LIKELY(sess)) {
68,024✔
1257
        return sess;
68,024✔
1258
    }
68,024✔
1259
    // Check the history to see if the message received was for a previous session
UNCOV
1260
    if (auto it = m_session_history.find(session_ident); it == m_session_history.end()) {
×
1261
        logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
×
1262
        close_due_to_protocol_error(
×
1263
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1264
             util::format("Received message %1 for session iden %2 when that session never existed", message,
×
1265
                          session_ident)});
×
1266
    }
×
UNCOV
1267
    else {
×
UNCOV
1268
        logger.error("Received %1 message for closed session, session_ident = %2", message,
×
UNCOV
1269
                     session_ident); // Throws
×
UNCOV
1270
    }
×
UNCOV
1271
    return nullptr;
×
UNCOV
1272
}
×
1273

1274
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1275
{
978✔
1276
    Session* sess = nullptr;
978✔
1277
    if (session_ident != 0) {
978✔
1278
        sess = find_and_validate_session(session_ident, "ERROR");
908✔
1279
        if (REALM_UNLIKELY(!sess)) {
908✔
1280
            return;
×
1281
        }
×
1282
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
908✔
1283
            close_due_to_protocol_error(std::move(status)); // Throws
×
1284
            return;
×
1285
        }
×
1286

472✔
1287
        if (sess->m_state == Session::Deactivated) {
908✔
UNCOV
1288
            finish_session_deactivation(sess);
×
UNCOV
1289
        }
×
1290
        return;
908✔
1291
    }
908✔
1292

34✔
1293
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
70✔
1294
                info.message, info.raw_error_code, info.is_fatal, session_ident,
70✔
1295
                info.server_requests_action); // Throws
70✔
1296

34✔
1297
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
70✔
1298
    if (REALM_LIKELY(known_error_code)) {
70✔
1299
        ProtocolError error_code = ProtocolError(info.raw_error_code);
66✔
1300
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
66✔
1301
            close_due_to_server_side_error(error_code, info); // Throws
66✔
1302
            return;
66✔
1303
        }
66✔
1304
        close_due_to_protocol_error(
×
1305
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1306
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1307
                          info.raw_error_code)});
×
1308
    }
×
1309
    else {
4✔
1310
        close_due_to_protocol_error(
4✔
1311
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1312
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1313
    }
4✔
1314
}
70✔
1315

1316

1317
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1318
                                             session_ident_type session_ident)
1319
{
16✔
1320
    if (session_ident == 0) {
16✔
1321
        return close_due_to_protocol_error(
×
1322
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1323
    }
×
1324

8✔
1325
    if (!is_flx_sync_connection()) {
16✔
1326
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1327
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1328
    }
×
1329

8✔
1330
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
16✔
1331
    if (REALM_UNLIKELY(!sess)) {
16✔
1332
        return;
×
1333
    }
×
1334

8✔
1335
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
16✔
1336
        close_due_to_protocol_error(std::move(status));
×
1337
    }
×
1338
}
16✔
1339

1340

1341
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
1342
{
3,240✔
1343
    Session* sess = find_and_validate_session(session_ident, "IDENT");
3,240✔
1344
    if (REALM_UNLIKELY(!sess)) {
3,240✔
1345
        return;
×
1346
    }
×
1347

1,504✔
1348
    if (auto status = sess->receive_ident_message(client_file_ident); !status.is_ok())
3,240✔
1349
        close_due_to_protocol_error(std::move(status)); // Throws
×
1350
}
3,240✔
1351

1352
void Connection::receive_download_message(session_ident_type session_ident, const SyncProgress& progress,
1353
                                          std::uint_fast64_t downloadable_bytes, int64_t query_version,
1354
                                          DownloadBatchState batch_state,
1355
                                          const ReceivedChangesets& received_changesets)
1356
{
44,048✔
1357
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
44,048✔
1358
    if (REALM_UNLIKELY(!sess)) {
44,048✔
1359
        return;
×
1360
    }
×
1361

23,786✔
1362
    if (auto status = sess->receive_download_message(progress, downloadable_bytes, batch_state, query_version,
44,048✔
1363
                                                     received_changesets);
44,048✔
1364
        !status.is_ok()) {
44,048✔
1365
        close_due_to_protocol_error(std::move(status));
×
1366
    }
×
1367
}
44,048✔
1368

1369
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
1370
{
15,660✔
1371
    Session* sess = find_and_validate_session(session_ident, "MARK");
15,660✔
1372
    if (REALM_UNLIKELY(!sess)) {
15,660✔
1373
        return;
×
1374
    }
×
1375

7,744✔
1376
    if (auto status = sess->receive_mark_message(request_ident); !status.is_ok())
15,660✔
1377
        close_due_to_protocol_error(std::move(status)); // Throws
×
1378
}
15,660✔
1379

1380

1381
void Connection::receive_unbound_message(session_ident_type session_ident)
1382
{
4,110✔
1383
    Session* sess = find_and_validate_session(session_ident, "UNBOUND");
4,110✔
1384
    if (REALM_UNLIKELY(!sess)) {
4,110✔
1385
        return;
×
1386
    }
×
1387

1,828✔
1388
    if (auto status = sess->receive_unbound_message(); !status.is_ok()) {
4,110✔
1389
        close_due_to_protocol_error(std::move(status)); // Throws
×
1390
        return;
×
1391
    }
×
1392

1,828✔
1393
    if (sess->m_state == Session::Deactivated) {
4,110✔
1394
        finish_session_deactivation(sess);
4,110✔
1395
    }
4,110✔
1396
}
4,110✔
1397

1398

1399
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1400
                                               std::string_view body)
1401
{
44✔
1402
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
44✔
1403
    if (REALM_UNLIKELY(!sess)) {
44✔
1404
        return;
×
1405
    }
×
1406

22✔
1407
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
44✔
1408
        close_due_to_protocol_error(std::move(status));
×
1409
    }
×
1410
}
44✔
1411

1412

1413
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1414
                                            std::string_view message)
1415
{
5,726✔
1416
    std::string prefix;
5,726✔
1417
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
5,726✔
1418
        prefix = util::format("Server[%1]", m_appservices_coid);
5,724✔
1419
    }
5,724✔
1420
    else {
2✔
1421
        prefix = "Server";
2✔
1422
    }
2✔
1423

2,916✔
1424
    if (session_ident != 0) {
5,726✔
1425
        if (auto sess = get_session(session_ident)) {
3,866✔
1426
            sess->logger.log(LogCategory::session, level, "%1 log: %2", prefix, message);
3,866✔
1427
            return;
3,866✔
1428
        }
3,866✔
1429

NEW
1430
        logger.log(util::LogCategory::session, level, "%1 log for unknown session %2: %3", prefix, session_ident,
×
NEW
1431
                   message);
×
UNCOV
1432
        return;
×
UNCOV
1433
    }
×
1434

948✔
1435
    logger.log(level, "%1 log: %2", prefix, message);
1,860✔
1436
}
1,860✔
1437

1438

1439
void Connection::receive_appservices_request_id(std::string_view coid)
1440
{
5,038✔
1441
    // Only set once per connection
2,536✔
1442
    if (!coid.empty() && m_appservices_coid.empty()) {
5,038✔
1443
        m_appservices_coid = coid;
2,226✔
1444
        logger.log(util::LogCategory::session, util::LogCategory::Level::info,
2,226✔
1445
                   "Connected to app services with request id: \"%1\"", m_appservices_coid);
2,226✔
1446
    }
2,226✔
1447
}
5,038✔
1448

1449

1450
void Connection::handle_protocol_error(Status status)
1451
{
×
1452
    close_due_to_protocol_error(std::move(status));
×
1453
}
×
1454

1455

1456
// Sessions are guaranteed to be granted the opportunity to send a message in
1457
// the order that they enlist. Note that this is important to ensure
1458
// nonoverlapping communication with the server for consecutive sessions
1459
// associated with the same Realm file.
1460
//
1461
// CAUTION: The specified session may get destroyed before this function
1462
// returns, but only if its Session::send_message() puts it into the Deactivated
1463
// state.
1464
void Connection::enlist_to_send(Session* sess)
1465
{
156,850✔
1466
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
156,850✔
1467
    m_sessions_enlisted_to_send.push_back(sess); // Throws
156,850✔
1468
    if (!m_sending)
156,850✔
1469
        send_next_message(); // Throws
58,296✔
1470
}
156,850✔
1471

1472

1473
std::string Connection::get_active_appservices_connection_id()
1474
{
72✔
1475
    return m_appservices_coid;
72✔
1476
}
72✔
1477

1478
void Session::cancel_resumption_delay()
1479
{
4,068✔
1480
    REALM_ASSERT_EX(m_state == Active, m_state);
4,068✔
1481

2,326✔
1482
    if (!m_suspended)
4,068✔
1483
        return;
3,664✔
1484

222✔
1485
    m_suspended = false;
404✔
1486

222✔
1487
    logger.debug("Resumed"); // Throws
404✔
1488

222✔
1489
    if (unbind_process_complete())
404✔
1490
        initiate_rebind(); // Throws
400✔
1491

222✔
1492
    m_conn.one_more_active_unsuspended_session(); // Throws
404✔
1493

222✔
1494
    on_resumed(); // Throws
404✔
1495
}
404✔
1496

1497

1498
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1499
                                                 std::vector<ProtocolErrorInfo>* out)
1500
{
21,298✔
1501
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
21,298✔
1502
        return;
21,258✔
1503
    }
21,258✔
1504

20✔
1505
#ifdef REALM_DEBUG
40✔
1506
    REALM_ASSERT_DEBUG(
40✔
1507
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
40✔
1508
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
40✔
1509
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
40✔
1510
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
40✔
1511
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
40✔
1512
                       }));
40✔
1513
#endif
40✔
1514

20✔
1515
    while (!m_pending_compensating_write_errors.empty() &&
80✔
1516
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
62✔
1517
               changesets.back().version) {
40✔
1518
        auto& cur_error = m_pending_compensating_write_errors.front();
40✔
1519
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
40✔
1520
        out->push_back(std::move(cur_error));
40✔
1521
        m_pending_compensating_write_errors.pop_front();
40✔
1522
    }
40✔
1523
}
40✔
1524

1525

1526
void Session::integrate_changesets(ClientReplication& repl, const SyncProgress& progress,
1527
                                   std::uint_fast64_t downloadable_bytes,
1528
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1529
                                   DownloadBatchState download_batch_state)
1530
{
41,998✔
1531
    auto& history = repl.get_history();
41,998✔
1532
    if (received_changesets.empty()) {
41,998✔
1533
        if (download_batch_state == DownloadBatchState::MoreToCome) {
20,672✔
1534
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1535
                                       "received empty download message that was not the last in batch",
×
1536
                                       ProtocolError::bad_progress);
×
1537
        }
×
1538
        history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws
20,672✔
1539
        return;
20,672✔
1540
    }
20,672✔
1541

11,774✔
1542
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
21,326✔
1543
    auto transact = get_db()->start_read();
21,326✔
1544
    history.integrate_server_changesets(
21,326✔
1545
        progress, &downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
21,326✔
1546
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
21,314✔
1547
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
21,298✔
1548
        }); // Throws
21,298✔
1549
    if (received_changesets.size() == 1) {
21,326✔
1550
        logger.debug("1 remote changeset integrated, producing client version %1",
14,596✔
1551
                     version_info.sync_version.version); // Throws
14,596✔
1552
    }
14,596✔
1553
    else {
6,730✔
1554
        logger.debug("%2 remote changesets integrated, producing client version %1",
6,730✔
1555
                     version_info.sync_version.version, received_changesets.size()); // Throws
6,730✔
1556
    }
6,730✔
1557

11,774✔
1558
    for (const auto& pending_error : pending_compensating_write_errors) {
11,794✔
1559
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
40✔
1560
                    pending_error.compensating_write_rejected_client_version,
40✔
1561
                    *pending_error.compensating_write_server_version, pending_error.message);
40✔
1562
        try {
40✔
1563
            on_connection_state_changed(
40✔
1564
                m_conn.get_state(),
40✔
1565
                SessionErrorInfo{pending_error,
40✔
1566
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
40✔
1567
                                                          pending_error.message)});
40✔
1568
        }
40✔
1569
        catch (...) {
20✔
1570
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1571
        }
×
1572
    }
40✔
1573
}
21,326✔
1574

1575

1576
void Session::on_integration_failure(const IntegrationException& error)
1577
{
32✔
1578
    REALM_ASSERT_EX(m_state == Active, m_state);
32✔
1579
    REALM_ASSERT(!m_client_error && !m_error_to_send);
32✔
1580
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
32✔
1581

16✔
1582
    m_client_error = util::make_optional<IntegrationException>(error);
32✔
1583
    m_error_to_send = true;
32✔
1584

16✔
1585
    // Surface the error to the user otherwise is lost.
16✔
1586
    on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{error.to_status(), IsFatal{false}});
32✔
1587

16✔
1588
    // Since the deactivation process has not been initiated, the UNBIND
16✔
1589
    // message cannot have been sent unless an ERROR message was received.
16✔
1590
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
32✔
1591
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
32✔
1592
        ensure_enlisted_to_send(); // Throws
32✔
1593
    }
32✔
1594
}
32✔
1595

1596
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
1597
{
43,368✔
1598
    REALM_ASSERT_EX(m_state == Active, m_state);
43,368✔
1599
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
43,368✔
1600
    m_download_progress = progress.download;
43,368✔
1601
    bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version);
43,368✔
1602
    m_progress = progress;
43,368✔
1603
    if (upload_progressed) {
43,368✔
1604
        if (progress.upload.client_version > m_last_version_selected_for_upload) {
31,932✔
1605
            if (progress.upload.client_version > m_upload_progress.client_version)
13,072✔
1606
                m_upload_progress = progress.upload;
808✔
1607
            m_last_version_selected_for_upload = progress.upload.client_version;
13,072✔
1608
        }
13,072✔
1609

16,928✔
1610
        check_for_upload_completion();
31,932✔
1611
    }
31,932✔
1612

23,420✔
1613
    do_recognize_sync_version(client_version); // Allows upload process to resume
43,368✔
1614
    check_for_download_completion();           // Throws
43,368✔
1615

23,420✔
1616
    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
23,420✔
1617
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
43,368✔
1618
        auto& flx_subscription_store = *get_flx_subscription_store();
2,412✔
1619
        get_migration_store()->create_subscriptions(flx_subscription_store);
2,412✔
1620
    }
2,412✔
1621

23,420✔
1622
    // Since the deactivation process has not been initiated, the UNBIND
23,420✔
1623
    // message cannot have been sent unless an ERROR message was received.
23,420✔
1624
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
43,368✔
1625
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
43,368✔
1626
        ensure_enlisted_to_send(); // Throws
43,364✔
1627
    }
43,364✔
1628
}
43,368✔
1629

1630

1631
Session::~Session()
1632
{
9,606✔
1633
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
4,626✔
1634
}
9,606✔
1635

1636

1637
std::string Session::make_logger_prefix(session_ident_type ident)
1638
{
9,604✔
1639
    std::ostringstream out;
9,604✔
1640
    out.imbue(std::locale::classic());
9,604✔
1641
    out << "Session[" << ident << "]: "; // Throws
9,604✔
1642
    return out.str();                    // Throws
9,604✔
1643
}
9,604✔
1644

1645

1646
void Session::activate()
1647
{
9,602✔
1648
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
9,602✔
1649

4,624✔
1650
    logger.debug("Activating"); // Throws
9,602✔
1651

4,624✔
1652
    bool has_pending_client_reset = false;
9,602✔
1653
    if (REALM_LIKELY(!get_client().is_dry_run())) {
9,606✔
1654
        // The reason we need a mutable reference from get_client_reset_config() is because we
4,626✔
1655
        // don't want the session to keep a strong reference to the client_reset_config->fresh_copy
4,626✔
1656
        // DB. If it did, then the fresh DB would stay alive for the duration of this sync session
4,626✔
1657
        // and we want to clean it up once the reset is finished. Additionally, the fresh copy will
4,626✔
1658
        // be set to a new copy on every reset so there is no reason to keep a reference to it.
4,626✔
1659
        // The modification to the client reset config happens via std::move(client_reset_config->fresh_copy).
4,626✔
1660
        // If the client reset config were a `const &` then this std::move would create another strong
4,626✔
1661
        // reference which we don't want to happen.
4,626✔
1662
        util::Optional<ClientReset>& client_reset_config = get_client_reset_config();
9,606✔
1663

4,626✔
1664
        bool file_exists = util::File::exists(get_realm_path());
9,606✔
1665

4,626✔
1666
        logger.info("client_reset_config = %1, Realm exists = %2, "
9,606✔
1667
                    "client reset = %3",
9,606✔
1668
                    client_reset_config ? "true" : "false", file_exists ? "true" : "false",
9,600✔
1669
                    (client_reset_config && file_exists) ? "true" : "false"); // Throws
9,606✔
1670
        if (client_reset_config && !m_client_reset_operation) {
9,606✔
1671
            m_client_reset_operation = std::make_unique<_impl::ClientResetOperation>(
336✔
1672
                logger, get_db(), std::move(client_reset_config->fresh_copy), client_reset_config->mode,
336✔
1673
                std::move(client_reset_config->notify_before_client_reset),
336✔
1674
                std::move(client_reset_config->notify_after_client_reset),
336✔
1675
                client_reset_config->recovery_is_allowed); // Throws
336✔
1676
        }
336✔
1677

4,626✔
1678
        if (!m_client_reset_operation) {
9,606✔
1679
            const ClientReplication& repl = access_realm(); // Throws
9,270✔
1680
            repl.get_history().get_status(m_last_version_available, m_client_file_ident, m_progress,
9,270✔
1681
                                          &has_pending_client_reset); // Throws
9,270✔
1682
        }
9,270✔
1683
    }
9,606✔
1684
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
9,602✔
1685
                 m_client_file_ident.salt); // Throws
9,602✔
1686
    m_upload_progress = m_progress.upload;
9,602✔
1687
    m_last_version_selected_for_upload = m_upload_progress.client_version;
9,602✔
1688
    m_download_progress = m_progress.download;
9,602✔
1689
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
9,602✔
1690

4,624✔
1691
    logger.debug("last_version_available  = %1", m_last_version_available);           // Throws
9,602✔
1692
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
9,602✔
1693
    logger.debug("progress_download_client_version = %1",
9,602✔
1694
                 m_progress.download.last_integrated_client_version);                                      // Throws
9,602✔
1695
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
9,602✔
1696
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
9,602✔
1697

4,624✔
1698
    reset_protocol_state();
9,602✔
1699
    m_state = Active;
9,602✔
1700

4,624✔
1701
    REALM_ASSERT(!m_suspended);
9,602✔
1702
    m_conn.one_more_active_unsuspended_session(); // Throws
9,602✔
1703

4,624✔
1704
    try {
9,602✔
1705
        process_pending_flx_bootstrap();
9,602✔
1706
    }
9,602✔
1707
    catch (const IntegrationException& error) {
4,626✔
1708
        logger.error("Error integrating bootstrap changesets: %1", error.what());
4✔
1709
        m_suspended = true;
4✔
1710
        m_conn.one_less_active_unsuspended_session(); // Throws
4✔
1711
        on_suspended(SessionErrorInfo{Status{error.code(), error.what()}, IsFatal{true}});
4✔
1712
    }
4✔
1713

4,624✔
1714
    if (has_pending_client_reset) {
9,606✔
1715
        handle_pending_client_reset_acknowledgement();
20✔
1716
    }
20✔
1717
}
9,606✔
1718

1719

1720
// The caller (Connection) must discard the session if the session has become
1721
// deactivated upon return.
1722
void Session::initiate_deactivation()
1723
{
9,604✔
1724
    REALM_ASSERT_EX(m_state == Active, m_state);
9,604✔
1725

4,626✔
1726
    logger.debug("Initiating deactivation"); // Throws
9,604✔
1727

4,626✔
1728
    m_state = Deactivating;
9,604✔
1729

4,626✔
1730
    if (!m_suspended)
9,604✔
1731
        m_conn.one_less_active_unsuspended_session(); // Throws
9,082✔
1732

4,626✔
1733
    if (m_enlisted_to_send) {
9,604✔
1734
        REALM_ASSERT(!unbind_process_complete());
4,806✔
1735
        return;
4,806✔
1736
    }
4,806✔
1737

2,216✔
1738
    // Deactivate immediately if the BIND message has not yet been sent and the
2,216✔
1739
    // session is not enlisted to send, or if the unbinding process has already
2,216✔
1740
    // completed.
2,216✔
1741
    if (!m_bind_message_sent || unbind_process_complete()) {
4,798✔
1742
        complete_deactivation(); // Throws
970✔
1743
        // Life cycle state is now Deactivated
448✔
1744
        return;
970✔
1745
    }
970✔
1746

1,768✔
1747
    // Ready to send the UNBIND message, if it has not already been sent
1,768✔
1748
    if (!m_unbind_message_sent) {
3,828✔
1749
        enlist_to_send(); // Throws
3,658✔
1750
        return;
3,658✔
1751
    }
3,658✔
1752
}
3,828✔
1753

1754

1755
void Session::complete_deactivation()
1756
{
9,606✔
1757
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
9,606✔
1758
    m_state = Deactivated;
9,606✔
1759

4,626✔
1760
    logger.debug("Deactivation completed"); // Throws
9,606✔
1761
}
9,606✔
1762

1763

1764
// Called by the associated Connection object when this session is granted an
1765
// opportunity to send a message.
1766
//
1767
// The caller (Connection) must discard the session if the session has become
1768
// deactivated upon return.
1769
void Session::send_message()
1770
{
155,226✔
1771
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
155,226✔
1772
    REALM_ASSERT(m_enlisted_to_send);
155,226✔
1773
    m_enlisted_to_send = false;
155,226✔
1774
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
155,226✔
1775
        // Deactivation has been initiated. If the UNBIND message has not been
4,326✔
1776
        // sent yet, there is no point in sending it. Instead, we can let the
4,326✔
1777
        // deactivation process complete.
4,326✔
1778
        if (!m_bind_message_sent) {
8,972✔
1779
            return complete_deactivation(); // Throws
2,660✔
1780
            // Life cycle state is now Deactivated
1,342✔
1781
        }
2,660✔
1782

2,984✔
1783
        // Session life cycle state is Deactivating or the unbinding process has
2,984✔
1784
        // been initiated by a session specific ERROR message
2,984✔
1785
        if (!m_unbind_message_sent)
6,312✔
1786
            send_unbind_message(); // Throws
6,312✔
1787
        return;
6,312✔
1788
    }
6,312✔
1789

73,456✔
1790
    // Session life cycle state is Active and the unbinding process has
73,456✔
1791
    // not been initiated
73,456✔
1792
    REALM_ASSERT(!m_unbind_message_sent);
146,254✔
1793

73,456✔
1794
    if (!m_bind_message_sent)
146,254✔
1795
        return send_bind_message(); // Throws
8,628✔
1796

69,146✔
1797
    if (!m_ident_message_sent) {
137,626✔
1798
        if (have_client_file_ident())
6,582✔
1799
            send_ident_message(); // Throws
6,582✔
1800
        return;
6,582✔
1801
    }
6,582✔
1802

65,804✔
1803
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
131,044✔
1804
                                                      [](const PendingTestCommand& command) {
65,876✔
1805
                                                          return command.pending;
144✔
1806
                                                      });
144✔
1807
    if (has_pending_test_command) {
131,044✔
1808
        return send_test_command_message();
44✔
1809
    }
44✔
1810

65,782✔
1811
    if (m_error_to_send)
131,000✔
1812
        return send_json_error_message(); // Throws
32✔
1813

65,766✔
1814
    // Stop sending upload, mark and query messages when the client detects an error.
65,766✔
1815
    if (m_client_error) {
130,968✔
1816
        return;
16✔
1817
    }
16✔
1818

65,758✔
1819
    if (m_target_download_mark > m_last_download_mark_sent)
130,952✔
1820
        return send_mark_message(); // Throws
16,218✔
1821

57,728✔
1822
    auto is_upload_allowed = [&]() -> bool {
114,744✔
1823
        if (!m_is_flx_sync_session) {
114,744✔
1824
            return true;
103,712✔
1825
        }
103,712✔
1826

5,698✔
1827
        auto migration_store = get_migration_store();
11,032✔
1828
        if (!migration_store) {
11,032✔
1829
            return true;
×
1830
        }
×
1831

5,698✔
1832
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
11,032✔
1833
        if (!sentinel_query_version) {
11,032✔
1834
            return true;
11,002✔
1835
        }
11,002✔
1836

16✔
1837
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
16✔
1838
        return m_last_sent_flx_query_version != *sentinel_query_version;
30✔
1839
    };
30✔
1840

57,728✔
1841
    if (!is_upload_allowed()) {
114,734✔
1842
        return;
16✔
1843
    }
16✔
1844

57,720✔
1845
    auto check_pending_flx_version = [&]() -> bool {
114,730✔
1846
        if (!m_is_flx_sync_session) {
114,730✔
1847
            return false;
103,714✔
1848
        }
103,714✔
1849

5,690✔
1850
        if (!m_allow_upload) {
11,016✔
1851
            return false;
2,174✔
1852
        }
2,174✔
1853

4,594✔
1854
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(
8,842✔
1855
            m_last_sent_flx_query_version, m_upload_progress.client_version);
8,842✔
1856

4,594✔
1857
        if (!m_pending_flx_sub_set) {
8,842✔
1858
            return false;
7,364✔
1859
        }
7,364✔
1860

738✔
1861
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
1,478✔
1862
    };
1,478✔
1863

57,720✔
1864
    if (check_pending_flx_version()) {
114,718✔
1865
        return send_query_change_message(); // throws
834✔
1866
    }
834✔
1867

57,304✔
1868
    if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
113,884✔
1869
        return send_upload_message(); // Throws
55,498✔
1870
    }
55,498✔
1871
}
113,884✔
1872

1873

1874
void Session::send_bind_message()
1875
{
8,628✔
1876
    REALM_ASSERT_EX(m_state == Active, m_state);
8,628✔
1877

4,310✔
1878
    session_ident_type session_ident = m_ident;
8,628✔
1879
    bool need_client_file_ident = !have_client_file_ident();
8,628✔
1880
    const bool is_subserver = false;
8,628✔
1881

4,310✔
1882

4,310✔
1883
    ClientProtocol& protocol = m_conn.get_client_protocol();
8,628✔
1884
    int protocol_version = m_conn.get_negotiated_protocol_version();
8,628✔
1885
    OutputBuffer& out = m_conn.get_output_buffer();
8,628✔
1886
    // Discard the token since it's ignored by the server.
4,310✔
1887
    std::string empty_access_token;
8,628✔
1888
    if (m_is_flx_sync_session) {
8,628✔
1889
        nlohmann::json bind_json_data;
1,376✔
1890
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,376✔
1891
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1892
        }
60✔
1893
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,376✔
1894
        if (logger.would_log(util::Logger::Level::debug)) {
1,376✔
1895
            std::string json_data_dump;
1,376✔
1896
            if (!bind_json_data.empty()) {
1,376✔
1897
                json_data_dump = bind_json_data.dump();
1,376✔
1898
            }
1,376✔
1899
            logger.debug(
1,376✔
1900
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,376✔
1901
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,376✔
1902
        }
1,376✔
1903
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,376✔
1904
                                       need_client_file_ident, is_subserver); // Throws
1,376✔
1905
    }
1,376✔
1906
    else {
7,252✔
1907
        std::string server_path = get_virt_path();
7,252✔
1908
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
7,252✔
1909
                     session_ident, need_client_file_ident, is_subserver, server_path);
7,252✔
1910
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
7,252✔
1911
                                       need_client_file_ident, is_subserver); // Throws
7,252✔
1912
    }
7,252✔
1913
    m_conn.initiate_write_message(out, this); // Throws
8,628✔
1914

4,310✔
1915
    m_bind_message_sent = true;
8,628✔
1916

4,310✔
1917
    // Ready to send the IDENT message if the file identifier pair is already
4,310✔
1918
    // available.
4,310✔
1919
    if (!need_client_file_ident)
8,628✔
1920
        enlist_to_send(); // Throws
3,508✔
1921
}
8,628✔
1922

1923

1924
void Session::send_ident_message()
1925
{
6,582✔
1926
    REALM_ASSERT_EX(m_state == Active, m_state);
6,582✔
1927
    REALM_ASSERT(m_bind_message_sent);
6,582✔
1928
    REALM_ASSERT(!m_unbind_message_sent);
6,582✔
1929
    REALM_ASSERT(have_client_file_ident());
6,582✔
1930

3,342✔
1931

3,342✔
1932
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,582✔
1933
    OutputBuffer& out = m_conn.get_output_buffer();
6,582✔
1934
    session_ident_type session_ident = m_ident;
6,582✔
1935

3,342✔
1936
    if (m_is_flx_sync_session) {
6,582✔
1937
        const auto active_query_set = get_flx_subscription_store()->get_active();
962✔
1938
        const auto active_query_body = active_query_set.to_ext_json();
962✔
1939
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
962✔
1940
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
962✔
1941
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
962✔
1942
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
962✔
1943
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
962✔
1944
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
962✔
1945
                     active_query_body); // Throws
962✔
1946
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
962✔
1947
                                        active_query_set.version(), active_query_body); // Throws
962✔
1948
        m_last_sent_flx_query_version = active_query_set.version();
962✔
1949
    }
962✔
1950
    else {
5,620✔
1951
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
5,620✔
1952
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
5,620✔
1953
                     "latest_server_version_salt=%6)",
5,620✔
1954
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
5,620✔
1955
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
5,620✔
1956
                     m_progress.latest_server_version.salt);                                  // Throws
5,620✔
1957
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
5,620✔
1958
    }
5,620✔
1959
    m_conn.initiate_write_message(out, this); // Throws
6,582✔
1960

3,342✔
1961
    m_ident_message_sent = true;
6,582✔
1962

3,342✔
1963
    // Other messages may be waiting to be sent
3,342✔
1964
    enlist_to_send(); // Throws
6,582✔
1965
}
6,582✔
1966

1967
void Session::send_query_change_message()
1968
{
834✔
1969
    REALM_ASSERT_EX(m_state == Active, m_state);
834✔
1970
    REALM_ASSERT(m_ident_message_sent);
834✔
1971
    REALM_ASSERT(!m_unbind_message_sent);
834✔
1972
    REALM_ASSERT(m_pending_flx_sub_set);
834✔
1973
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
834✔
1974

416✔
1975
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
834✔
1976
        return;
×
1977
    }
×
1978

416✔
1979
    auto sub_store = get_flx_subscription_store();
834✔
1980
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
834✔
1981
    auto latest_queries = latest_sub_set.to_ext_json();
834✔
1982
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
834✔
1983
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
834✔
1984

416✔
1985
    OutputBuffer& out = m_conn.get_output_buffer();
834✔
1986
    session_ident_type session_ident = get_ident();
834✔
1987
    ClientProtocol& protocol = m_conn.get_client_protocol();
834✔
1988
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
834✔
1989
    m_conn.initiate_write_message(out, this);
834✔
1990

416✔
1991
    m_last_sent_flx_query_version = latest_sub_set.version();
834✔
1992

416✔
1993
    request_download_completion_notification();
834✔
1994
}
834✔
1995

1996
void Session::send_upload_message()
1997
{
55,498✔
1998
    REALM_ASSERT_EX(m_state == Active, m_state);
55,498✔
1999
    REALM_ASSERT(m_ident_message_sent);
55,498✔
2000
    REALM_ASSERT(!m_unbind_message_sent);
55,498✔
2001

27,924✔
2002
    if (REALM_UNLIKELY(get_client().is_dry_run()))
55,498✔
2003
        return;
27,924✔
2004

27,924✔
2005
    version_type target_upload_version = get_db()->get_version_of_latest_snapshot();
55,498✔
2006
    if (m_pending_flx_sub_set) {
55,498✔
2007
        REALM_ASSERT(m_is_flx_sync_session);
644✔
2008
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
644✔
2009
    }
644✔
2010
    if (target_upload_version > m_last_version_available) {
55,498✔
2011
        m_last_version_available = target_upload_version;
308✔
2012
    }
308✔
2013

27,924✔
2014
    const ClientReplication& repl = access_realm(); // Throws
55,498✔
2015

27,924✔
2016
    std::vector<UploadChangeset> uploadable_changesets;
55,498✔
2017
    version_type locked_server_version = 0;
55,498✔
2018
    repl.get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
55,498✔
2019
                                                  locked_server_version); // Throws
55,498✔
2020

27,924✔
2021
    if (uploadable_changesets.empty()) {
55,498✔
2022
        // Nothing more to upload right now
13,938✔
2023
        check_for_upload_completion(); // Throws
28,102✔
2024
        // If we need to limit upload up to some version other than the last client version available and there are no
13,938✔
2025
        // changes to upload, then there is no need to send an empty message.
13,938✔
2026
        if (m_pending_flx_sub_set) {
28,102✔
2027
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
208✔
2028
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
208✔
2029
            // Other messages may be waiting to be sent
104✔
2030
            return enlist_to_send(); // Throws
208✔
2031
        }
208✔
2032
    }
27,396✔
2033
    else {
27,396✔
2034
        m_last_version_selected_for_upload = uploadable_changesets.back().progress.client_version;
27,396✔
2035
    }
27,396✔
2036

27,924✔
2037
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
55,394✔
2038
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
436✔
2039
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
436✔
2040
    }
436✔
2041

27,820✔
2042
    version_type progress_client_version = m_upload_progress.client_version;
55,290✔
2043
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
55,290✔
2044

27,820✔
2045
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
55,290✔
2046
                 "locked_server_version=%3, num_changesets=%4)",
55,290✔
2047
                 progress_client_version, progress_server_version, locked_server_version,
55,290✔
2048
                 uploadable_changesets.size()); // Throws
55,290✔
2049

27,820✔
2050
    ClientProtocol& protocol = m_conn.get_client_protocol();
55,290✔
2051
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
55,290✔
2052

27,820✔
2053
    for (const UploadChangeset& uc : uploadable_changesets) {
49,414✔
2054
        logger.debug(util::LogCategory::changeset,
41,776✔
2055
                     "Fetching changeset for upload (client_version=%1, server_version=%2, "
41,776✔
2056
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
41,776✔
2057
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
41,776✔
2058
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
41,776✔
2059
        if (logger.would_log(util::Logger::Level::trace)) {
41,776✔
2060
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2061
            if (changeset_data.size() < 1024) {
×
NEW
2062
                logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
2063
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2064
            }
×
2065
            else {
×
NEW
2066
                logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
2067
                             protocol.compressed_hex_dump(changeset_data));
×
2068
            }
×
2069

2070
#if REALM_DEBUG
×
2071
            ChunkedBinaryInputStream in{changeset_data};
×
2072
            Changeset log;
×
2073
            try {
×
2074
                parse_changeset(in, log);
×
2075
                std::stringstream ss;
×
2076
                log.print(ss);
×
NEW
2077
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
2078
            }
×
2079
            catch (const BadChangesetError& err) {
×
NEW
2080
                logger.error(util::LogCategory::changeset, "Unable to parse changeset: %1", err.what());
×
2081
            }
×
2082
#endif
×
2083
        }
×
2084

20,182✔
2085
#if 0 // Upload log compaction is currently not implemented
2086
        if (!get_client().m_disable_upload_compaction) {
2087
            ChangesetEncoder::Buffer encode_buffer;
2088

2089
            {
2090
                // Upload compaction only takes place within single changesets to
2091
                // avoid another client seeing inconsistent snapshots.
2092
                ChunkedBinaryInputStream stream{uc.changeset};
2093
                Changeset changeset;
2094
                parse_changeset(stream, changeset); // Throws
2095
                // FIXME: What is the point of setting these? How can compaction care about them?
2096
                changeset.version = uc.progress.client_version;
2097
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2098
                changeset.origin_timestamp = uc.origin_timestamp;
2099
                changeset.origin_file_ident = uc.origin_file_ident;
2100

2101
                compact_changesets(&changeset, 1);
2102
                encode_changeset(changeset, encode_buffer);
2103

2104
                logger.debug(util::LogCategory::changeset, "Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2105
                             encode_buffer.size()); // Throws
2106
            }
2107

2108
            upload_message_builder.add_changeset(
2109
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
2110
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
2111
        }
2112
        else
2113
#endif
2114
        {
41,776✔
2115
            upload_message_builder.add_changeset(uc.progress.client_version,
41,776✔
2116
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
41,776✔
2117
                                                 uc.origin_file_ident,
41,776✔
2118
                                                 uc.changeset); // Throws
41,776✔
2119
        }
41,776✔
2120
    }
41,776✔
2121

27,820✔
2122
    int protocol_version = m_conn.get_negotiated_protocol_version();
55,290✔
2123
    OutputBuffer& out = m_conn.get_output_buffer();
55,290✔
2124
    session_ident_type session_ident = get_ident();
55,290✔
2125
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
55,290✔
2126
                                               progress_server_version,
55,290✔
2127
                                               locked_server_version); // Throws
55,290✔
2128
    m_conn.initiate_write_message(out, this);                          // Throws
55,290✔
2129

27,820✔
2130
    // Other messages may be waiting to be sent
27,820✔
2131
    enlist_to_send(); // Throws
55,290✔
2132
}
55,290✔
2133

2134

2135
void Session::send_mark_message()
2136
{
16,220✔
2137
    REALM_ASSERT_EX(m_state == Active, m_state);
16,220✔
2138
    REALM_ASSERT(m_ident_message_sent);
16,220✔
2139
    REALM_ASSERT(!m_unbind_message_sent);
16,220✔
2140
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
16,220✔
2141

8,030✔
2142
    request_ident_type request_ident = m_target_download_mark;
16,220✔
2143
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
16,220✔
2144

8,030✔
2145
    ClientProtocol& protocol = m_conn.get_client_protocol();
16,220✔
2146
    OutputBuffer& out = m_conn.get_output_buffer();
16,220✔
2147
    session_ident_type session_ident = get_ident();
16,220✔
2148
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
16,220✔
2149
    m_conn.initiate_write_message(out, this);                      // Throws
16,220✔
2150

8,030✔
2151
    m_last_download_mark_sent = request_ident;
16,220✔
2152

8,030✔
2153
    // Other messages may be waiting to be sent
8,030✔
2154
    enlist_to_send(); // Throws
16,220✔
2155
}
16,220✔
2156

2157

2158
void Session::send_unbind_message()
2159
{
6,312✔
2160
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,312✔
2161
    REALM_ASSERT(m_bind_message_sent);
6,312✔
2162
    REALM_ASSERT(!m_unbind_message_sent);
6,312✔
2163

2,984✔
2164
    logger.debug("Sending: UNBIND"); // Throws
6,312✔
2165

2,984✔
2166
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,312✔
2167
    OutputBuffer& out = m_conn.get_output_buffer();
6,312✔
2168
    session_ident_type session_ident = get_ident();
6,312✔
2169
    protocol.make_unbind_message(out, session_ident); // Throws
6,312✔
2170
    m_conn.initiate_write_message(out, this);         // Throws
6,312✔
2171

2,984✔
2172
    m_unbind_message_sent = true;
6,312✔
2173
}
6,312✔
2174

2175

2176
void Session::send_json_error_message()
2177
{
32✔
2178
    REALM_ASSERT_EX(m_state == Active, m_state);
32✔
2179
    REALM_ASSERT(m_ident_message_sent);
32✔
2180
    REALM_ASSERT(!m_unbind_message_sent);
32✔
2181
    REALM_ASSERT(m_error_to_send);
32✔
2182
    REALM_ASSERT(m_client_error);
32✔
2183

16✔
2184
    ClientProtocol& protocol = m_conn.get_client_protocol();
32✔
2185
    OutputBuffer& out = m_conn.get_output_buffer();
32✔
2186
    session_ident_type session_ident = get_ident();
32✔
2187
    auto protocol_error = m_client_error->error_for_server;
32✔
2188

16✔
2189
    auto message = util::format("%1", m_client_error->to_status());
32✔
2190
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
32✔
2191
                session_ident); // Throws
32✔
2192

16✔
2193
    nlohmann::json error_body_json;
32✔
2194
    error_body_json["message"] = std::move(message);
32✔
2195
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
32✔
2196
                                     error_body_json.dump()); // Throws
32✔
2197
    m_conn.initiate_write_message(out, this);                 // Throws
32✔
2198

16✔
2199
    m_error_to_send = false;
32✔
2200
    enlist_to_send(); // Throws
32✔
2201
}
32✔
2202

2203

2204
void Session::send_test_command_message()
2205
{
44✔
2206
    REALM_ASSERT_EX(m_state == Active, m_state);
44✔
2207

22✔
2208
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
44✔
2209
                           [](const PendingTestCommand& command) {
44✔
2210
                               return command.pending;
44✔
2211
                           });
44✔
2212
    REALM_ASSERT(it != m_pending_test_commands.end());
44✔
2213

22✔
2214
    ClientProtocol& protocol = m_conn.get_client_protocol();
44✔
2215
    OutputBuffer& out = m_conn.get_output_buffer();
44✔
2216
    auto session_ident = get_ident();
44✔
2217

22✔
2218
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
44✔
2219
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
44✔
2220

22✔
2221
    m_conn.initiate_write_message(out, this); // Throws;
44✔
2222
    it->pending = false;
44✔
2223

22✔
2224
    enlist_to_send();
44✔
2225
}
44✔
2226

2227

2228
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
2229
{
3,240✔
2230
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
3,240✔
2231
                 client_file_ident.salt); // Throws
3,240✔
2232

1,504✔
2233
    // Ignore the message if the deactivation process has been initiated,
1,504✔
2234
    // because in that case, the associated Realm and SessionWrapper must
1,504✔
2235
    // not be accessed any longer.
1,504✔
2236
    if (m_state != Active)
3,240✔
2237
        return Status::OK(); // Success
78✔
2238

1,500✔
2239
    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
3,162✔
2240
                               !m_unbound_message_received);
3,162✔
2241
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,162✔
2242
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
×
2243
    }
×
2244
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
3,162✔
2245
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
×
2246
    }
×
2247
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
3,162✔
2248
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
×
2249
    }
×
2250

1,500✔
2251
    m_client_file_ident = client_file_ident;
3,162✔
2252

1,500✔
2253
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
3,162✔
2254
        // Ready to send the IDENT message
2255
        ensure_enlisted_to_send(); // Throws
×
2256
        return Status::OK();       // Success
×
2257
    }
×
2258

1,500✔
2259
    // access before the client reset (if applicable) because
1,500✔
2260
    // the reset can take a while and the sync session might have died
1,500✔
2261
    // by the time the reset finishes.
1,500✔
2262
    ClientReplication& repl = access_realm(); // Throws
3,162✔
2263

1,500✔
2264
    auto client_reset_if_needed = [&]() -> bool {
3,162✔
2265
        if (!m_client_reset_operation) {
3,162✔
2266
            return false;
2,826✔
2267
        }
2,826✔
2268

168✔
2269
        // ClientResetOperation::finalize() will return true only if the operation actually did
168✔
2270
        // a client reset. It may choose not to do a reset if the local Realm does not exist
168✔
2271
        // at this point (in that case there is nothing to reset). But in any case, we must
168✔
2272
        // clean up m_client_reset_operation at this point as sync should be able to continue from
168✔
2273
        // this point forward.
168✔
2274
        auto client_reset_operation = std::move(m_client_reset_operation);
336✔
2275
        util::UniqueFunction<void(int64_t)> on_flx_subscription_complete = [this](int64_t version) {
220✔
2276
            this->on_flx_sync_version_complete(version);
104✔
2277
        };
104✔
2278
        if (!client_reset_operation->finalize(client_file_ident, get_flx_subscription_store(),
336✔
2279
                                              std::move(on_flx_subscription_complete))) {
168✔
2280
            return false;
×
2281
        }
×
2282

168✔
2283
        // The fresh Realm has been used to reset the state
168✔
2284
        logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
336✔
2285

168✔
2286
        SaltedFileIdent client_file_ident;
336✔
2287
        bool has_pending_client_reset = false;
336✔
2288
        repl.get_history().get_status(m_last_version_available, client_file_ident, m_progress,
336✔
2289
                                      &has_pending_client_reset); // Throws
336✔
2290
        REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
336✔
2291
        REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
336✔
2292
        REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
336✔
2293
                        m_progress.download.last_integrated_client_version);
336✔
2294
        REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
336✔
2295
        REALM_ASSERT_EX(m_progress.upload.last_integrated_server_version == 0,
336✔
2296
                        m_progress.upload.last_integrated_server_version);
336✔
2297
        logger.trace(util::LogCategory::reset, "last_version_available  = %1", m_last_version_available); // Throws
336✔
2298

168✔
2299
        m_upload_progress = m_progress.upload;
336✔
2300
        m_download_progress = m_progress.download;
336✔
2301
        // In recovery mode, there may be new changesets to upload and nothing left to download.
168✔
2302
        // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
168✔
2303
        // For both, we want to allow uploads again without needing external changes to download first.
168✔
2304
        m_allow_upload = true;
336✔
2305
        REALM_ASSERT_EX(m_last_version_selected_for_upload == 0, m_last_version_selected_for_upload);
336✔
2306

168✔
2307
        if (has_pending_client_reset) {
336✔
2308
            handle_pending_client_reset_acknowledgement();
268✔
2309
        }
268✔
2310

168✔
2311
        // If a migration or rollback is in progress, mark it complete when client reset is completed.
168✔
2312
        if (auto migration_store = get_migration_store()) {
336✔
2313
            migration_store->complete_migration_or_rollback();
240✔
2314
        }
240✔
2315

168✔
2316
        return true;
336✔
2317
    };
336✔
2318
    // if a client reset happens, it will take care of setting the file ident
1,500✔
2319
    // and if not, we do it here
1,500✔
2320
    bool did_client_reset = false;
3,162✔
2321
    try {
3,162✔
2322
        did_client_reset = client_reset_if_needed();
3,162✔
2323
    }
3,162✔
2324
    catch (const std::exception& e) {
1,534✔
2325
        auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
68✔
2326
        logger.error(err_msg.c_str());
68✔
2327
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
68✔
2328
        suspend(err_info);
68✔
2329
        return Status::OK();
68✔
2330
    }
68✔
2331
    if (!did_client_reset) {
3,094✔
2332
        repl.get_history().set_client_file_ident(client_file_ident,
2,826✔
2333
                                                 m_fix_up_object_ids); // Throws
2,826✔
2334
        m_progress.download.last_integrated_client_version = 0;
2,826✔
2335
        m_progress.upload.client_version = 0;
2,826✔
2336
        m_last_version_selected_for_upload = 0;
2,826✔
2337
    }
2,826✔
2338

1,466✔
2339
    // Ready to send the IDENT message
1,466✔
2340
    ensure_enlisted_to_send(); // Throws
3,094✔
2341
    return Status::OK();       // Success
3,094✔
2342
}
3,094✔
2343

2344
Status Session::receive_download_message(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
2345
                                         DownloadBatchState batch_state, int64_t query_version,
2346
                                         const ReceivedChangesets& received_changesets)
2347
{
44,050✔
2348
    REALM_ASSERT_EX(query_version >= 0, query_version);
44,050✔
2349
    // Ignore the message if the deactivation process has been initiated,
23,788✔
2350
    // because in that case, the associated Realm and SessionWrapper must
23,788✔
2351
    // not be accessed any longer.
23,788✔
2352
    if (m_state != Active)
44,050✔
2353
        return Status::OK();
478✔
2354

23,520✔
2355
    if (is_steady_state_download_message(batch_state, query_version)) {
43,572✔
2356
        batch_state = DownloadBatchState::SteadyState;
41,998✔
2357
    }
41,998✔
2358

23,520✔
2359
    logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
43,572✔
2360
                 "latest_server_version=%3, latest_server_version_salt=%4, "
43,572✔
2361
                 "upload_client_version=%5, upload_server_version=%6, downloadable_bytes=%7, "
43,572✔
2362
                 "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
43,572✔
2363
                 progress.download.server_version, progress.download.last_integrated_client_version,
43,572✔
2364
                 progress.latest_server_version.version, progress.latest_server_version.salt,
43,572✔
2365
                 progress.upload.client_version, progress.upload.last_integrated_server_version, downloadable_bytes,
43,572✔
2366
                 batch_state != DownloadBatchState::MoreToCome, query_version, received_changesets.size()); // Throws
43,572✔
2367

23,520✔
2368
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
23,520✔
2369
    // changeset over and over again.
23,520✔
2370
    if (m_client_error) {
43,572✔
2371
        logger.debug("Ignoring download message because the client detected an integration error");
×
2372
        return Status::OK();
×
2373
    }
×
2374

23,520✔
2375
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
43,576✔
2376
    if (REALM_UNLIKELY(!legal_at_this_time)) {
43,572✔
2377
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
×
2378
    }
×
2379
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
43,572✔
2380
        logger.error("Bad sync progress received (%1)", status);
×
2381
        return status;
×
2382
    }
×
2383

23,520✔
2384
    version_type server_version = m_progress.download.server_version;
43,572✔
2385
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
43,572✔
2386
    for (const Transformer::RemoteChangeset& changeset : received_changesets) {
44,092✔
2387
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
22,782✔
2388
        // version must be increasing, but can stay the same during bootstraps.
22,782✔
2389
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
23,602✔
2390
                                                         : (changeset.remote_version > server_version);
42,534✔
2391
        // Each server version cannot be greater than the one in the header of the download message.
22,782✔
2392
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
43,354✔
2393
        if (!good_server_version) {
43,354✔
2394
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2395
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2396
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2397
        }
×
2398
        server_version = changeset.remote_version;
43,354✔
2399
        // Check that per-changeset last integrated client version is "weakly"
22,782✔
2400
        // increasing.
22,782✔
2401
        bool good_client_version =
43,354✔
2402
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
43,354✔
2403
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
43,354✔
2404
        if (!good_client_version) {
43,354✔
2405
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2406
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2407
                                 "(%1, %2, %3)",
×
2408
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2409
                                 progress.download.last_integrated_client_version)};
×
2410
        }
×
2411
        last_integrated_client_version = changeset.last_integrated_local_version;
43,354✔
2412
        // Server shouldn't send our own changes, and zero is not a valid client
22,782✔
2413
        // file identifier.
22,782✔
2414
        bool good_file_ident =
43,354✔
2415
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
43,354✔
2416
        if (!good_file_ident) {
43,354✔
2417
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2418
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2419
                                 changeset.origin_file_ident)};
×
2420
        }
×
2421
    }
43,354✔
2422

23,520✔
2423
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
43,572✔
2424
                                       batch_state, received_changesets.size());
43,572✔
2425
    if (hook_action == SyncClientHookAction::EarlyReturn) {
43,572✔
2426
        return Status::OK();
12✔
2427
    }
12✔
2428
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
43,560✔
2429

23,514✔
2430
    if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) {
43,560✔
2431
        clear_resumption_delay_state();
1,566✔
2432
        return Status::OK();
1,566✔
2433
    }
1,566✔
2434

22,730✔
2435
    initiate_integrate_changesets(downloadable_bytes, batch_state, progress, received_changesets); // Throws
41,994✔
2436

22,730✔
2437
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
41,994✔
2438
                                  batch_state, received_changesets.size());
41,994✔
2439
    if (hook_action == SyncClientHookAction::EarlyReturn) {
41,994✔
2440
        return Status::OK();
×
2441
    }
×
2442
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
41,994✔
2443

22,730✔
2444
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
22,730✔
2445
    // after a retryable session error.
22,730✔
2446
    clear_resumption_delay_state();
41,994✔
2447
    return Status::OK();
41,994✔
2448
}
41,994✔
2449

2450
Status Session::receive_mark_message(request_ident_type request_ident)
2451
{
15,660✔
2452
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws
15,660✔
2453

7,744✔
2454
    // Ignore the message if the deactivation process has been initiated,
7,744✔
2455
    // because in that case, the associated Realm and SessionWrapper must
7,744✔
2456
    // not be accessed any longer.
7,744✔
2457
    if (m_state != Active)
15,660✔
2458
        return Status::OK(); // Success
34✔
2459

7,718✔
2460
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
15,626✔
2461
    if (REALM_UNLIKELY(!legal_at_this_time)) {
15,626✔
2462
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
×
2463
    }
×
2464
    bool good_request_ident =
15,626✔
2465
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
15,626✔
2466
    if (REALM_UNLIKELY(!good_request_ident)) {
15,626✔
2467
        return {
×
2468
            ErrorCodes::SyncProtocolInvariantFailed,
×
2469
            util::format(
×
2470
                "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
×
2471
                m_last_download_mark_sent, m_last_download_mark_received)};
×
2472
    }
×
2473

7,718✔
2474
    m_server_version_at_last_download_mark = m_progress.download.server_version;
15,626✔
2475
    m_last_download_mark_received = request_ident;
15,626✔
2476
    check_for_download_completion(); // Throws
15,626✔
2477

7,718✔
2478
    return Status::OK(); // Success
15,626✔
2479
}
15,626✔
2480

2481

2482
// The caller (Connection) must discard the session if the session has become
2483
// deactivated upon return.
2484
Status Session::receive_unbound_message()
2485
{
4,110✔
2486
    logger.debug("Received: UNBOUND");
4,110✔
2487

1,828✔
2488
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
4,110✔
2489
    if (REALM_UNLIKELY(!legal_at_this_time)) {
4,110✔
2490
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2491
    }
×
2492

1,828✔
2493
    // The fact that the UNBIND message has been sent, but an ERROR message has
1,828✔
2494
    // not been received, implies that the deactivation process must have been
1,828✔
2495
    // initiated, so this session must be in the Deactivating state or the session
1,828✔
2496
    // has been suspended because of a client side error.
1,828✔
2497
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
4,110!
2498

1,828✔
2499
    m_unbound_message_received = true;
4,110✔
2500

1,828✔
2501
    // Detect completion of the unbinding process
1,828✔
2502
    if (m_unbind_message_send_complete && m_state == Deactivating) {
4,110✔
2503
        // The deactivation process completes when the unbinding process
1,828✔
2504
        // completes.
1,828✔
2505
        complete_deactivation(); // Throws
4,110✔
2506
        // Life cycle state is now Deactivated
1,828✔
2507
    }
4,110✔
2508

1,828✔
2509
    return Status::OK(); // Success
4,110✔
2510
}
4,110✔
2511

2512

2513
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
2514
{
16✔
2515
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);
16✔
2516
    // Ignore the message if the deactivation process has been initiated,
8✔
2517
    // because in that case, the associated Realm and SessionWrapper must
8✔
2518
    // not be accessed any longer.
8✔
2519
    if (m_state == Active) {
16✔
2520
        on_flx_sync_error(query_version, message); // throws
16✔
2521
    }
16✔
2522
    return Status::OK();
16✔
2523
}
16✔
2524

2525
// The caller (Connection) must discard the session if the session has become
2526
// deactivated upon return.
2527
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2528
{
908✔
2529
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
908✔
2530
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
908✔
2531

472✔
2532
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
908✔
2533
    if (REALM_UNLIKELY(!legal_at_this_time)) {
908✔
2534
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2535
    }
×
2536

472✔
2537
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
908✔
2538
    auto status = protocol_error_to_status(protocol_error, info.message);
908✔
2539
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
908✔
2540
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2541
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2542
                             info.raw_error_code)};
×
2543
    }
×
2544

472✔
2545
    // Can't process debug hook actions once the Session is undergoing deactivation, since
472✔
2546
    // the SessionWrapper may not be available
472✔
2547
    if (m_state == Active) {
908✔
2548
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
904✔
2549
        if (debug_action == SyncClientHookAction::EarlyReturn) {
904✔
2550
            return Status::OK();
8✔
2551
        }
8✔
2552
    }
900✔
2553

468✔
2554
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
468✔
2555
    // containing the compensating write has appeared in a download message.
468✔
2556
    if (status == ErrorCodes::SyncCompensatingWrite) {
900✔
2557
        // If the client is not active, the compensating writes will not be processed now, but will be
20✔
2558
        // sent again the next time the client connects
20✔
2559
        if (m_state == Active) {
40✔
2560
            REALM_ASSERT(info.compensating_write_server_version.has_value());
40✔
2561
            m_pending_compensating_write_errors.push_back(info);
40✔
2562
        }
40✔
2563
        return Status::OK();
40✔
2564
    }
40✔
2565

448✔
2566
    m_error_message_received = true;
860✔
2567
    suspend(SessionErrorInfo{info, std::move(status)});
860✔
2568
    return Status::OK();
860✔
2569
}
860✔
2570

2571
void Session::suspend(const SessionErrorInfo& info)
2572
{
928✔
2573
    REALM_ASSERT(!m_suspended);
928✔
2574
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
928✔
2575
    logger.debug("Suspended"); // Throws
928✔
2576

482✔
2577
    m_suspended = true;
928✔
2578

482✔
2579
    // Detect completion of the unbinding process
482✔
2580
    if (m_unbind_message_send_complete && m_error_message_received) {
928!
2581
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2582
        // we received an ERROR message implies that the deactivation process must
2583
        // have been initiated, so this session must be in the Deactivating state.
UNCOV
2584
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
×
2585

2586
        // The deactivation process completes when the unbinding process
2587
        // completes.
UNCOV
2588
        complete_deactivation(); // Throws
×
2589
        // Life cycle state is now Deactivated
UNCOV
2590
    }
×
2591

482✔
2592
    // Notify the application of the suspension of the session if the session is
482✔
2593
    // still in the Active state
482✔
2594
    if (m_state == Active) {
928✔
2595
        m_conn.one_less_active_unsuspended_session(); // Throws
924✔
2596
        on_suspended(info);                           // Throws
924✔
2597
    }
924✔
2598

482✔
2599
    if (!info.is_fatal) {
928✔
2600
        begin_resumption_delay(info);
400✔
2601
    }
400✔
2602

482✔
2603
    // Ready to send the UNBIND message, if it has not been sent already
482✔
2604
    if (!m_unbind_message_sent)
928✔
2605
        ensure_enlisted_to_send(); // Throws
924✔
2606
}
928✔
2607

2608
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
2609
{
44✔
2610
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);
44✔
2611
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
44✔
2612
                           [&](const PendingTestCommand& command) {
44✔
2613
                               return command.id == ident;
44✔
2614
                           });
44✔
2615
    if (it == m_pending_test_commands.end()) {
44✔
2616
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2617
                util::format("Received test command response for a non-existent ident %1", ident)};
×
2618
    }
×
2619

22✔
2620
    it->promise.emplace_value(std::string{body});
44✔
2621
    m_pending_test_commands.erase(it);
44✔
2622

22✔
2623
    return Status::OK();
44✔
2624
}
44✔
2625

2626
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2627
{
400✔
2628
    REALM_ASSERT(!m_try_again_activation_timer);
400✔
2629

220✔
2630
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
400✔
2631
                                  error_info.resumption_delay_interval);
400✔
2632
    auto try_again_interval = m_try_again_delay_info.delay_interval();
400✔
2633
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
400✔
2634
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
10✔
2635
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
10✔
2636
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
10✔
2637
        try_again_interval = std::chrono::milliseconds{1000};
20✔
2638
    }
20✔
2639
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
400✔
2640
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
400✔
2641
        if (status == ErrorCodes::OperationAborted)
400✔
2642
            return;
4✔
2643
        else if (!status.is_ok())
396✔
2644
            throw Exception(status);
×
2645

218✔
2646
        m_try_again_activation_timer.reset();
396✔
2647
        cancel_resumption_delay();
396✔
2648
    });
396✔
2649
}
400✔
2650

2651
void Session::clear_resumption_delay_state()
2652
{
43,564✔
2653
    if (m_try_again_activation_timer) {
43,564✔
2654
        logger.debug("Clearing resumption delay state after successful download");
×
2655
        m_try_again_delay_info.reset();
×
2656
    }
×
2657
}
43,564✔
2658

2659
Status ClientImpl::Session::check_received_sync_progress(const SyncProgress& progress) noexcept
2660
{
43,576✔
2661
    const SyncProgress& a = m_progress;
43,576✔
2662
    const SyncProgress& b = progress;
43,576✔
2663
    std::string message;
43,576✔
2664
    if (b.latest_server_version.version < a.latest_server_version.version) {
43,576✔
2665
        message = util::format("Latest server version in download messages must be weakly increasing throughout a "
×
2666
                               "session (current: %1, received: %2)",
×
2667
                               a.latest_server_version.version, b.latest_server_version.version);
×
2668
    }
×
2669
    if (b.upload.client_version < a.upload.client_version) {
43,576✔
2670
        message = util::format("Last integrated client version in download messages must be weakly increasing "
×
2671
                               "throughout a session (current: %1, received: %2)",
×
2672
                               a.upload.client_version, b.upload.client_version);
×
2673
    }
×
2674
    if (b.upload.client_version > m_last_version_available) {
43,576✔
2675
        message = util::format("Last integrated client version on server cannot be greater than the latest client "
×
2676
                               "version in existence (current: %1, received: %2)",
×
2677
                               m_last_version_available, b.upload.client_version);
×
2678
    }
×
2679
    if (b.download.server_version < a.download.server_version) {
43,576✔
2680
        message =
×
2681
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2682
                         a.download.server_version, b.download.server_version);
×
2683
    }
×
2684
    if (b.download.server_version > b.latest_server_version.version) {
43,576✔
2685
        message = util::format(
×
2686
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
×
2687
            b.download.server_version, b.latest_server_version.version);
×
2688
    }
×
2689
    if (b.download.last_integrated_client_version < a.download.last_integrated_client_version) {
43,576✔
2690
        message = util::format(
×
2691
            "Last integrated client version on the server at the position in the server's history of the download "
×
2692
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2693
            a.download.last_integrated_client_version, b.download.last_integrated_client_version);
×
2694
    }
×
2695
    if (b.download.last_integrated_client_version > b.upload.client_version) {
43,576✔
2696
        message = util::format("Last integrated client version on the server in the position at the server's history "
×
2697
                               "of the download cursor cannot be greater than the latest client version integrated "
×
2698
                               "on the server (download: %1, upload: %2)",
×
2699
                               b.download.last_integrated_client_version, b.upload.client_version);
×
2700
    }
×
2701
    if (b.download.server_version < b.upload.last_integrated_server_version) {
43,576✔
2702
        message = util::format(
×
2703
            "The server version of the download cursor cannot be less than the server version integrated in the "
×
2704
            "latest client version acknowledged by the server (download: %1, upload: %2)",
×
2705
            b.download.server_version, b.upload.last_integrated_server_version);
×
2706
    }
×
2707

23,524✔
2708
    if (message.empty()) {
43,576✔
2709
        return Status::OK();
43,574✔
2710
    }
43,574✔
2711
    return {ErrorCodes::SyncProtocolInvariantFailed, std::move(message)};
2✔
2712
}
2✔
2713

2714

2715
void Session::check_for_upload_completion()
2716
{
75,270✔
2717
    REALM_ASSERT_EX(m_state == Active, m_state);
75,270✔
2718
    if (!m_upload_completion_notification_requested) {
75,270✔
2719
        return;
44,654✔
2720
    }
44,654✔
2721

15,146✔
2722
    // during an ongoing client reset operation, we never upload anything
15,146✔
2723
    if (m_client_reset_operation)
30,616✔
2724
        return;
234✔
2725

15,028✔
2726
    // Upload process must have reached end of history
15,028✔
2727
    REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available);
30,382✔
2728
    bool scan_complete = (m_upload_progress.client_version == m_last_version_available);
30,382✔
2729
    if (!scan_complete)
30,382✔
2730
        return;
4,880✔
2731

12,602✔
2732
    // All uploaded changesets must have been acknowledged by the server
12,602✔
2733
    REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload);
25,502✔
2734
    bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload);
25,502✔
2735
    if (!all_uploads_accepted)
25,502✔
2736
        return;
10,848✔
2737

7,248✔
2738
    m_upload_completion_notification_requested = false;
14,654✔
2739
    on_upload_completion(); // Throws
14,654✔
2740
}
14,654✔
2741

2742

2743
void Session::check_for_download_completion()
2744
{
58,990✔
2745
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
58,990✔
2746
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
58,990✔
2747
    if (m_last_download_mark_received == m_last_triggering_download_mark)
58,990✔
2748
        return;
43,160✔
2749
    if (m_last_download_mark_received < m_target_download_mark)
15,830✔
2750
        return;
452✔
2751
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
15,378✔
2752
        return;
×
2753
    m_last_triggering_download_mark = m_target_download_mark;
15,378✔
2754
    if (REALM_UNLIKELY(!m_allow_upload)) {
15,378✔
2755
        // Activate the upload process now, and enable immediate reactivation
1,990✔
2756
        // after a subsequent fast reconnect.
1,990✔
2757
        m_allow_upload = true;
4,166✔
2758
        ensure_enlisted_to_send(); // Throws
4,166✔
2759
    }
4,166✔
2760
    on_download_completion(); // Throws
15,378✔
2761
}
15,378✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc