• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1816

04 Nov 2023 12:29AM UTC coverage: 91.648% (-0.01%) from 91.66%
1816

push

Evergreen

web-flow
Use a single write transaction for DiscardLocal client resets on FLX realms (#7110)

Updating the subscription store in a separate write transaction from the
recovery means that we temporarily commit an invalid state. If the application
crashes between committing the client reset diff and updating the subscription
store, the next launch of the application would try to use the now-invalid
pending subscriptions that should have been discarded.

92128 of 168844 branches covered (0.0%)

141 of 146 new or added lines in 7 files covered. (96.58%)

84 existing lines in 15 files now uncovered.

230681 of 251702 relevant lines covered (91.65%)

6383138.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.01
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <system_error>
2
#include <sstream>
3

4
#include <realm/util/assert.hpp>
5
#include <realm/util/safe_int_ops.hpp>
6
#include <realm/util/random.hpp>
7
#include <realm/util/memory_stream.hpp>
8
#include <realm/util/basic_system_errors.hpp>
9
#include <realm/util/scope_exit.hpp>
10
#include <realm/util/to_string.hpp>
11
#include <realm/util/uri.hpp>
12
#include <realm/util/platform_info.hpp>
13
#include <realm/sync/impl/clock.hpp>
14
#include <realm/impl/simulated_failure.hpp>
15
#include <realm/sync/network/http.hpp>
16
#include <realm/sync/network/websocket.hpp>
17
#include <realm/sync/noinst/client_history_impl.hpp>
18
#include <realm/sync/noinst/client_impl_base.hpp>
19
#include <realm/sync/noinst/compact_changesets.hpp>
20
#include <realm/sync/protocol.hpp>
21
#include <realm/version.hpp>
22
#include <realm/sync/changeset_parser.hpp>
23

24
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
25

26
// NOTE: The protocol specification is in `/doc/protocol.md`
27

28
using namespace realm;
29
using namespace _impl;
30
using namespace realm::util;
31
using namespace realm::sync;
32
using namespace realm::sync::websocket;
33

34
// clang-format off
35
using Connection      = ClientImpl::Connection;
36
using Session         = ClientImpl::Session;
37
using UploadChangeset = ClientHistory::UploadChangeset;
38

39
// These are a work-around for a bug in MSVC. It cannot find in-class types
40
// mentioned in signature of out-of-line member function definitions.
41
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
42
using OutputBuffer                = ClientImpl::OutputBuffer;
43
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
44
// clang-format on
45

46
void ClientImpl::ReconnectInfo::reset() noexcept
47
{
1,780✔
48
    m_backoff_state.reset();
1,780✔
49
    scheduled_reset = false;
1,780✔
50
}
1,780✔
51

52

53
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
54
                                       std::optional<ResumptionDelayInfo> new_delay_info)
55
{
3,322✔
56
    m_backoff_state.update(new_reason, new_delay_info);
3,322✔
57
}
3,322✔
58

59

60
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
61
{
5,384✔
62
    if (scheduled_reset) {
5,384✔
63
        reset();
4✔
64
    }
4✔
65

2,712✔
66
    if (!m_backoff_state.triggering_error) {
5,384✔
67
        return std::chrono::milliseconds::zero();
4,148✔
68
    }
4,148✔
69

646✔
70
    switch (*m_backoff_state.triggering_error) {
1,236✔
71
        case ConnectionTerminationReason::closed_voluntarily:
74✔
72
            return std::chrono::milliseconds::zero();
74✔
73
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
74
            return std::chrono::milliseconds::max();
18✔
75
        default:
1,144✔
76
            if (m_reconnect_mode == ReconnectMode::testing) {
1,144✔
77
                return std::chrono::milliseconds::max();
920✔
78
            }
920✔
79

112✔
80
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
224✔
81
            return m_backoff_state.delay_interval();
224✔
82
    }
1,236✔
83
}
1,236✔
84

85

86
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
87
                                      port_type& port, std::string& path) const
88
{
3,344✔
89
    util::Uri uri(url); // Throws
3,344✔
90
    uri.canonicalize(); // Throws
3,344✔
91
    std::string userinfo, address_2, port_2;
3,344✔
92
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
3,344✔
93
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
3,344✔
94
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
3,344✔
95
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
3,344✔
96
    if (REALM_UNLIKELY(!good))
3,344✔
97
        return false;
1,496✔
98
    ProtocolEnvelope protocol_2;
3,344✔
99
    port_type port_3;
3,344✔
100
    if (realm_scheme) {
3,344✔
101
        if (uri.get_scheme() == "realm:") {
×
102
            protocol_2 = ProtocolEnvelope::realm;
×
103
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
104
        }
×
105
        else {
×
106
            protocol_2 = ProtocolEnvelope::realms;
×
107
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
108
        }
×
109
    }
×
110
    else {
3,344✔
111
        REALM_ASSERT(ws_scheme);
3,344✔
112
        if (uri.get_scheme() == "ws:") {
3,344✔
113
            protocol_2 = ProtocolEnvelope::ws;
3,340✔
114
            port_3 = 80;
3,340✔
115
        }
3,340✔
116
        else {
4✔
117
            protocol_2 = ProtocolEnvelope::wss;
4✔
118
            port_3 = 443;
4✔
119
        }
4✔
120
    }
3,344✔
121
    if (!port_2.empty()) {
3,344✔
122
        std::istringstream in(port_2);    // Throws
3,344✔
123
        in.imbue(std::locale::classic()); // Throws
3,344✔
124
        in >> port_3;
3,344✔
125
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,344✔
126
            return false;
1,496✔
127
    }
3,344✔
128
    std::string path_2 = uri.get_path(); // Throws (copy)
3,344✔
129

1,496✔
130
    protocol = protocol_2;
3,344✔
131
    address = std::move(address_2);
3,344✔
132
    port = port_3;
3,344✔
133
    path = std::move(path_2);
3,344✔
134
    return true;
3,344✔
135
}
3,344✔
136

137

138
ClientImpl::ClientImpl(ClientConfig config)
139
    : logger_ptr{config.logger ? std::move(config.logger) : util::Logger::get_default_logger()}
140
    , logger{*logger_ptr}
141
    , m_reconnect_mode{config.reconnect_mode}
142
    , m_connect_timeout{config.connect_timeout}
143
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
144
    , m_ping_keepalive_period{config.ping_keepalive_period}
145
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
146
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
147
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
148
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
149
    , m_dry_run{config.dry_run}
150
    , m_enable_default_port_hack{config.enable_default_port_hack}
151
    , m_disable_upload_compaction{config.disable_upload_compaction}
152
    , m_fix_up_object_ids{config.fix_up_object_ids}
153
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
154
    , m_socket_provider{std::move(config.socket_provider)}
155
    , m_client_protocol{} // Throws
156
    , m_one_connection_per_session{config.one_connection_per_session}
157
    , m_random{}
158
{
8,978✔
159
    // FIXME: Would be better if seeding was up to the application.
4,420✔
160
    util::seed_prng_nondeterministically(m_random); // Throws
8,978✔
161

4,420✔
162
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
8,978✔
163
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
8,978✔
164
                 get_current_protocol_version()); // Throws
8,978✔
165
    logger.info("Platform: %1", util::get_platform_info());
8,978✔
166
    const char* build_mode;
8,978✔
167
#if REALM_DEBUG
8,978✔
168
    build_mode = "Debug";
8,978✔
169
#else
170
    build_mode = "Release";
171
#endif
172
    logger.debug("Build mode: %1", build_mode);
8,978✔
173
    logger.debug("Config param: one_connection_per_session = %1",
8,978✔
174
                 config.one_connection_per_session); // Throws
8,978✔
175
    logger.debug("Config param: connect_timeout = %1 ms",
8,978✔
176
                 config.connect_timeout); // Throws
8,978✔
177
    logger.debug("Config param: connection_linger_time = %1 ms",
8,978✔
178
                 config.connection_linger_time); // Throws
8,978✔
179
    logger.debug("Config param: ping_keepalive_period = %1 ms",
8,978✔
180
                 config.ping_keepalive_period); // Throws
8,978✔
181
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
8,978✔
182
                 config.pong_keepalive_timeout); // Throws
8,978✔
183
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
8,978✔
184
                 config.fast_reconnect_limit); // Throws
8,978✔
185
    logger.debug("Config param: disable_upload_compaction = %1",
8,978✔
186
                 config.disable_upload_compaction); // Throws
8,978✔
187
    logger.debug("Config param: disable_sync_to_disk = %1",
8,978✔
188
                 config.disable_sync_to_disk); // Throws
8,978✔
189
    logger.debug("Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3",
8,978✔
190
                 m_reconnect_backoff_info.max_resumption_delay_interval.count(),
8,978✔
191
                 m_reconnect_backoff_info.resumption_delay_interval.count(),
8,978✔
192
                 m_reconnect_backoff_info.resumption_delay_backoff_multiplier);
8,978✔
193

4,420✔
194
    if (config.reconnect_mode != ReconnectMode::normal) {
8,978✔
195
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
772✔
196
                    "never do this in production!");
772✔
197
    }
772✔
198

4,420✔
199
    if (config.dry_run) {
8,978✔
200
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
201
                    "never do this in production!");
×
202
    }
×
203

4,420✔
204
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
8,978✔
205

4,420✔
206
    if (m_one_connection_per_session) {
8,978✔
207
        // FIXME: Re-enable this warning when the load balancer is able to handle
2✔
208
        // multiplexing.
2✔
209
        //        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
2✔
210
        //            "never do this in production");
2✔
211
    }
4✔
212

4,420✔
213
    if (config.disable_upload_activation_delay) {
8,978✔
214
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
215
                    "never do this in production");
×
216
    }
×
217

4,420✔
218
    if (config.disable_sync_to_disk) {
8,978✔
219
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
220
                    "never do this in production");
×
221
    }
×
222

4,420✔
223
    m_actualize_and_finalize = create_trigger([this](Status status) {
15,638✔
224
        if (status == ErrorCodes::OperationAborted)
15,638✔
225
            return;
×
226
        else if (!status.is_ok())
15,638✔
227
            throw Exception(status);
×
228
        actualize_and_finalize_session_wrappers(); // Throws
15,638✔
229
    });
15,638✔
230
}
8,978✔
231

232

233
// Posts `handler` to the socket provider and counts it as an outstanding
// post until it has run.
//
// The counter is incremented (and the drained flag cleared) before posting.
// It is decremented, with waiters on m_drain_cv notified, once the handler
// has finished, including when the handler exits by throwing.
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    {
        std::lock_guard guard(m_drain_mutex);
        m_drained = false;
        ++m_outstanding_posts;
    }

    auto tracked = [this, handler = std::move(handler)](Status status) {
        // The scope guard makes the bookkeeping run however the handler exits.
        auto finished = util::make_scope_exit([&]() noexcept {
            std::lock_guard guard(m_drain_mutex);
            REALM_ASSERT(m_outstanding_posts);
            --m_outstanding_posts;
            m_drain_cv.notify_all();
        });
        handler(status);
    };
    m_socket_provider->post(std::move(tracked));
}
162,104✔
251

252

253
void ClientImpl::drain_connections()
254
{
8,976✔
255
    logger.debug("Draining connections during sync client shutdown");
8,976✔
256
    for (auto& server_slot_pair : m_server_slots) {
5,662✔
257
        auto& server_slot = server_slot_pair.second;
2,348✔
258

1,106✔
259
        if (server_slot.connection) {
2,348✔
260
            auto& conn = server_slot.connection;
2,248✔
261
            conn->force_close();
2,248✔
262
        }
2,248✔
263
        else {
100✔
264
            for (auto& conn_pair : server_slot.alt_connections) {
48✔
265
                conn_pair.second->force_close();
6✔
266
            }
6✔
267
        }
100✔
268
    }
2,348✔
269
}
8,976✔
270

271

272
// Creates a timer on the socket provider that invokes `handler` after `delay`,
// and counts it as an outstanding post until the handler has run.
//
// The counter is incremented (and the drained flag cleared) before the timer
// is created. It is decremented, with waiters on m_drain_cv notified, after
// the handler finishes.
//
// The decrement lives in a scope guard, as in post(), so it also runs when
// the handler exits by throwing. Without the guard a throwing handler would
// leave m_outstanding_posts permanently elevated and skip the notification.
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
                                                       SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    {
        std::lock_guard lock(m_drain_mutex);
        ++m_outstanding_posts;
        m_drained = false;
    }
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
        auto decr_guard = util::make_scope_exit([&]() noexcept {
            std::lock_guard lock(m_drain_mutex);
            REALM_ASSERT(m_outstanding_posts);
            --m_outstanding_posts;
            m_drain_cv.notify_all();
        });
        handler(status);
    });
}
16,190✔
290

291

292
// Creates a trigger bound to this client that will run `handler`. Requires
// that a socket provider is present.
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
}
11,440✔
297

298
// If a websocket sentinel is still attached, mark it as destroyed before
// releasing this connection's reference to it.
Connection::~Connection()
{
    if (!m_websocket_sentinel)
        return;

    m_websocket_sentinel->destroyed = true;
    m_websocket_sentinel.reset();
}
2,458✔
305

306
void Connection::activate()
307
{
2,460✔
308
    REALM_ASSERT(m_on_idle);
2,460✔
309
    m_activated = true;
2,460✔
310
    if (m_num_active_sessions == 0)
2,460✔
311
        m_on_idle->trigger();
×
312
    // We cannot in general connect immediately, because a prior failure to
1,160✔
313
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
1,160✔
314
    initiate_reconnect_wait(); // Throws
2,460✔
315
}
2,460✔
316

317

318
void Connection::activate_session(std::unique_ptr<Session> sess)
319
{
9,624✔
320
    REALM_ASSERT(sess);
9,624✔
321
    REALM_ASSERT(&sess->m_conn == this);
9,624✔
322
    REALM_ASSERT(!m_force_closed);
9,624✔
323
    Session& sess_2 = *sess;
9,624✔
324
    session_ident_type ident = sess->m_ident;
9,624✔
325
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
9,624✔
326
    bool was_inserted = p.second;
9,624✔
327
    REALM_ASSERT(was_inserted);
9,624✔
328
    // Save the session ident to the historical list of session idents
4,636✔
329
    m_session_history.insert(ident);
9,624✔
330
    sess_2.activate(); // Throws
9,624✔
331
    if (m_state == ConnectionState::connected) {
9,624✔
332
        bool fast_reconnect = false;
6,726✔
333
        sess_2.connection_established(fast_reconnect); // Throws
6,726✔
334
    }
6,726✔
335
    ++m_num_active_sessions;
9,624✔
336
}
9,624✔
337

338

339
// Begins deactivation of `sess` and decrements the active session count. A
// session that reaches the Deactivated state immediately is removed right
// away. When the last active session goes away on an activated, disconnected
// connection, the idle trigger is fired.
void Connection::initiate_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess);
    REALM_ASSERT(&sess->m_conn == this);
    REALM_ASSERT(m_num_active_sessions);

    // The client may be waiting (on a separate thread, in stop_and_wait())
    // for m_num_active_sessions to reach 0, so the session is deactivated
    // before the count is decremented.
    sess->initiate_deactivation(); // Throws
    if (sess->m_state == Session::Deactivated)
        finish_session_deactivation(sess);

    if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
        const bool idle = m_activated && m_state == ConnectionState::disconnected;
        if (idle)
            m_on_idle->trigger();
    }
}
9,624✔
356

357

358
void Connection::cancel_reconnect_delay()
359
{
1,984✔
360
    REALM_ASSERT(m_activated);
1,984✔
361

1,054✔
362
    if (m_reconnect_delay_in_progress) {
1,984✔
363
        if (m_nonzero_reconnect_delay)
1,772✔
364
            logger.detail("Canceling reconnect delay"); // Throws
890✔
365

948✔
366
        // Cancel the in-progress wait operation by destroying the timer
948✔
367
        // object. Destruction is needed in this case, because a new wait
948✔
368
        // operation might have to be initiated before the previous one
948✔
369
        // completes (its completion handler starts to execute), so the new wait
948✔
370
        // operation must be done on a new timer object.
948✔
371
        m_reconnect_disconnect_timer.reset();
1,772✔
372
        m_reconnect_delay_in_progress = false;
1,772✔
373
        m_reconnect_info.reset();
1,772✔
374
        initiate_reconnect_wait(); // Throws
1,772✔
375
        return;
1,772✔
376
    }
1,772✔
377

106✔
378
    // If we are not disconnected, then we need to make sure the next time we get disconnected
106✔
379
    // that we are allowed to re-connect as quickly as possible.
106✔
380
    //
106✔
381
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
106✔
382
    // backoff/delay state before calculating the next delay, unless a PONG message is received
106✔
383
    // for the urgent PING message we send below.
106✔
384
    //
106✔
385
    // If we get a PONG message for the urgent PING message sent below, then the connection is
106✔
386
    // healthy and we can calculate the next delay normally.
106✔
387
    if (m_state != ConnectionState::disconnected) {
212✔
388
        m_reconnect_info.scheduled_reset = true;
212✔
389
        m_ping_after_scheduled_reset_of_reconnect_info = false;
212✔
390

106✔
391
        schedule_urgent_ping(); // Throws
212✔
392
        return;
212✔
393
    }
212✔
394
    // Nothing to do in this case. The next reconnect attemp will be made as
106✔
395
    // soon as there are any sessions that are both active and unsuspended.
106✔
396
}
212✔
397

398
// Removes a session that has reached the Deactivated state from both the
// session map and the session ident history.
void ClientImpl::Connection::finish_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess->m_state == Session::Deactivated);

    // Copy the ident first: `sess` is not touched after the erase calls.
    const auto session_ident = sess->m_ident;
    m_sessions.erase(session_ident);
    m_session_history.erase(session_ident);
}
8,048✔
405

406
void Connection::force_close()
407
{
2,254✔
408
    if (m_force_closed) {
2,254✔
409
        return;
×
410
    }
×
411

1,064✔
412
    m_force_closed = true;
2,254✔
413

1,064✔
414
    if (m_state != ConnectionState::disconnected) {
2,254✔
415
        voluntary_disconnect();
2,170✔
416
    }
2,170✔
417

1,064✔
418
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,254✔
419
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,254✔
420
        m_reconnect_disconnect_timer.reset();
84✔
421
        m_reconnect_delay_in_progress = false;
84✔
422
        m_disconnect_delay_in_progress = false;
84✔
423
    }
84✔
424

1,064✔
425
    // We must copy any session pointers we want to close to a vector because force_closing
1,064✔
426
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
1,064✔
427
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
1,064✔
428
    std::vector<Session*> to_close;
2,254✔
429
    for (auto& session_pair : m_sessions) {
1,142✔
430
        if (session_pair.second->m_state == Session::State::Active) {
156✔
431
            to_close.push_back(session_pair.second.get());
156✔
432
        }
156✔
433
    }
156✔
434

1,064✔
435
    for (auto& sess : to_close) {
1,142✔
436
        sess->force_close();
156✔
437
    }
156✔
438

1,064✔
439
    logger.debug("Force closed idle connection");
2,254✔
440
}
2,254✔
441

442

443
void Connection::websocket_connected_handler(const std::string& protocol)
444
{
3,200✔
445
    if (!protocol.empty()) {
3,200✔
446
        std::string_view expected_prefix =
3,200✔
447
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
2,932✔
448
        // FIXME: Use std::string_view::begins_with() in C++20.
1,564✔
449
        auto prefix_matches = [&](std::string_view other) {
3,200✔
450
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,200✔
451
        };
3,200✔
452
        if (prefix_matches(expected_prefix)) {
3,200✔
453
            util::MemoryInputStream in;
3,200✔
454
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,200✔
455
            in.imbue(std::locale::classic());
3,200✔
456
            in.unsetf(std::ios_base::skipws);
3,200✔
457
            int value_2 = 0;
3,200✔
458
            in >> value_2;
3,200✔
459
            if (in && in.eof() && value_2 >= 0) {
3,200✔
460
                bool good_version =
3,200✔
461
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,200✔
462
                if (good_version) {
3,200✔
463
                    logger.detail("Negotiated protocol version: %1", value_2);
3,200✔
464
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
1,564✔
465
                    // will provide the appservices connection ID via a log message.
1,564✔
466
                    // TODO: Remove once the server starts sending the connection ID
1,564✔
467
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,200✔
468
                    m_negotiated_protocol_version = value_2;
3,200✔
469
                    handle_connection_established(); // Throws
3,200✔
470
                    return;
3,200✔
471
                }
3,200✔
472
            }
×
473
        }
3,200✔
474
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
475
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
476
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
477
    }
×
478
    else {
×
479
        close_due_to_client_side_error(
×
480
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
481
            ConnectionTerminationReason::bad_headers_in_http_response);
×
482
    }
×
483
}
3,200✔
484

485

486
// Handles a binary message received on the websocket.
//
// Returns false when the connection has already been force closed. Otherwise
// returns whether the websocket object still exists after the message (or a
// simulated read failure) has been processed.
bool Connection::websocket_binary_message_received(util::Span<const char> data)
{
    if (m_force_closed) {
        logger.debug("Received binary message after connection was force closed");
        return false;
    }

    using sf = SimulatedFailure;
    const bool simulate_read_failure = sf::check_trigger(sf::sync_client__read_head);
    if (simulate_read_failure) {
        close_due_to_client_side_error(
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
            ConnectionTerminationReason::read_or_write_error);
    }
    else {
        handle_message_received(data);
    }
    return bool(m_websocket);
}
75,452✔
504

505

506
void Connection::websocket_error_handler()
507
{
524✔
508
    m_websocket_error_received = true;
524✔
509
}
524✔
510

511
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
512
{
652✔
513
    if (m_force_closed) {
652✔
514
        logger.debug("Received websocket close message after connection was force closed");
×
515
        return false;
×
516
    }
×
517
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
652✔
518

340✔
519
    switch (error_code) {
652✔
520
        case WebSocketError::websocket_ok:
76✔
521
            break;
76✔
522
        case WebSocketError::websocket_resolve_failed:
4✔
523
            [[fallthrough]];
4✔
524
        case WebSocketError::websocket_connection_failed: {
4✔
525
            SessionErrorInfo error_info(
4✔
526
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
4✔
527
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
4✔
528
            break;
4✔
529
        }
4✔
530
        case WebSocketError::websocket_read_error:
510✔
531
            [[fallthrough]];
510✔
532
        case WebSocketError::websocket_write_error: {
510✔
533
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
510✔
534
                                         ConnectionTerminationReason::read_or_write_error);
510✔
535
            break;
510✔
536
        }
510✔
537
        case WebSocketError::websocket_going_away:
268✔
538
            [[fallthrough]];
×
539
        case WebSocketError::websocket_protocol_error:
✔
540
            [[fallthrough]];
×
541
        case WebSocketError::websocket_unsupported_data:
✔
542
            [[fallthrough]];
×
543
        case WebSocketError::websocket_invalid_payload_data:
✔
544
            [[fallthrough]];
×
545
        case WebSocketError::websocket_policy_violation:
✔
546
            [[fallthrough]];
×
547
        case WebSocketError::websocket_reserved:
✔
548
            [[fallthrough]];
×
549
        case WebSocketError::websocket_no_status_received:
✔
550
            [[fallthrough]];
×
551
        case WebSocketError::websocket_invalid_extension: {
✔
552
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
553
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
554
            break;
×
555
        }
×
556
        case WebSocketError::websocket_message_too_big: {
4✔
557
            auto message = util::format(
4✔
558
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
559
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
560
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
561
            involuntary_disconnect(std::move(error_info),
4✔
562
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
563
            break;
4✔
564
        }
×
565
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
566
            close_due_to_client_side_error(
10✔
567
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
568
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
569
            break;
10✔
570
        }
×
571
        case WebSocketError::websocket_client_too_old:
✔
572
            [[fallthrough]];
×
573
        case WebSocketError::websocket_client_too_new:
✔
574
            [[fallthrough]];
×
575
        case WebSocketError::websocket_protocol_mismatch: {
✔
576
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
577
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
578
            break;
×
579
        }
×
580
        case WebSocketError::websocket_fatal_error: {
✔
581
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{true}),
×
582
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
583
            break;
×
584
        }
×
585
        case WebSocketError::websocket_forbidden: {
✔
586
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
587
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
588
            involuntary_disconnect(std::move(error_info),
×
589
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
590
            break;
×
591
        }
×
592
        case WebSocketError::websocket_unauthorized: {
36✔
593
            SessionErrorInfo error_info(
36✔
594
                {ErrorCodes::AuthError,
36✔
595
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
36✔
596
                IsFatal{false});
36✔
597
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
36✔
598
            involuntary_disconnect(std::move(error_info),
36✔
599
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
36✔
600
            break;
36✔
601
        }
×
602
        case WebSocketError::websocket_moved_permanently: {
12✔
603
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
604
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
605
            involuntary_disconnect(std::move(error_info),
12✔
606
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
607
            break;
12✔
608
        }
×
609
        case WebSocketError::websocket_abnormal_closure: {
✔
610
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
611
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
612
            involuntary_disconnect(std::move(error_info),
×
613
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
614
            break;
×
615
        }
×
616
        case WebSocketError::websocket_internal_server_error:
✔
617
            [[fallthrough]];
×
618
        case WebSocketError::websocket_retry_error: {
✔
619
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
620
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
621
            break;
×
622
        }
652✔
623
    }
652✔
624

340✔
625
    return bool(m_websocket);
652✔
626
}
652✔
627

628
// Guarantees that handle_reconnect_wait() is never called from within the
629
// execution of initiate_reconnect_wait() (no callback reentrance).
630
void Connection::initiate_reconnect_wait()
631
{
7,550✔
632
    REALM_ASSERT(m_activated);
7,550✔
633
    REALM_ASSERT(!m_reconnect_delay_in_progress);
7,550✔
634
    REALM_ASSERT(!m_disconnect_delay_in_progress);
7,550✔
635

3,730✔
636
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
3,730✔
637
    if (m_force_closed) {
7,550✔
638
        return;
2,166✔
639
    }
2,166✔
640

2,712✔
641
    m_reconnect_delay_in_progress = true;
5,384✔
642
    auto delay = m_reconnect_info.delay_interval();
5,384✔
643
    if (delay == std::chrono::milliseconds::max()) {
5,384✔
644
        logger.detail("Reconnection delayed indefinitely"); // Throws
938✔
645
        // Not actually starting a timer corresponds to an infinite wait
500✔
646
        m_nonzero_reconnect_delay = true;
938✔
647
        return;
938✔
648
    }
938✔
649

2,212✔
650
    if (delay == std::chrono::milliseconds::zero()) {
4,446✔
651
        m_nonzero_reconnect_delay = false;
4,224✔
652
    }
4,224✔
653
    else {
222✔
654
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
222✔
655
        m_nonzero_reconnect_delay = true;
222✔
656
    }
222✔
657

2,212✔
658
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
2,212✔
659
    // we need it to be cancelable in case the connection is terminated before the timer
2,212✔
660
    // callback is run.
2,212✔
661
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
4,448✔
662
        // If the operation is aborted, the connection object may have been
2,214✔
663
        // destroyed.
2,214✔
664
        if (status != ErrorCodes::OperationAborted)
4,448✔
665
            handle_reconnect_wait(status); // Throws
3,322✔
666
    });                                    // Throws
4,448✔
667
}
4,446✔
668

669

670
// Completion handler for the reconnect delay started by
// initiate_reconnect_wait(). Throws Exception if the wait failed.
void Connection::handle_reconnect_wait(Status status)
{
    const bool wait_failed = !status.is_ok();
    if (wait_failed) {
        // Aborted waits are filtered out by the timer callback before it calls us.
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT(m_reconnect_delay_in_progress);
    m_reconnect_delay_in_progress = false;

    // Only reconnect while at least one active, unsuspended session remains.
    if (m_num_active_unsuspended_sessions == 0)
        return;
    initiate_reconnect(); // Throws
}
3,322✔
683

684
// Forwards websocket observer callbacks to the owning Connection, but only
// while the lifecycle sentinel captured at construction time has not been
// marked destroyed.
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
    Connection* conn;
    util::bind_ptr<LifecycleSentinel> sentinel;

    explicit WebSocketObserverShim(Connection* connection)
        : conn(connection)
        , sentinel(connection->m_websocket_sentinel)
    {
    }

    void websocket_connected_handler(const std::string& protocol) override
    {
        if (!sentinel->destroyed)
            conn->websocket_connected_handler(protocol);
    }

    void websocket_error_handler() override
    {
        if (!sentinel->destroyed)
            conn->websocket_error_handler();
    }

    // Returns false without forwarding once the sentinel is destroyed.
    bool websocket_binary_message_received(util::Span<const char> data) override
    {
        return sentinel->destroyed ? false : conn->websocket_binary_message_received(data);
    }

    // Returns true without forwarding once the sentinel is destroyed.
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
    {
        return sentinel->destroyed ? true : conn->websocket_closed_handler(was_clean, error_code, msg);
    }
};
730

731
void Connection::initiate_reconnect()
732
{
3,322✔
733
    REALM_ASSERT(m_activated);
3,322✔
734

1,624✔
735
    m_state = ConnectionState::connecting;
3,322✔
736
    report_connection_state_change(ConnectionState::connecting); // Throws
3,322✔
737
    if (m_websocket_sentinel) {
3,322✔
738
        m_websocket_sentinel->destroyed = true;
×
739
    }
×
740
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,322✔
741
    m_websocket.reset();
3,322✔
742

1,624✔
743
    // Watchdog
1,624✔
744
    initiate_connect_wait(); // Throws
3,322✔
745

1,624✔
746
    std::vector<std::string> sec_websocket_protocol;
3,322✔
747
    {
3,322✔
748
        auto protocol_prefix =
3,322✔
749
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,048✔
750
        int min = get_oldest_supported_protocol_version();
3,322✔
751
        int max = get_current_protocol_version();
3,322✔
752
        REALM_ASSERT_3(min, <=, max);
3,322✔
753
        // List protocol version in descending order to ensure that the server
1,624✔
754
        // selects the highest possible version.
1,624✔
755
        for (int version = max; version >= min; --version) {
33,208✔
756
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
29,886✔
757
        }
29,886✔
758
    }
3,322✔
759

1,624✔
760
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,322✔
761
                m_server_endpoint.port, m_http_request_path_prefix);
3,322✔
762

1,624✔
763
    m_websocket_error_received = false;
3,322✔
764
    m_websocket =
3,322✔
765
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,322✔
766
                                            WebSocketEndpoint{
3,322✔
767
                                                m_server_endpoint.address,
3,322✔
768
                                                m_server_endpoint.port,
3,322✔
769
                                                get_http_request_path(),
3,322✔
770
                                                std::move(sec_websocket_protocol),
3,322✔
771
                                                is_ssl(m_server_endpoint.envelope),
3,322✔
772
                                                /// DEPRECATED - The following will be removed in a future release
1,624✔
773
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,322✔
774
                                                m_verify_servers_ssl_certificate,
3,322✔
775
                                                m_ssl_trust_certificate_path,
3,322✔
776
                                                m_ssl_verify_callback,
3,322✔
777
                                                m_proxy_config,
3,322✔
778
                                            });
3,322✔
779
}
3,322✔
780

781

782
void Connection::initiate_connect_wait()
783
{
3,322✔
784
    // Deploy a watchdog to enforce an upper bound on the time it can take to
1,624✔
785
    // fully establish the connection (including SSL and WebSocket
1,624✔
786
    // handshakes). Without such a watchdog, connect operations could take very
1,624✔
787
    // long, or even indefinite time.
1,624✔
788
    milliseconds_type time = m_client.m_connect_timeout;
3,322✔
789

1,624✔
790
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,322✔
791
        // If the operation is aborted, the connection object may have been
1,624✔
792
        // destroyed.
1,624✔
793
        if (status != ErrorCodes::OperationAborted)
3,322✔
794
            handle_connect_wait(status); // Throws
×
795
    });                                  // Throws
3,322✔
796
}
3,322✔
797

798

799
// Fired when the connect watchdog expires: the pending connection attempt is
// abandoned with a non-fatal SyncConnectTimeout error. Throws Exception if the
// timer itself reported a failure.
void Connection::handle_connect_wait(Status status)
{
    const bool wait_failed = !status.is_ok();
    if (wait_failed) {
        // Aborted waits are filtered out by the timer callback before it calls us.
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
    logger.info("Connect timeout"); // Throws

    SessionErrorInfo timeout_error{
        Status{ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
        IsFatal{false}};
    involuntary_disconnect(std::move(timeout_error),
                           ConnectionTerminationReason::sync_connect_timeout); // Throws
}
×
813

814

815
void Connection::handle_connection_established()
816
{
3,200✔
817
    // Cancel connect timeout watchdog
1,564✔
818
    m_connect_timer.reset();
3,200✔
819

1,564✔
820
    m_state = ConnectionState::connected;
3,200✔
821

1,564✔
822
    milliseconds_type now = monotonic_clock_now();
3,200✔
823
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,200✔
824
    initiate_ping_delay(now);     // Throws
3,200✔
825

1,564✔
826
    bool fast_reconnect = false;
3,200✔
827
    if (m_disconnect_has_occurred) {
3,200✔
828
        milliseconds_type time = now - m_disconnect_time;
946✔
829
        if (time <= m_client.m_fast_reconnect_limit)
946✔
830
            fast_reconnect = true;
946✔
831
    }
946✔
832

1,564✔
833
    for (auto& p : m_sessions) {
4,250✔
834
        Session& sess = *p.second;
4,250✔
835
        sess.connection_established(fast_reconnect); // Throws
4,250✔
836
    }
4,250✔
837

1,564✔
838
    report_connection_state_change(ConnectionState::connected); // Throws
3,200✔
839
}
3,200✔
840

841

842
void Connection::schedule_urgent_ping()
843
{
212✔
844
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
212✔
845
    if (m_ping_delay_in_progress) {
212✔
846
        m_heartbeat_timer.reset();
190✔
847
        m_ping_delay_in_progress = false;
190✔
848
        m_minimize_next_ping_delay = true;
190✔
849
        milliseconds_type now = monotonic_clock_now();
190✔
850
        initiate_ping_delay(now); // Throws
190✔
851
        return;
190✔
852
    }
190✔
853
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
22✔
854
    if (!m_send_ping)
22✔
855
        m_minimize_next_ping_delay = true;
22✔
856
}
22✔
857

858

859
// Starts the timer that precedes the next PING message. `now` is the current
// monotonic clock reading as obtained by the caller.
void Connection::initiate_ping_delay(milliseconds_type now)
{
    REALM_ASSERT(!m_ping_delay_in_progress);
    REALM_ASSERT(!m_waiting_for_pong);
    REALM_ASSERT(!m_send_ping);

    milliseconds_type delay = 0;
    if (m_minimize_next_ping_delay) {
        // The request for a minimized delay is consumed here; the delay stays zero.
        m_minimize_next_ping_delay = false;
    }
    else {
        delay = m_client.m_ping_keepalive_period;

        // Subtract a random amount of up to 10% of the period, or up to 100%
        // when no PING has been sent yet on this connection. This lowers the
        // risk of many connections sending PING messages to the server at the
        // same time.
        const milliseconds_type max_deduction = m_ping_sent ? delay / 10 : delay;
        std::uniform_int_distribution<milliseconds_type> deduction_dist(0, max_deduction);
        const milliseconds_type randomized_deduction = deduction_dist(m_client.get_random());
        delay -= randomized_deduction;

        // Deduct the time already spent waiting for PONG, clamping at zero.
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
        const milliseconds_type already_waited = now - m_pong_wait_started_at;
        delay = (already_waited < delay) ? delay - already_waited : 0;
    }

    m_ping_delay_in_progress = true;

    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
        if (status == ErrorCodes::OperationAborted)
            return;
        if (!status.is_ok())
            throw Exception(status);

        handle_ping_delay(); // Throws
    });                      // Throws
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
}
3,578✔
904

905

906
void Connection::handle_ping_delay()
907
{
212✔
908
    REALM_ASSERT(m_ping_delay_in_progress);
212✔
909
    m_ping_delay_in_progress = false;
212✔
910
    m_send_ping = true;
212✔
911

82✔
912
    initiate_pong_timeout(); // Throws
212✔
913

82✔
914
    if (m_state == ConnectionState::connected && !m_sending)
212✔
915
        send_next_message(); // Throws
176✔
916
}
212✔
917

918

919
void Connection::initiate_pong_timeout()
920
{
212✔
921
    REALM_ASSERT(!m_ping_delay_in_progress);
212✔
922
    REALM_ASSERT(!m_waiting_for_pong);
212✔
923
    REALM_ASSERT(m_send_ping);
212✔
924

82✔
925
    m_waiting_for_pong = true;
212✔
926
    m_pong_wait_started_at = monotonic_clock_now();
212✔
927

82✔
928
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
212✔
929
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
212✔
930
        if (status == ErrorCodes::OperationAborted)
212✔
931
            return;
200✔
932
        else if (!status.is_ok())
12✔
933
            throw Exception(status);
×
934

6✔
935
        handle_pong_timeout(); // Throws
12✔
936
    });                        // Throws
12✔
937
}
212✔
938

939

940
void Connection::handle_pong_timeout()
941
{
12✔
942
    REALM_ASSERT(m_waiting_for_pong);
12✔
943
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
944
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
945
                                 ConnectionTerminationReason::pong_timeout);
12✔
946
}
12✔
947

948

949
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
950
{
93,510✔
951
    // Stop sending messages if an websocket error was received.
46,358✔
952
    if (m_websocket_error_received)
93,510✔
953
        return;
×
954

46,358✔
955
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
93,510✔
956
        if (sentinel->destroyed) {
93,390✔
957
            return;
1,442✔
958
        }
1,442✔
959
        if (!status.is_ok()) {
91,948✔
960
            if (status != ErrorCodes::Error::OperationAborted) {
×
961
                // Write errors will be handled by the websocket_write_error_handler() callback
962
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
963
            }
×
964
            return;
×
965
        }
×
966
        handle_write_message(); // Throws
91,948✔
967
    });                         // Throws
91,948✔
968
    m_sending_session = sess;
93,510✔
969
    m_sending = true;
93,510✔
970
}
93,510✔
971

972

973
void Connection::handle_write_message()
974
{
91,944✔
975
    m_sending_session->message_sent(); // Throws
91,944✔
976
    if (m_sending_session->m_state == Session::Deactivated) {
91,944✔
977
        finish_session_deactivation(m_sending_session);
82✔
978
    }
82✔
979
    m_sending_session = nullptr;
91,944✔
980
    m_sending = false;
91,944✔
981
    send_next_message(); // Throws
91,944✔
982
}
91,944✔
983

984

985
void Connection::send_next_message()
986
{
147,976✔
987
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
147,976✔
988
    REALM_ASSERT(!m_sending_session);
147,976✔
989
    REALM_ASSERT(!m_sending);
147,976✔
990
    if (m_send_ping) {
147,976✔
991
        send_ping(); // Throws
200✔
992
        return;
200✔
993
    }
200✔
994
    while (!m_sessions_enlisted_to_send.empty()) {
205,912✔
995
        // The state of being connected is not supposed to be able to change
74,814✔
996
        // across this loop thanks to the "no callback reentrance" guarantee
74,814✔
997
        // provided by Websocket::async_write_text(), and friends.
74,814✔
998
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
151,854✔
999

74,814✔
1000
        Session& sess = *m_sessions_enlisted_to_send.front();
151,854✔
1001
        m_sessions_enlisted_to_send.pop_front();
151,854✔
1002
        sess.send_message(); // Throws
151,854✔
1003

74,814✔
1004
        if (sess.m_state == Session::Deactivated) {
151,854✔
1005
            finish_session_deactivation(&sess);
2,664✔
1006
        }
2,664✔
1007

74,814✔
1008
        // An enlisted session may choose to not send a message. In that case,
74,814✔
1009
        // we should pass the opportunity to the next enlisted session.
74,814✔
1010
        if (m_sending)
151,854✔
1011
            break;
93,718✔
1012
    }
151,854✔
1013
}
147,776✔
1014

1015

1016
void Connection::send_ping()
1017
{
200✔
1018
    REALM_ASSERT(!m_ping_delay_in_progress);
200✔
1019
    REALM_ASSERT(m_waiting_for_pong);
200✔
1020
    REALM_ASSERT(m_send_ping);
200✔
1021

76✔
1022
    m_send_ping = false;
200✔
1023
    if (m_reconnect_info.scheduled_reset)
200✔
1024
        m_ping_after_scheduled_reset_of_reconnect_info = true;
160✔
1025

76✔
1026
    m_last_ping_sent_at = monotonic_clock_now();
200✔
1027
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
200✔
1028
                 m_previous_ping_rtt); // Throws
200✔
1029

76✔
1030
    ClientProtocol& protocol = get_client_protocol();
200✔
1031
    OutputBuffer& out = get_output_buffer();
200✔
1032
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
200✔
1033
    initiate_write_ping(out);                                          // Throws
200✔
1034
    m_ping_sent = true;
200✔
1035
}
200✔
1036

1037

1038
void Connection::initiate_write_ping(const OutputBuffer& out)
1039
{
200✔
1040
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
200✔
1041
        if (sentinel->destroyed) {
200✔
1042
            return;
×
1043
        }
×
1044
        if (!status.is_ok()) {
200✔
1045
            if (status != ErrorCodes::Error::OperationAborted) {
×
1046
                // Write errors will be handled by the websocket_write_error_handler() callback
1047
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1048
            }
×
1049
            return;
×
1050
        }
×
1051
        handle_write_ping(); // Throws
200✔
1052
    });                      // Throws
200✔
1053
    m_sending = true;
200✔
1054
}
200✔
1055

1056

1057
void Connection::handle_write_ping()
1058
{
200✔
1059
    REALM_ASSERT(m_sending);
200✔
1060
    REALM_ASSERT(!m_sending_session);
200✔
1061
    m_sending = false;
200✔
1062
    send_next_message(); // Throws
200✔
1063
}
200✔
1064

1065

1066
// Hands a received binary message to the client protocol parser, which parses
// it and calls the proper handler on this Connection object.
void Connection::handle_message_received(util::Span<const char> data)
{
    const std::string_view message(data.data(), data.size());
    get_client_protocol().parse_message_received<Connection>(*this, message);
}
75,448✔
1072

1073

1074
void Connection::initiate_disconnect_wait()
1075
{
4,270✔
1076
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,270✔
1077

2,030✔
1078
    if (m_disconnect_delay_in_progress) {
4,270✔
1079
        m_reconnect_disconnect_timer.reset();
2,046✔
1080
        m_disconnect_delay_in_progress = false;
2,046✔
1081
    }
2,046✔
1082

2,030✔
1083
    milliseconds_type time = m_client.m_connection_linger_time;
4,270✔
1084

2,030✔
1085
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,270✔
1086
        // If the operation is aborted, the connection object may have been
2,028✔
1087
        // destroyed.
2,028✔
1088
        if (status != ErrorCodes::OperationAborted)
4,268✔
1089
            handle_disconnect_wait(status); // Throws
6✔
1090
    });                                     // Throws
4,268✔
1091
    m_disconnect_delay_in_progress = true;
4,270✔
1092
}
4,270✔
1093

1094

1095
// Runs when the linger timer expires. If no active, unsuspended sessions
// remain, the connection is voluntarily disconnected. Throws Exception if the
// timer itself reported a failure.
void Connection::handle_disconnect_wait(Status status)
{
    const bool wait_failed = !status.is_ok();
    if (wait_failed) {
        // Aborted waits are filtered out by the timer callback before it calls us.
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    m_disconnect_delay_in_progress = false;

    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);

    // Sessions are still using the connection, so keep it open.
    if (m_num_active_unsuspended_sessions != 0)
        return;

    const bool lingered = m_client.m_connection_linger_time > 0;
    if (lingered)
        logger.detail("Linger time expired"); // Throws
    voluntary_disconnect();                   // Throws
    logger.info("Disconnected");              // Throws
}
6✔
1112

1113

1114
// Closes the connection because of a sync protocol violation. The error is
// reported as fatal with the ProtocolViolation action.
void Connection::close_due_to_protocol_error(Status status)
{
    SessionErrorInfo violation(std::move(status), IsFatal{true});
    violation.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;

    constexpr auto reason = ConnectionTerminationReason::sync_protocol_violation;
    involuntary_disconnect(std::move(violation), reason); // Throws
}
4✔
1121

1122

1123
// Logs the error and closes the connection, reporting `status` with the given
// fatality and termination reason.
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to error: %1", status); // Throws

    SessionErrorInfo error_info{std::move(status), is_fatal};
    involuntary_disconnect(std::move(error_info), reason); // Throw
}
462✔
1129

1130

1131
// Logs the error and closes the connection, reporting `status` as a non-fatal
// error with the Transient action.
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to transient error: %1", status); // Throws

    SessionErrorInfo transient_error{std::move(status), IsFatal{false}};
    transient_error.server_requests_action = ProtocolErrorInfo::Action::Transient;
    involuntary_disconnect(std::move(transient_error), reason); // Throw
}
522✔
1139

1140

1141
// Close connection due to error discovered on the server-side, and then
1142
// reported to the client by way of a connection-level ERROR message.
1143
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
{
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
                int(error_code)); // Throws

    // The termination reason follows from whether the server marked the error as fatal.
    auto reason = ConnectionTerminationReason::server_said_try_again_later;
    if (info.is_fatal)
        reason = ConnectionTerminationReason::server_said_do_not_reconnect;

    SessionErrorInfo server_error{info, protocol_error_to_status(error_code, info.message)};
    involuntary_disconnect(std::move(server_error), reason); // Throws
}
66✔
1153

1154

1155
void Connection::disconnect(const SessionErrorInfo& info)
1156
{
3,320✔
1157
    // Cancel connect timeout watchdog
1,622✔
1158
    m_connect_timer.reset();
3,320✔
1159

1,622✔
1160
    if (m_state == ConnectionState::connected) {
3,320✔
1161
        m_disconnect_time = monotonic_clock_now();
3,198✔
1162
        m_disconnect_has_occurred = true;
3,198✔
1163

1,562✔
1164
        // Sessions that are in the Deactivating state at this time can be
1,562✔
1165
        // immediately discarded, in part because they are no longer enlisted to
1,562✔
1166
        // send. Such sessions will be taken to the Deactivated state by
1,562✔
1167
        // Session::connection_lost(), and then they will be removed from
1,562✔
1168
        // `m_sessions`.
1,562✔
1169
        auto i = m_sessions.begin(), end = m_sessions.end();
3,198✔
1170
        while (i != end) {
6,788✔
1171
            // Prevent invalidation of the main iterator when erasing elements
1,846✔
1172
            auto j = i++;
3,590✔
1173
            Session& sess = *j->second;
3,590✔
1174
            sess.connection_lost(); // Throws
3,590✔
1175
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
3,590✔
1176
                m_sessions.erase(j);
1,576✔
1177
        }
3,590✔
1178
    }
3,198✔
1179

1,622✔
1180
    change_state_to_disconnected();
3,320✔
1181

1,622✔
1182
    m_ping_delay_in_progress = false;
3,320✔
1183
    m_waiting_for_pong = false;
3,320✔
1184
    m_send_ping = false;
3,320✔
1185
    m_minimize_next_ping_delay = false;
3,320✔
1186
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,320✔
1187
    m_ping_sent = false;
3,320✔
1188
    m_heartbeat_timer.reset();
3,320✔
1189
    m_previous_ping_rtt = 0;
3,320✔
1190

1,622✔
1191
    m_websocket_sentinel->destroyed = true;
3,320✔
1192
    m_websocket_sentinel.reset();
3,320✔
1193
    m_websocket.reset();
3,320✔
1194
    m_input_body_buffer.reset();
3,320✔
1195
    m_sending_session = nullptr;
3,320✔
1196
    m_sessions_enlisted_to_send.clear();
3,320✔
1197
    m_sending = false;
3,320✔
1198

1,622✔
1199
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,320✔
1200
    initiate_reconnect_wait();                                           // Throws
3,320✔
1201
}
3,320✔
1202

1203
bool Connection::is_flx_sync_connection() const noexcept
1204
{
105,646✔
1205
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
105,646✔
1206
}
105,646✔
1207

1208
// Handles a PONG message: validates it, records the round trip time and
// starts the delay before the next PING. An unexpected PONG, or one with a
// timestamp that differs from the last PING sent, closes the connection with
// a protocol error.
void Connection::receive_pong(milliseconds_type timestamp)
{
    logger.debug("Received: PONG(timestamp=%1)", timestamp);

    // A PONG is only legal while we wait for one and no PING is pending.
    if (REALM_UNLIKELY(!m_waiting_for_pong || m_send_ping)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
        return;
    }

    // The PONG must carry the timestamp of the PING that was last sent.
    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
                          m_last_ping_sent_at, timestamp)}); // Throws
        return;
    }

    const milliseconds_type now = monotonic_clock_now();
    const milliseconds_type round_trip_time = now - timestamp;
    logger.debug("Round trip time was %1 milliseconds", round_trip_time);
    m_previous_ping_rtt = round_trip_time;

    // If this PONG message is a response to a PING message that was sent after
    // the last invocation of cancel_reconnect_delay(), then the connection is
    // still good, and we do not have to skip the next reconnect delay.
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
        m_ping_after_scheduled_reset_of_reconnect_info = false;
        m_reconnect_info.scheduled_reset = false;
    }

    // Stop the PONG timeout and start the delay before the next PING.
    m_heartbeat_timer.reset();
    m_waiting_for_pong = false;
    initiate_ping_delay(now); // Throws

    if (m_client.m_roundtrip_time_handler)
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
}
188✔
1249

1250
// Looks up the session that a received message is addressed to. `message` is
// the message name used in log and error texts.
//
// Returns nullptr when `session_ident` is 0 or no such session is currently
// registered. In the latter case the problem is logged; if the identifier is
// not found in the session history either, the connection is additionally
// closed with a protocol error.
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
{
    if (session_ident == 0)
        return nullptr;

    if (auto* sess = get_session(session_ident); REALM_LIKELY(sess))
        return sess;

    // Check the history to see if the message received was for a previous session
    const bool previously_known = m_session_history.find(session_ident) != m_session_history.end();
    if (previously_known) {
        logger.error("Received %1 message for closed session, session_ident = %2", message,
                     session_ident); // Throws
        return nullptr;
    }

    logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
    close_due_to_protocol_error(
        {ErrorCodes::SyncProtocolInvariantFailed,
         util::format("Received message %1 for session iden %2 when that session never existed", message,
                      session_ident)});
    return nullptr;
}
×
1274

1275
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1276
{
944✔
1277
    Session* sess = nullptr;
944✔
1278
    if (session_ident != 0) {
944✔
1279
        sess = find_and_validate_session(session_ident, "ERROR");
874✔
1280
        if (REALM_UNLIKELY(!sess)) {
874✔
1281
            return;
×
1282
        }
×
1283
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
874✔
1284
            close_due_to_protocol_error(std::move(status)); // Throws
×
1285
            return;
×
1286
        }
×
1287

456✔
1288
        if (sess->m_state == Session::Deactivated) {
874✔
1289
            finish_session_deactivation(sess);
×
1290
        }
×
1291
        return;
874✔
1292
    }
874✔
1293

34✔
1294
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
70✔
1295
                info.message, info.raw_error_code, info.is_fatal, session_ident,
70✔
1296
                info.server_requests_action); // Throws
70✔
1297

34✔
1298
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
70✔
1299
    if (REALM_LIKELY(known_error_code)) {
70✔
1300
        ProtocolError error_code = ProtocolError(info.raw_error_code);
66✔
1301
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
66✔
1302
            close_due_to_server_side_error(error_code, info); // Throws
66✔
1303
            return;
66✔
1304
        }
66✔
1305
        close_due_to_protocol_error(
×
1306
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1307
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1308
                          info.raw_error_code)});
×
1309
    }
×
1310
    else {
4✔
1311
        close_due_to_protocol_error(
4✔
1312
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1313
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1314
    }
4✔
1315
}
70✔
1316

1317

1318
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1319
                                             session_ident_type session_ident)
1320
{
16✔
1321
    if (session_ident == 0) {
16✔
1322
        return close_due_to_protocol_error(
×
1323
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1324
    }
×
1325

8✔
1326
    if (!is_flx_sync_connection()) {
16✔
1327
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1328
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1329
    }
×
1330

8✔
1331
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
16✔
1332
    if (REALM_UNLIKELY(!sess)) {
16✔
1333
        return;
×
1334
    }
×
1335

8✔
1336
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
16✔
1337
        close_due_to_protocol_error(std::move(status));
×
1338
    }
×
1339
}
16✔
1340

1341

1342
// Routes an IDENT message to the session named by `session_ident`.
//
// Nothing is done when the session lookup yields no session. A non-OK status
// returned by the session is handed to close_due_to_protocol_error().
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
{
    Session* const target = find_and_validate_session(session_ident, "IDENT");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto status = target->receive_ident_message(client_file_ident);
    if (status.is_ok())
        return;

    close_due_to_protocol_error(std::move(status)); // Throws
}
3,300✔
1352

1353
void Connection::receive_download_message(session_ident_type session_ident, const SyncProgress& progress,
1354
                                          std::uint_fast64_t downloadable_bytes, int64_t query_version,
1355
                                          DownloadBatchState batch_state,
1356
                                          const ReceivedChangesets& received_changesets)
1357
{
44,744✔
1358
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
44,744✔
1359
    if (REALM_UNLIKELY(!sess)) {
44,744✔
1360
        return;
×
1361
    }
×
1362

24,088✔
1363
    if (auto status = sess->receive_download_message(progress, downloadable_bytes, batch_state, query_version,
44,744✔
1364
                                                     received_changesets);
44,744✔
1365
        !status.is_ok()) {
44,744✔
1366
        close_due_to_protocol_error(std::move(status));
×
1367
    }
×
1368
}
44,744✔
1369

1370
// Routes a MARK message to the session named by `session_ident`.
//
// Nothing is done when the session lookup yields no session. A non-OK status
// returned by the session is handed to close_due_to_protocol_error().
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    Session* const target = find_and_validate_session(session_ident, "MARK");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto status = target->receive_mark_message(request_ident);
    if (status.is_ok())
        return;

    close_due_to_protocol_error(std::move(status)); // Throws
}
15,976✔
1380

1381

1382
// Routes an UNBOUND message to the session named by `session_ident`.
//
// Nothing is done when the session lookup yields no session. A non-OK status
// returned by the session is handed to close_due_to_protocol_error(). If the
// session is in the Deactivated state once it has processed the message, the
// connection finishes the deactivation of that session.
void Connection::receive_unbound_message(session_ident_type session_ident)
{
    Session* const target = find_and_validate_session(session_ident, "UNBOUND");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto status = target->receive_unbound_message();
    if (!status.is_ok()) {
        close_due_to_protocol_error(std::move(status)); // Throws
        return;
    }

    const bool now_deactivated = (target->m_state == Session::Deactivated);
    if (now_deactivated)
        finish_session_deactivation(target);
}
4,300✔
1398

1399

1400
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1401
                                               std::string_view body)
1402
{
44✔
1403
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
44✔
1404
    if (REALM_UNLIKELY(!sess)) {
44✔
1405
        return;
×
1406
    }
×
1407

22✔
1408
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
44✔
1409
        close_due_to_protocol_error(std::move(status));
×
1410
    }
×
1411
}
44✔
1412

1413

1414
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1415
                                            std::string_view message)
1416
{
5,944✔
1417
    std::string prefix;
5,944✔
1418
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
5,944✔
1419
        prefix = util::format("Server[%1]", m_appservices_coid);
5,944✔
1420
    }
5,944✔
1421
    else {
×
1422
        prefix = "Server";
×
1423
    }
×
1424

2,820✔
1425
    if (session_ident != 0) {
5,944✔
1426
        if (auto sess = get_session(session_ident)) {
4,088✔
1427
            sess->logger.log(level, "%1 log: %2", prefix, message);
4,088✔
1428
            return;
4,088✔
1429
        }
4,088✔
1430

1431
        logger.log(level, "%1 log for unknown session %2: %3", prefix, session_ident, message);
×
1432
        return;
×
1433
    }
×
1434

908✔
1435
    logger.log(level, "%1 log: %2", prefix, message);
1,856✔
1436
}
1,856✔
1437

1438

1439
void Connection::receive_appservices_request_id(std::string_view coid)
1440
{
5,056✔
1441
    // Only set once per connection
2,472✔
1442
    if (!coid.empty() && m_appservices_coid.empty()) {
5,056✔
1443
        m_appservices_coid = coid;
2,230✔
1444
        logger.info("Connected to app services with request id: \"%1\"", m_appservices_coid);
2,230✔
1445
    }
2,230✔
1446
}
5,056✔
1447

1448

1449
// Forwards `status` unchanged to close_due_to_protocol_error().
void Connection::handle_protocol_error(Status status)
{
    close_due_to_protocol_error(std::move(status));
}
×
1453

1454

1455
// Sessions are guaranteed to be granted the opportunity to send a message in
1456
// the order that they enlist. Note that this is important to ensure
1457
// nonoverlapping communication with the server for consecutive sessions
1458
// associated with the same Realm file.
1459
//
1460
// CAUTION: The specified session may get destroyed before this function
1461
// returns, but only if its Session::send_message() puts it into the Deactivated
1462
// state.
1463
void Connection::enlist_to_send(Session* sess)
1464
{
153,532✔
1465
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
153,532✔
1466
    m_sessions_enlisted_to_send.push_back(sess); // Throws
153,532✔
1467
    if (!m_sending)
153,532✔
1468
        send_next_message(); // Throws
55,652✔
1469
}
153,532✔
1470

1471

1472
std::string Connection::get_active_appservices_connection_id()
1473
{
72✔
1474
    return m_appservices_coid;
72✔
1475
}
72✔
1476

1477
void Session::cancel_resumption_delay()
1478
{
4,102✔
1479
    REALM_ASSERT_EX(m_state == Active, m_state);
4,102✔
1480

2,196✔
1481
    if (!m_suspended)
4,102✔
1482
        return;
3,736✔
1483

204✔
1484
    m_suspended = false;
366✔
1485

204✔
1486
    logger.debug("Resumed"); // Throws
366✔
1487

204✔
1488
    if (unbind_process_complete())
366✔
1489
        initiate_rebind(); // Throws
362✔
1490

204✔
1491
    m_conn.one_more_active_unsuspended_session(); // Throws
366✔
1492

204✔
1493
    on_resumed(); // Throws
366✔
1494
}
366✔
1495

1496

1497
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1498
                                                 std::vector<ProtocolErrorInfo>* out)
1499
{
21,516✔
1500
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
21,516✔
1501
        return;
21,478✔
1502
    }
21,478✔
1503

20✔
1504
#ifdef REALM_DEBUG
38✔
1505
    REALM_ASSERT_DEBUG(
38✔
1506
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
38✔
1507
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
38✔
1508
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
38✔
1509
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
38✔
1510
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
38✔
1511
                       }));
38✔
1512
#endif
38✔
1513

20✔
1514
    while (!m_pending_compensating_write_errors.empty() &&
78✔
1515
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
60✔
1516
               changesets.back().version) {
40✔
1517
        auto& cur_error = m_pending_compensating_write_errors.front();
40✔
1518
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
40✔
1519
        out->push_back(std::move(cur_error));
40✔
1520
        m_pending_compensating_write_errors.pop_front();
40✔
1521
    }
40✔
1522
}
38✔
1523

1524

1525
void Session::integrate_changesets(ClientReplication& repl, const SyncProgress& progress,
1526
                                   std::uint_fast64_t downloadable_bytes,
1527
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1528
                                   DownloadBatchState download_batch_state)
1529
{
42,678✔
1530
    auto& history = repl.get_history();
42,678✔
1531
    if (received_changesets.empty()) {
42,678✔
1532
        if (download_batch_state == DownloadBatchState::MoreToCome) {
21,138✔
1533
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1534
                                       "received empty download message that was not the last in batch",
×
1535
                                       ProtocolError::bad_progress);
×
1536
        }
×
1537
        history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws
21,138✔
1538
        return;
21,138✔
1539
    }
21,138✔
1540

11,998✔
1541
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
21,540✔
1542
    auto transact = get_db()->start_read();
21,540✔
1543
    history.integrate_server_changesets(
21,540✔
1544
        progress, &downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
21,540✔
1545
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
21,528✔
1546
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
21,516✔
1547
        }); // Throws
21,516✔
1548
    if (received_changesets.size() == 1) {
21,540✔
1549
        logger.debug("1 remote changeset integrated, producing client version %1",
14,694✔
1550
                     version_info.sync_version.version); // Throws
14,694✔
1551
    }
14,694✔
1552
    else {
6,846✔
1553
        logger.debug("%2 remote changesets integrated, producing client version %1",
6,846✔
1554
                     version_info.sync_version.version, received_changesets.size()); // Throws
6,846✔
1555
    }
6,846✔
1556

11,998✔
1557
    for (const auto& pending_error : pending_compensating_write_errors) {
12,018✔
1558
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
40✔
1559
                    pending_error.compensating_write_rejected_client_version,
40✔
1560
                    *pending_error.compensating_write_server_version, pending_error.message);
40✔
1561
        try {
40✔
1562
            on_connection_state_changed(
40✔
1563
                m_conn.get_state(),
40✔
1564
                SessionErrorInfo{pending_error,
40✔
1565
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
40✔
1566
                                                          pending_error.message)});
40✔
1567
        }
40✔
1568
        catch (...) {
20✔
1569
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1570
        }
×
1571
    }
40✔
1572
}
21,540✔
1573

1574

1575
void Session::on_integration_failure(const IntegrationException& error)
1576
{
32✔
1577
    REALM_ASSERT_EX(m_state == Active, m_state);
32✔
1578
    REALM_ASSERT(!m_client_error && !m_error_to_send);
32✔
1579
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
32✔
1580

16✔
1581
    m_client_error = util::make_optional<IntegrationException>(error);
32✔
1582
    m_error_to_send = true;
32✔
1583

16✔
1584
    // Surface the error to the user otherwise is lost.
16✔
1585
    on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{error.to_status(), IsFatal{false}});
32✔
1586

16✔
1587
    // Since the deactivation process has not been initiated, the UNBIND
16✔
1588
    // message cannot have been sent unless an ERROR message was received.
16✔
1589
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
32✔
1590
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
32✔
1591
        ensure_enlisted_to_send(); // Throws
32✔
1592
    }
32✔
1593
}
32✔
1594

1595
// Updates the session's bookkeeping after downloaded changesets have been
// integrated.
//
// Stores the new download and overall progress, advances the upload cursor
// when the reported upload progress moved forward, and checks for upload and
// download completion. For a FLX session that has a migration store, the
// migration store is asked to create subscriptions. Finally the session is
// enlisted to send if the IDENT message has been sent, no ERROR message has
// been received, and the session is not suspended.
//
// Must only be called while the session is in the Active state.
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
{
    REALM_ASSERT_EX(m_state == Active, m_state);
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);

    m_download_progress = progress.download;

    // This comparison must happen before m_progress is overwritten below.
    const bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version);
    m_progress = progress;

    if (upload_progressed) {
        if (progress.upload.client_version > m_last_version_selected_for_upload) {
            if (progress.upload.client_version > m_upload_progress.client_version) {
                m_upload_progress = progress.upload;
            }
            m_last_version_selected_for_upload = progress.upload.client_version;
        }

        check_for_upload_completion();
    }

    do_recognize_sync_version(client_version); // Allows upload process to resume
    check_for_download_completion();           // Throws

    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
        auto& subscription_store = *get_flx_subscription_store();
        get_migration_store()->create_subscriptions(subscription_store);
    }

    // Since the deactivation process has not been initiated, the UNBIND
    // message cannot have been sent unless an ERROR message was received.
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);

    const bool may_send = m_ident_message_sent && !m_error_message_received && !m_suspended;
    if (may_send) {
        ensure_enlisted_to_send(); // Throws
    }
}
44,062✔
1628

1629

1630
// Destructor. The body is intentionally empty; the life cycle state check
// below is disabled in the source.
Session::~Session()
{
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
}
9,620✔
1634

1635

1636
std::string Session::make_logger_prefix(session_ident_type ident)
1637
{
9,624✔
1638
    std::ostringstream out;
9,624✔
1639
    out.imbue(std::locale::classic());
9,624✔
1640
    out << "Session[" << ident << "]: "; // Throws
9,624✔
1641
    return out.str();                    // Throws
9,624✔
1642
}
9,624✔
1643

1644

1645
void Session::activate()
1646
{
9,624✔
1647
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
9,624✔
1648

4,636✔
1649
    logger.debug("Activating"); // Throws
9,624✔
1650

4,636✔
1651
    bool has_pending_client_reset = false;
9,624✔
1652
    if (REALM_LIKELY(!get_client().is_dry_run())) {
9,624✔
1653
        // The reason we need a mutable reference from get_client_reset_config() is because we
4,636✔
1654
        // don't want the session to keep a strong reference to the client_reset_config->fresh_copy
4,636✔
1655
        // DB. If it did, then the fresh DB would stay alive for the duration of this sync session
4,636✔
1656
        // and we want to clean it up once the reset is finished. Additionally, the fresh copy will
4,636✔
1657
        // be set to a new copy on every reset so there is no reason to keep a reference to it.
4,636✔
1658
        // The modification to the client reset config happens via std::move(client_reset_config->fresh_copy).
4,636✔
1659
        // If the client reset config were a `const &` then this std::move would create another strong
4,636✔
1660
        // reference which we don't want to happen.
4,636✔
1661
        util::Optional<ClientReset>& client_reset_config = get_client_reset_config();
9,624✔
1662

4,636✔
1663
        bool file_exists = util::File::exists(get_realm_path());
9,624✔
1664

4,636✔
1665
        logger.info("client_reset_config = %1, Realm exists = %2, "
9,624✔
1666
                    "client reset = %3",
9,624✔
1667
                    client_reset_config ? "true" : "false", file_exists ? "true" : "false",
9,618✔
1668
                    (client_reset_config && file_exists) ? "true" : "false"); // Throws
9,624✔
1669
        if (client_reset_config && !m_client_reset_operation) {
9,624✔
1670
            m_client_reset_operation = std::make_unique<_impl::ClientResetOperation>(
336✔
1671
                logger, get_db(), std::move(client_reset_config->fresh_copy), client_reset_config->mode,
336✔
1672
                std::move(client_reset_config->notify_before_client_reset),
336✔
1673
                std::move(client_reset_config->notify_after_client_reset),
336✔
1674
                client_reset_config->recovery_is_allowed); // Throws
336✔
1675
        }
336✔
1676

4,636✔
1677
        if (!m_client_reset_operation) {
9,624✔
1678
            const ClientReplication& repl = access_realm(); // Throws
9,286✔
1679
            repl.get_history().get_status(m_last_version_available, m_client_file_ident, m_progress,
9,286✔
1680
                                          &has_pending_client_reset); // Throws
9,286✔
1681
        }
9,286✔
1682
    }
9,624✔
1683
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
9,624✔
1684
                 m_client_file_ident.salt); // Throws
9,624✔
1685
    m_upload_progress = m_progress.upload;
9,624✔
1686
    m_last_version_selected_for_upload = m_upload_progress.client_version;
9,624✔
1687
    m_download_progress = m_progress.download;
9,624✔
1688
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
9,624✔
1689

4,636✔
1690
    logger.debug("last_version_available  = %1", m_last_version_available);           // Throws
9,624✔
1691
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
9,624✔
1692
    logger.debug("progress_download_client_version = %1",
9,624✔
1693
                 m_progress.download.last_integrated_client_version);                                      // Throws
9,624✔
1694
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
9,624✔
1695
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
9,624✔
1696

4,636✔
1697
    reset_protocol_state();
9,624✔
1698
    m_state = Active;
9,624✔
1699

4,636✔
1700
    REALM_ASSERT(!m_suspended);
9,624✔
1701
    m_conn.one_more_active_unsuspended_session(); // Throws
9,624✔
1702

4,636✔
1703
    try {
9,624✔
1704
        process_pending_flx_bootstrap();
9,624✔
1705
    }
9,624✔
1706
    catch (const IntegrationException& error) {
4,638✔
1707
        logger.error("Error integrating bootstrap changesets: %1", error.what());
4✔
1708
        m_suspended = true;
4✔
1709
        m_conn.one_less_active_unsuspended_session(); // Throws
4✔
1710
        on_suspended(SessionErrorInfo{Status{error.code(), error.what()}, IsFatal{true}});
4✔
1711
    }
4✔
1712

4,636✔
1713
    if (has_pending_client_reset) {
9,624✔
1714
        handle_pending_client_reset_acknowledgement();
22✔
1715
    }
22✔
1716
}
9,622✔
1717

1718

1719
// The caller (Connection) must discard the session if the session has become
1720
// deactivated upon return.
1721
void Session::initiate_deactivation()
1722
{
9,622✔
1723
    REALM_ASSERT_EX(m_state == Active, m_state);
9,622✔
1724

4,636✔
1725
    logger.debug("Initiating deactivation"); // Throws
9,622✔
1726

4,636✔
1727
    m_state = Deactivating;
9,622✔
1728

4,636✔
1729
    if (!m_suspended)
9,622✔
1730
        m_conn.one_less_active_unsuspended_session(); // Throws
9,094✔
1731

4,636✔
1732
    if (m_enlisted_to_send) {
9,622✔
1733
        REALM_ASSERT(!unbind_process_complete());
5,122✔
1734
        return;
5,122✔
1735
    }
5,122✔
1736

2,252✔
1737
    // Deactivate immediately if the BIND message has not yet been sent and the
2,252✔
1738
    // session is not enlisted to send, or if the unbinding process has already
2,252✔
1739
    // completed.
2,252✔
1740
    if (!m_bind_message_sent || unbind_process_complete()) {
4,500✔
1741
        complete_deactivation(); // Throws
1,002✔
1742
        // Life cycle state is now Deactivated
454✔
1743
        return;
1,002✔
1744
    }
1,002✔
1745

1,798✔
1746
    // Ready to send the UNBIND message, if it has not already been sent
1,798✔
1747
    if (!m_unbind_message_sent) {
3,498✔
1748
        enlist_to_send(); // Throws
3,350✔
1749
        return;
3,350✔
1750
    }
3,350✔
1751
}
3,498✔
1752

1753

1754
void Session::complete_deactivation()
1755
{
9,622✔
1756
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
9,622✔
1757
    m_state = Deactivated;
9,622✔
1758

4,636✔
1759
    logger.debug("Deactivation completed"); // Throws
9,622✔
1760
}
9,622✔
1761

1762

1763
// Called by the associated Connection object when this session is granted an
1764
// opportunity to send a message.
1765
//
1766
// The caller (Connection) must discard the session if the session has become
1767
// deactivated upon return.
1768
void Session::send_message()
1769
{
151,846✔
1770
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
151,846✔
1771
    REALM_ASSERT(m_enlisted_to_send);
151,846✔
1772
    m_enlisted_to_send = false;
151,846✔
1773
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
151,846✔
1774
        // Deactivation has been initiated. If the UNBIND message has not been
4,308✔
1775
        // sent yet, there is no point in sending it. Instead, we can let the
4,308✔
1776
        // deactivation process complete.
4,308✔
1777
        if (!m_bind_message_sent) {
8,906✔
1778
            return complete_deactivation(); // Throws
2,664✔
1779
            // Life cycle state is now Deactivated
1,320✔
1780
        }
2,664✔
1781

2,988✔
1782
        // Session life cycle state is Deactivating or the unbinding process has
2,988✔
1783
        // been initiated by a session specific ERROR message
2,988✔
1784
        if (!m_unbind_message_sent)
6,242✔
1785
            send_unbind_message(); // Throws
6,242✔
1786
        return;
6,242✔
1787
    }
6,242✔
1788

70,500✔
1789
    // Session life cycle state is Active and the unbinding process has
70,500✔
1790
    // not been initiated
70,500✔
1791
    REALM_ASSERT(!m_unbind_message_sent);
142,940✔
1792

70,500✔
1793
    if (!m_bind_message_sent)
142,940✔
1794
        return send_bind_message(); // Throws
8,612✔
1795

66,242✔
1796
    if (!m_ident_message_sent) {
134,328✔
1797
        if (have_client_file_ident())
6,886✔
1798
            send_ident_message(); // Throws
6,886✔
1799
        return;
6,886✔
1800
    }
6,886✔
1801

62,966✔
1802
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
127,442✔
1803
                                                      [](const PendingTestCommand& command) {
63,038✔
1804
                                                          return command.pending;
144✔
1805
                                                      });
144✔
1806
    if (has_pending_test_command) {
127,442✔
1807
        return send_test_command_message();
44✔
1808
    }
44✔
1809

62,944✔
1810
    if (m_error_to_send)
127,398✔
1811
        return send_json_error_message(); // Throws
32✔
1812

62,928✔
1813
    // Stop sending upload, mark and query messages when the client detects an error.
62,928✔
1814
    if (m_client_error) {
127,366✔
1815
        return;
16✔
1816
    }
16✔
1817

62,920✔
1818
    if (m_target_download_mark > m_last_download_mark_sent)
127,350✔
1819
        return send_mark_message(); // Throws
16,536✔
1820

54,872✔
1821
    auto is_upload_allowed = [&]() -> bool {
110,832✔
1822
        if (!m_is_flx_sync_session) {
110,832✔
1823
            return true;
99,732✔
1824
        }
99,732✔
1825

5,736✔
1826
        auto migration_store = get_migration_store();
11,100✔
1827
        if (!migration_store) {
11,100✔
1828
            return true;
×
1829
        }
×
1830

5,736✔
1831
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
11,100✔
1832
        if (!sentinel_query_version) {
11,100✔
1833
            return true;
11,072✔
1834
        }
11,072✔
1835

14✔
1836
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
14✔
1837
        return m_last_sent_flx_query_version != *sentinel_query_version;
28✔
1838
    };
28✔
1839

54,872✔
1840
    if (!is_upload_allowed()) {
110,814✔
1841
        return;
16✔
1842
    }
16✔
1843

54,864✔
1844
    auto check_pending_flx_version = [&]() -> bool {
110,818✔
1845
        if (!m_is_flx_sync_session) {
110,818✔
1846
            return false;
99,732✔
1847
        }
99,732✔
1848

5,730✔
1849
        if (!m_allow_upload) {
11,086✔
1850
            return false;
2,200✔
1851
        }
2,200✔
1852

4,622✔
1853
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
8,886✔
1854

4,622✔
1855
        if (!m_pending_flx_sub_set) {
8,886✔
1856
            return false;
7,382✔
1857
        }
7,382✔
1858

754✔
1859
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
1,504✔
1860
    };
1,504✔
1861

54,864✔
1862
    if (check_pending_flx_version()) {
110,798✔
1863
        return send_query_change_message(); // throws
850✔
1864
    }
850✔
1865

54,438✔
1866
    if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
109,948✔
1867
        return send_upload_message(); // Throws
54,518✔
1868
    }
54,518✔
1869
}
109,948✔
1870

1871

1872
void Session::send_bind_message()
1873
{
8,612✔
1874
    REALM_ASSERT_EX(m_state == Active, m_state);
8,612✔
1875

4,258✔
1876
    session_ident_type session_ident = m_ident;
8,612✔
1877
    bool need_client_file_ident = !have_client_file_ident();
8,612✔
1878
    const bool is_subserver = false;
8,612✔
1879

4,258✔
1880

4,258✔
1881
    ClientProtocol& protocol = m_conn.get_client_protocol();
8,612✔
1882
    int protocol_version = m_conn.get_negotiated_protocol_version();
8,612✔
1883
    OutputBuffer& out = m_conn.get_output_buffer();
8,612✔
1884
    // Discard the token since it's ignored by the server.
4,258✔
1885
    std::string empty_access_token;
8,612✔
1886
    if (m_is_flx_sync_session) {
8,612✔
1887
        nlohmann::json bind_json_data;
1,350✔
1888
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,350✔
1889
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1890
        }
60✔
1891
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,350✔
1892
        if (logger.would_log(util::Logger::Level::debug)) {
1,350✔
1893
            std::string json_data_dump;
1,350✔
1894
            if (!bind_json_data.empty()) {
1,350✔
1895
                json_data_dump = bind_json_data.dump();
1,350✔
1896
            }
1,350✔
1897
            logger.debug(
1,350✔
1898
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,350✔
1899
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,350✔
1900
        }
1,350✔
1901
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,350✔
1902
                                       need_client_file_ident, is_subserver); // Throws
1,350✔
1903
    }
1,350✔
1904
    else {
7,262✔
1905
        std::string server_path = get_virt_path();
7,262✔
1906
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
7,262✔
1907
                     session_ident, need_client_file_ident, is_subserver, server_path);
7,262✔
1908
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
7,262✔
1909
                                       need_client_file_ident, is_subserver); // Throws
7,262✔
1910
    }
7,262✔
1911
    m_conn.initiate_write_message(out, this); // Throws
8,612✔
1912

4,258✔
1913
    m_bind_message_sent = true;
8,612✔
1914

4,258✔
1915
    // Ready to send the IDENT message if the file identifier pair is already
4,258✔
1916
    // available.
4,258✔
1917
    if (!need_client_file_ident)
8,612✔
1918
        enlist_to_send(); // Throws
3,816✔
1919
}
8,612✔
1920

1921

1922
void Session::send_ident_message()
1923
{
6,886✔
1924
    REALM_ASSERT_EX(m_state == Active, m_state);
6,886✔
1925
    REALM_ASSERT(m_bind_message_sent);
6,886✔
1926
    REALM_ASSERT(!m_unbind_message_sent);
6,886✔
1927
    REALM_ASSERT(have_client_file_ident());
6,886✔
1928

3,276✔
1929

3,276✔
1930
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,886✔
1931
    OutputBuffer& out = m_conn.get_output_buffer();
6,886✔
1932
    session_ident_type session_ident = m_ident;
6,886✔
1933

3,276✔
1934
    if (m_is_flx_sync_session) {
6,886✔
1935
        const auto active_query_set = get_flx_subscription_store()->get_active();
974✔
1936
        const auto active_query_body = active_query_set.to_ext_json();
974✔
1937
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
974✔
1938
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
974✔
1939
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
974✔
1940
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
974✔
1941
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
974✔
1942
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
974✔
1943
                     active_query_body); // Throws
974✔
1944
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
974✔
1945
                                        active_query_set.version(), active_query_body); // Throws
974✔
1946
        m_last_sent_flx_query_version = active_query_set.version();
974✔
1947
    }
974✔
1948
    else {
5,912✔
1949
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
5,912✔
1950
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
5,912✔
1951
                     "latest_server_version_salt=%6)",
5,912✔
1952
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
5,912✔
1953
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
5,912✔
1954
                     m_progress.latest_server_version.salt);                                  // Throws
5,912✔
1955
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
5,912✔
1956
    }
5,912✔
1957
    m_conn.initiate_write_message(out, this); // Throws
6,886✔
1958

3,276✔
1959
    m_ident_message_sent = true;
6,886✔
1960

3,276✔
1961
    // Other messages may be waiting to be sent
3,276✔
1962
    enlist_to_send(); // Throws
6,886✔
1963
}
6,886✔
1964

1965
void Session::send_query_change_message()
1966
{
850✔
1967
    REALM_ASSERT_EX(m_state == Active, m_state);
850✔
1968
    REALM_ASSERT(m_ident_message_sent);
850✔
1969
    REALM_ASSERT(!m_unbind_message_sent);
850✔
1970
    REALM_ASSERT(m_pending_flx_sub_set);
850✔
1971
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
850✔
1972

426✔
1973
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
850✔
1974
        return;
×
1975
    }
×
1976

426✔
1977
    auto sub_store = get_flx_subscription_store();
850✔
1978
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
850✔
1979
    auto latest_queries = latest_sub_set.to_ext_json();
850✔
1980
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
850✔
1981
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
850✔
1982

426✔
1983
    OutputBuffer& out = m_conn.get_output_buffer();
850✔
1984
    session_ident_type session_ident = get_ident();
850✔
1985
    ClientProtocol& protocol = m_conn.get_client_protocol();
850✔
1986
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
850✔
1987
    m_conn.initiate_write_message(out, this);
850✔
1988

426✔
1989
    m_last_sent_flx_query_version = latest_sub_set.version();
850✔
1990

426✔
1991
    request_download_completion_notification();
850✔
1992
}
850✔
1993

1994
void Session::send_upload_message()
1995
{
54,516✔
1996
    REALM_ASSERT_EX(m_state == Active, m_state);
54,516✔
1997
    REALM_ASSERT(m_ident_message_sent);
54,516✔
1998
    REALM_ASSERT(!m_unbind_message_sent);
54,516✔
1999

27,428✔
2000
    if (REALM_UNLIKELY(get_client().is_dry_run()))
54,516✔
2001
        return;
27,428✔
2002

27,428✔
2003
    version_type target_upload_version = m_last_version_available;
54,516✔
2004
    if (m_pending_flx_sub_set) {
54,516✔
2005
        REALM_ASSERT(m_is_flx_sync_session);
654✔
2006
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
654✔
2007
    }
654✔
2008

27,428✔
2009
    const ClientReplication& repl = access_realm(); // Throws
54,516✔
2010

27,428✔
2011
    std::vector<UploadChangeset> uploadable_changesets;
54,516✔
2012
    version_type locked_server_version = 0;
54,516✔
2013
    repl.get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
54,516✔
2014
                                                  locked_server_version); // Throws
54,516✔
2015

27,428✔
2016
    if (uploadable_changesets.empty()) {
54,516✔
2017
        // Nothing more to upload right now
13,700✔
2018
        check_for_upload_completion(); // Throws
28,210✔
2019
        // If we need to limit upload up to some version other than the last client version available and there are no
13,700✔
2020
        // changes to upload, then there is no need to send an empty message.
13,700✔
2021
        if (m_pending_flx_sub_set) {
28,210✔
2022
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
208✔
2023
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
208✔
2024
            // Other messages may be waiting to be sent
104✔
2025
            return enlist_to_send(); // Throws
208✔
2026
        }
208✔
2027
    }
26,306✔
2028
    else {
26,306✔
2029
        m_last_version_selected_for_upload = uploadable_changesets.back().progress.client_version;
26,306✔
2030
    }
26,306✔
2031

27,428✔
2032
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
54,412✔
2033
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
446✔
2034
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
446✔
2035
    }
446✔
2036

27,324✔
2037
    version_type progress_client_version = m_upload_progress.client_version;
54,308✔
2038
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
54,308✔
2039

27,324✔
2040
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
54,308✔
2041
                 "locked_server_version=%3, num_changesets=%4)",
54,308✔
2042
                 progress_client_version, progress_server_version, locked_server_version,
54,308✔
2043
                 uploadable_changesets.size()); // Throws
54,308✔
2044

27,324✔
2045
    ClientProtocol& protocol = m_conn.get_client_protocol();
54,308✔
2046
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
54,308✔
2047

27,324✔
2048
    for (const UploadChangeset& uc : uploadable_changesets) {
48,850✔
2049
        logger.debug("Fetching changeset for upload (client_version=%1, server_version=%2, "
41,804✔
2050
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
41,804✔
2051
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
41,804✔
2052
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
41,804✔
2053
        if (logger.would_log(util::Logger::Level::trace)) {
41,804✔
2054
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2055
            if (changeset_data.size() < 1024) {
×
2056
                logger.trace("Changeset: %1",
×
2057
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2058
            }
×
2059
            else {
×
2060
                logger.trace("Changeset(comp): %1 %2", changeset_data.size(),
×
2061
                             protocol.compressed_hex_dump(changeset_data));
×
2062
            }
×
2063

2064
#if REALM_DEBUG
×
2065
            ChunkedBinaryInputStream in{changeset_data};
×
2066
            Changeset log;
×
2067
            try {
×
2068
                parse_changeset(in, log);
×
2069
                std::stringstream ss;
×
2070
                log.print(ss);
×
2071
                logger.trace("Changeset (parsed):\n%1", ss.str());
×
2072
            }
×
2073
            catch (const BadChangesetError& err) {
×
2074
                logger.error("Unable to parse changeset: %1", err.what());
×
2075
            }
×
2076
#endif
×
2077
        }
×
2078

20,278✔
2079
#if 0 // Upload log compaction is currently not implemented
2080
        if (!get_client().m_disable_upload_compaction) {
2081
            ChangesetEncoder::Buffer encode_buffer;
2082

2083
            {
2084
                // Upload compaction only takes place within single changesets to
2085
                // avoid another client seeing inconsistent snapshots.
2086
                ChunkedBinaryInputStream stream{uc.changeset};
2087
                Changeset changeset;
2088
                parse_changeset(stream, changeset); // Throws
2089
                // FIXME: What is the point of setting these? How can compaction care about them?
2090
                changeset.version = uc.progress.client_version;
2091
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2092
                changeset.origin_timestamp = uc.origin_timestamp;
2093
                changeset.origin_file_ident = uc.origin_file_ident;
2094

2095
                compact_changesets(&changeset, 1);
2096
                encode_changeset(changeset, encode_buffer);
2097

2098
                logger.debug("Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2099
                             encode_buffer.size()); // Throws
2100
            }
2101

2102
            upload_message_builder.add_changeset(
2103
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
2104
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
2105
        }
2106
        else
2107
#endif
2108
        {
41,804✔
2109
            upload_message_builder.add_changeset(uc.progress.client_version,
41,804✔
2110
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
41,804✔
2111
                                                 uc.origin_file_ident,
41,804✔
2112
                                                 uc.changeset); // Throws
41,804✔
2113
        }
41,804✔
2114
    }
41,804✔
2115

27,324✔
2116
    int protocol_version = m_conn.get_negotiated_protocol_version();
54,308✔
2117
    OutputBuffer& out = m_conn.get_output_buffer();
54,308✔
2118
    session_ident_type session_ident = get_ident();
54,308✔
2119
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
54,308✔
2120
                                               progress_server_version,
54,308✔
2121
                                               locked_server_version); // Throws
54,308✔
2122
    m_conn.initiate_write_message(out, this);                          // Throws
54,308✔
2123

27,324✔
2124
    // Other messages may be waiting to be sent
27,324✔
2125
    enlist_to_send(); // Throws
54,308✔
2126
}
54,308✔
2127

2128

2129
void Session::send_mark_message()
2130
{
16,536✔
2131
    REALM_ASSERT_EX(m_state == Active, m_state);
16,536✔
2132
    REALM_ASSERT(m_ident_message_sent);
16,536✔
2133
    REALM_ASSERT(!m_unbind_message_sent);
16,536✔
2134
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
16,536✔
2135

8,048✔
2136
    request_ident_type request_ident = m_target_download_mark;
16,536✔
2137
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
16,536✔
2138

8,048✔
2139
    ClientProtocol& protocol = m_conn.get_client_protocol();
16,536✔
2140
    OutputBuffer& out = m_conn.get_output_buffer();
16,536✔
2141
    session_ident_type session_ident = get_ident();
16,536✔
2142
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
16,536✔
2143
    m_conn.initiate_write_message(out, this);                      // Throws
16,536✔
2144

8,048✔
2145
    m_last_download_mark_sent = request_ident;
16,536✔
2146

8,048✔
2147
    // Other messages may be waiting to be sent
8,048✔
2148
    enlist_to_send(); // Throws
16,536✔
2149
}
16,536✔
2150

2151

2152
void Session::send_unbind_message()
2153
{
6,242✔
2154
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,242✔
2155
    REALM_ASSERT(m_bind_message_sent);
6,242✔
2156
    REALM_ASSERT(!m_unbind_message_sent);
6,242✔
2157

2,988✔
2158
    logger.debug("Sending: UNBIND"); // Throws
6,242✔
2159

2,988✔
2160
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,242✔
2161
    OutputBuffer& out = m_conn.get_output_buffer();
6,242✔
2162
    session_ident_type session_ident = get_ident();
6,242✔
2163
    protocol.make_unbind_message(out, session_ident); // Throws
6,242✔
2164
    m_conn.initiate_write_message(out, this);         // Throws
6,242✔
2165

2,988✔
2166
    m_unbind_message_sent = true;
6,242✔
2167
}
6,242✔
2168

2169

2170
void Session::send_json_error_message()
2171
{
32✔
2172
    REALM_ASSERT_EX(m_state == Active, m_state);
32✔
2173
    REALM_ASSERT(m_ident_message_sent);
32✔
2174
    REALM_ASSERT(!m_unbind_message_sent);
32✔
2175
    REALM_ASSERT(m_error_to_send);
32✔
2176
    REALM_ASSERT(m_client_error);
32✔
2177

16✔
2178
    ClientProtocol& protocol = m_conn.get_client_protocol();
32✔
2179
    OutputBuffer& out = m_conn.get_output_buffer();
32✔
2180
    session_ident_type session_ident = get_ident();
32✔
2181
    auto protocol_error = m_client_error->error_for_server;
32✔
2182

16✔
2183
    auto message = util::format("%1", m_client_error->to_status());
32✔
2184
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
32✔
2185
                session_ident); // Throws
32✔
2186

16✔
2187
    nlohmann::json error_body_json;
32✔
2188
    error_body_json["message"] = std::move(message);
32✔
2189
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
32✔
2190
                                     error_body_json.dump()); // Throws
32✔
2191
    m_conn.initiate_write_message(out, this);                 // Throws
32✔
2192

16✔
2193
    m_error_to_send = false;
32✔
2194
    enlist_to_send(); // Throws
32✔
2195
}
32✔
2196

2197

2198
void Session::send_test_command_message()
2199
{
44✔
2200
    REALM_ASSERT_EX(m_state == Active, m_state);
44✔
2201

22✔
2202
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
44✔
2203
                           [](const PendingTestCommand& command) {
44✔
2204
                               return command.pending;
44✔
2205
                           });
44✔
2206
    REALM_ASSERT(it != m_pending_test_commands.end());
44✔
2207

22✔
2208
    ClientProtocol& protocol = m_conn.get_client_protocol();
44✔
2209
    OutputBuffer& out = m_conn.get_output_buffer();
44✔
2210
    auto session_ident = get_ident();
44✔
2211

22✔
2212
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
44✔
2213
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
44✔
2214

22✔
2215
    m_conn.initiate_write_message(out, this); // Throws;
44✔
2216
    it->pending = false;
44✔
2217

22✔
2218
    enlist_to_send();
44✔
2219
}
44✔
2220

2221

2222
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
2223
{
3,300✔
2224
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
3,300✔
2225
                 client_file_ident.salt); // Throws
3,300✔
2226

1,534✔
2227
    // Ignore the message if the deactivation process has been initiated,
1,534✔
2228
    // because in that case, the associated Realm and SessionWrapper must
1,534✔
2229
    // not be accessed any longer.
1,534✔
2230
    if (m_state != Active)
3,300✔
2231
        return Status::OK(); // Success
120✔
2232

1,508✔
2233
    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
3,180✔
2234
                               !m_unbound_message_received);
3,180✔
2235
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,180✔
2236
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
×
2237
    }
×
2238
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
3,180✔
2239
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
×
2240
    }
×
2241
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
3,180✔
2242
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
×
2243
    }
×
2244

1,508✔
2245
    m_client_file_ident = client_file_ident;
3,180✔
2246

1,508✔
2247
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
3,180✔
2248
        // Ready to send the IDENT message
2249
        ensure_enlisted_to_send(); // Throws
×
2250
        return Status::OK();       // Success
×
2251
    }
×
2252

1,508✔
2253
    // access before the client reset (if applicable) because
1,508✔
2254
    // the reset can take a while and the sync session might have died
1,508✔
2255
    // by the time the reset finishes.
1,508✔
2256
    ClientReplication& repl = access_realm(); // Throws
3,180✔
2257

1,508✔
2258
    auto client_reset_if_needed = [&]() -> bool {
3,180✔
2259
        if (!m_client_reset_operation) {
3,180✔
2260
            return false;
2,844✔
2261
        }
2,844✔
2262

168✔
2263
        // ClientResetOperation::finalize() will return true only if the operation actually did
168✔
2264
        // a client reset. It may choose not to do a reset if the local Realm does not exist
168✔
2265
        // at this point (in that case there is nothing to reset). But in any case, we must
168✔
2266
        // clean up m_client_reset_operation at this point as sync should be able to continue from
168✔
2267
        // this point forward.
168✔
2268
        auto client_reset_operation = std::move(m_client_reset_operation);
336✔
2269
        util::UniqueFunction<void(int64_t)> on_flx_subscription_complete = [this](int64_t version) {
284✔
2270
            this->on_flx_sync_version_complete(version);
232✔
2271
        };
232✔
2272
        if (!client_reset_operation->finalize(client_file_ident, get_flx_subscription_store(),
336✔
2273
                                              std::move(on_flx_subscription_complete))) {
168✔
2274
            return false;
×
2275
        }
×
2276

168✔
2277
        // The fresh Realm has been used to reset the state
168✔
2278
        logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
336✔
2279

168✔
2280
        SaltedFileIdent client_file_ident;
336✔
2281
        bool has_pending_client_reset = false;
336✔
2282
        repl.get_history().get_status(m_last_version_available, client_file_ident, m_progress,
336✔
2283
                                      &has_pending_client_reset); // Throws
336✔
2284
        REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
336✔
2285
        REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
336✔
2286
        REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
336✔
2287
                        m_progress.download.last_integrated_client_version);
336✔
2288
        REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
336✔
2289
        REALM_ASSERT_EX(m_progress.upload.last_integrated_server_version == 0,
336✔
2290
                        m_progress.upload.last_integrated_server_version);
336✔
2291
        logger.trace("last_version_available  = %1", m_last_version_available); // Throws
336✔
2292

168✔
2293
        m_upload_progress = m_progress.upload;
336✔
2294
        m_download_progress = m_progress.download;
336✔
2295
        // In recovery mode, there may be new changesets to upload and nothing left to download.
168✔
2296
        // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
168✔
2297
        // For both, we want to allow uploads again without needing external changes to download first.
168✔
2298
        m_allow_upload = true;
336✔
2299
        REALM_ASSERT_EX(m_last_version_selected_for_upload == 0, m_last_version_selected_for_upload);
336✔
2300

168✔
2301
        if (has_pending_client_reset) {
336✔
2302
            handle_pending_client_reset_acknowledgement();
268✔
2303
        }
268✔
2304

168✔
2305
        update_subscription_version_info();
336✔
2306

168✔
2307
        // If a migration or rollback is in progress, mark it complete when client reset is completed.
168✔
2308
        if (auto migration_store = get_migration_store()) {
336✔
2309
            migration_store->complete_migration_or_rollback();
240✔
2310
        }
240✔
2311

168✔
2312
        return true;
336✔
2313
    };
336✔
2314
    // if a client reset happens, it will take care of setting the file ident
1,508✔
2315
    // and if not, we do it here
1,508✔
2316
    bool did_client_reset = false;
3,180✔
2317
    try {
3,180✔
2318
        did_client_reset = client_reset_if_needed();
3,180✔
2319
    }
3,180✔
2320
    catch (const std::exception& e) {
1,542✔
2321
        auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
68✔
2322
        logger.error(err_msg.c_str());
68✔
2323
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
68✔
2324
        suspend(err_info);
68✔
2325
        return Status::OK();
68✔
2326
    }
68✔
2327
    if (!did_client_reset) {
3,112✔
2328
        repl.get_history().set_client_file_ident(client_file_ident,
2,844✔
2329
                                                 m_fix_up_object_ids); // Throws
2,844✔
2330
        m_progress.download.last_integrated_client_version = 0;
2,844✔
2331
        m_progress.upload.client_version = 0;
2,844✔
2332
        m_last_version_selected_for_upload = 0;
2,844✔
2333
    }
2,844✔
2334

1,474✔
2335
    // Ready to send the IDENT message
1,474✔
2336
    ensure_enlisted_to_send(); // Throws
3,112✔
2337
    return Status::OK();       // Success
3,112✔
2338
}
3,112✔
2339

2340
Status Session::receive_download_message(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
2341
                                         DownloadBatchState batch_state, int64_t query_version,
2342
                                         const ReceivedChangesets& received_changesets)
2343
{
44,744✔
2344
    REALM_ASSERT_EX(query_version >= 0, query_version);
44,744✔
2345
    // Ignore the message if the deactivation process has been initiated,
24,088✔
2346
    // because in that case, the associated Realm and SessionWrapper must
24,088✔
2347
    // not be accessed any longer.
24,088✔
2348
    if (m_state != Active)
44,744✔
2349
        return Status::OK();
472✔
2350

23,790✔
2351
    if (is_steady_state_download_message(batch_state, query_version)) {
44,272✔
2352
        batch_state = DownloadBatchState::SteadyState;
42,682✔
2353
    }
42,682✔
2354

23,790✔
2355
    logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
44,272✔
2356
                 "latest_server_version=%3, latest_server_version_salt=%4, "
44,272✔
2357
                 "upload_client_version=%5, upload_server_version=%6, downloadable_bytes=%7, "
44,272✔
2358
                 "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
44,272✔
2359
                 progress.download.server_version, progress.download.last_integrated_client_version,
44,272✔
2360
                 progress.latest_server_version.version, progress.latest_server_version.salt,
44,272✔
2361
                 progress.upload.client_version, progress.upload.last_integrated_server_version, downloadable_bytes,
44,272✔
2362
                 batch_state != DownloadBatchState::MoreToCome, query_version, received_changesets.size()); // Throws
44,272✔
2363

23,790✔
2364
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
23,790✔
2365
    // changeset over and over again.
23,790✔
2366
    if (m_client_error) {
44,272✔
2367
        logger.debug("Ignoring download message because the client detected an integration error");
×
2368
        return Status::OK();
×
2369
    }
×
2370

23,790✔
2371
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
44,272✔
2372
    if (REALM_UNLIKELY(!legal_at_this_time)) {
44,272✔
2373
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
×
2374
    }
×
2375
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
44,272✔
2376
        logger.error("Bad sync progress received (%1)", status);
×
2377
        return status;
×
2378
    }
×
2379

23,790✔
2380
    version_type server_version = m_progress.download.server_version;
44,272✔
2381
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
44,272✔
2382
    for (const RemoteChangeset& changeset : received_changesets) {
45,112✔
2383
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
22,466✔
2384
        // version must be increasing, but can stay the same during bootstraps.
22,466✔
2385
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
23,294✔
2386
                                                         : (changeset.remote_version > server_version);
42,960✔
2387
        // Each server version cannot be greater than the one in the header of the download message.
22,466✔
2388
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
43,788✔
2389
        if (!good_server_version) {
43,788✔
2390
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2391
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2392
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2393
        }
×
2394
        server_version = changeset.remote_version;
43,788✔
2395
        // Check that per-changeset last integrated client version is "weakly"
22,466✔
2396
        // increasing.
22,466✔
2397
        bool good_client_version =
43,788✔
2398
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
43,788✔
2399
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
43,788✔
2400
        if (!good_client_version) {
43,788✔
2401
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2402
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2403
                                 "(%1, %2, %3)",
×
2404
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2405
                                 progress.download.last_integrated_client_version)};
×
2406
        }
×
2407
        last_integrated_client_version = changeset.last_integrated_local_version;
43,788✔
2408
        // Server shouldn't send our own changes, and zero is not a valid client
22,466✔
2409
        // file identifier.
22,466✔
2410
        bool good_file_ident =
43,788✔
2411
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
43,788✔
2412
        if (!good_file_ident) {
43,788✔
2413
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2414
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2415
                                 changeset.origin_file_ident)};
×
2416
        }
×
2417
    }
43,788✔
2418

23,790✔
2419
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
44,272✔
2420
                                       batch_state, received_changesets.size());
44,272✔
2421
    if (hook_action == SyncClientHookAction::EarlyReturn) {
44,272✔
2422
        return Status::OK();
12✔
2423
    }
12✔
2424
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
44,260✔
2425

23,784✔
2426
    if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) {
44,260✔
2427
        clear_resumption_delay_state();
1,582✔
2428
        return Status::OK();
1,582✔
2429
    }
1,582✔
2430

22,992✔
2431
    initiate_integrate_changesets(downloadable_bytes, batch_state, progress, received_changesets); // Throws
42,678✔
2432

22,992✔
2433
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
42,678✔
2434
                                  batch_state, received_changesets.size());
42,678✔
2435
    if (hook_action == SyncClientHookAction::EarlyReturn) {
42,678✔
2436
        return Status::OK();
×
2437
    }
×
2438
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
42,678✔
2439

22,992✔
2440
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
22,992✔
2441
    // after a retryable session error.
22,992✔
2442
    clear_resumption_delay_state();
42,678✔
2443
    return Status::OK();
42,678✔
2444
}
42,678✔
2445

2446
// Handles a MARK message from the server.
//
// The message is ignored when the session is not Active. It is rejected with
// SyncProtocolInvariantFailed when it arrives at an illegal time, or when
// request_ident is not in the range (last mark received, last mark sent].
// On success the received mark and the current download server version are
// recorded and download completion is re-checked.
Status Session::receive_mark_message(request_ident_type request_ident)
{
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

    // Once deactivation has started the associated Realm and SessionWrapper
    // must no longer be touched, so the message is dropped.
    if (m_state != Active) {
        return Status::OK(); // Success
    }

    const bool legal_now = m_ident_message_sent && !m_error_message_received && !m_unbound_message_received;
    if (REALM_UNLIKELY(!legal_now)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
    }

    const bool bad_request_ident =
        (request_ident > m_last_download_mark_sent || request_ident <= m_last_download_mark_received);
    if (REALM_UNLIKELY(bad_request_ident)) {
        return {
            ErrorCodes::SyncProtocolInvariantFailed,
            util::format(
                "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
                m_last_download_mark_sent, m_last_download_mark_received)};
    }

    m_server_version_at_last_download_mark = m_progress.download.server_version;
    m_last_download_mark_received = request_ident;
    check_for_download_completion(); // Throws

    return Status::OK(); // Success
}
15,882✔
2476

2477

2478
// The caller (Connection) must discard the session if the session has become
2479
// deactivated upon return.
2480
Status Session::receive_unbound_message()
2481
{
4,300✔
2482
    logger.debug("Received: UNBOUND");
4,300✔
2483

2,044✔
2484
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
4,300✔
2485
    if (REALM_UNLIKELY(!legal_at_this_time)) {
4,300✔
2486
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2487
    }
×
2488

2,044✔
2489
    // The fact that the UNBIND message has been sent, but an ERROR message has
2,044✔
2490
    // not been received, implies that the deactivation process must have been
2,044✔
2491
    // initiated, so this session must be in the Deactivating state or the session
2,044✔
2492
    // has been suspended because of a client side error.
2,044✔
2493
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
4,300!
2494

2,044✔
2495
    m_unbound_message_received = true;
4,300✔
2496

2,044✔
2497
    // Detect completion of the unbinding process
2,044✔
2498
    if (m_unbind_message_send_complete && m_state == Deactivating) {
4,300✔
2499
        // The deactivation process completes when the unbinding process
2,044✔
2500
        // completes.
2,044✔
2501
        complete_deactivation(); // Throws
4,300✔
2502
        // Life cycle state is now Deactivated
2,044✔
2503
    }
4,300✔
2504

2,044✔
2505
    return Status::OK(); // Success
4,300✔
2506
}
4,300✔
2507

2508

2509
// Handles a QUERY_ERROR message from the server by forwarding the failing
// query version and message to on_flx_sync_error(). Always returns
// Status::OK().
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
{
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);

    // Once deactivation has started the associated Realm and SessionWrapper
    // must no longer be touched, so the message is dropped.
    if (m_state != Active) {
        return Status::OK();
    }

    on_flx_sync_error(query_version, message); // throws
    return Status::OK();
}
16✔
2520

2521
// The caller (Connection) must discard the session if the session has become
2522
// deactivated upon return.
2523
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2524
{
874✔
2525
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
874✔
2526
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
874✔
2527

456✔
2528
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
874✔
2529
    if (REALM_UNLIKELY(!legal_at_this_time)) {
874✔
2530
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2531
    }
×
2532

456✔
2533
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
874✔
2534
    auto status = protocol_error_to_status(protocol_error, info.message);
874✔
2535
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
874✔
2536
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2537
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2538
                             info.raw_error_code)};
×
2539
    }
×
2540

456✔
2541
    // Can't process debug hook actions once the Session is undergoing deactivation, since
456✔
2542
    // the SessionWrapper may not be available
456✔
2543
    if (m_state == Active) {
874✔
2544
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
872✔
2545
        if (debug_action == SyncClientHookAction::EarlyReturn) {
872✔
2546
            return Status::OK();
8✔
2547
        }
8✔
2548
    }
866✔
2549

452✔
2550
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
452✔
2551
    // containing the compensating write has appeared in a download message.
452✔
2552
    if (status == ErrorCodes::SyncCompensatingWrite) {
866✔
2553
        // If the client is not active, the compensating writes will not be processed now, but will be
20✔
2554
        // sent again the next time the client connects
20✔
2555
        if (m_state == Active) {
40✔
2556
            REALM_ASSERT(info.compensating_write_server_version.has_value());
40✔
2557
            m_pending_compensating_write_errors.push_back(info);
40✔
2558
        }
40✔
2559
        return Status::OK();
40✔
2560
    }
40✔
2561

432✔
2562
    m_error_message_received = true;
826✔
2563
    suspend(SessionErrorInfo{info, std::move(status)});
826✔
2564
    return Status::OK();
826✔
2565
}
826✔
2566

2567
void Session::suspend(const SessionErrorInfo& info)
2568
{
894✔
2569
    REALM_ASSERT(!m_suspended);
894✔
2570
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
894!
2571
    logger.debug("Suspended"); // Throws
894✔
2572

466✔
2573
    m_suspended = true;
894✔
2574

466✔
2575
    // Detect completion of the unbinding process
466✔
2576
    if (m_unbind_message_send_complete && m_error_message_received) {
894!
2577
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2578
        // we received an ERROR message implies that the deactivation process must
2579
        // have been initiated, so this session must be in the Deactivating state.
2580
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
×
2581

2582
        // The deactivation process completes when the unbinding process
2583
        // completes.
2584
        complete_deactivation(); // Throws
×
2585
        // Life cycle state is now Deactivated
2586
    }
×
2587

466✔
2588
    // Notify the application of the suspension of the session if the session is
466✔
2589
    // still in the Active state
466✔
2590
    if (m_state == Active) {
894✔
2591
        m_conn.one_less_active_unsuspended_session(); // Throws
892✔
2592
        on_suspended(info);                           // Throws
892✔
2593
    }
892✔
2594

466✔
2595
    if (!info.is_fatal) {
894✔
2596
        begin_resumption_delay(info);
362✔
2597
    }
362✔
2598

466✔
2599
    // Ready to send the UNBIND message, if it has not been sent already
466✔
2600
    if (!m_unbind_message_sent)
894✔
2601
        ensure_enlisted_to_send(); // Throws
892✔
2602
}
894✔
2603

2604
// Handles the server's response to a previously issued test command.
//
// Looks up the pending test command whose id equals `ident`. If there is none,
// a SyncProtocolInvariantFailed status is returned. Otherwise the command's
// promise is fulfilled with a copy of `body`, the command is removed from the
// pending list, and Status::OK() is returned.
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
{
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);

    const auto has_matching_ident = [&ident](const PendingTestCommand& pending) {
        return pending.id == ident;
    };
    auto pending_command =
        std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(), has_matching_ident);

    const bool is_unknown_ident = (pending_command == m_pending_test_commands.end());
    if (is_unknown_ident) {
        return {ErrorCodes::SyncProtocolInvariantFailed,
                util::format("Received test command response for a non-existent ident %1", ident)};
    }

    // Fulfill the promise before erasing the entry that owns it.
    pending_command->promise.emplace_value(std::string{body});
    m_pending_test_commands.erase(pending_command);

    return Status::OK();
}
44✔
2621

2622
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2623
{
362✔
2624
    REALM_ASSERT(!m_try_again_activation_timer);
362✔
2625

202✔
2626
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
362✔
2627
                                  error_info.resumption_delay_interval);
362✔
2628
    auto try_again_interval = m_try_again_delay_info.delay_interval();
362✔
2629
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
362✔
2630
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
10✔
2631
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
10✔
2632
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
10✔
2633
        try_again_interval = std::chrono::milliseconds{1000};
20✔
2634
    }
20✔
2635
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
362✔
2636
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
362✔
2637
        if (status == ErrorCodes::OperationAborted)
362✔
2638
            return;
4✔
2639
        else if (!status.is_ok())
358✔
2640
            throw Exception(status);
×
2641

200✔
2642
        m_try_again_activation_timer.reset();
358✔
2643
        cancel_resumption_delay();
358✔
2644
    });
358✔
2645
}
362✔
2646

2647
void Session::clear_resumption_delay_state()
2648
{
44,260✔
2649
    if (m_try_again_activation_timer) {
44,260✔
2650
        logger.debug("Clearing resumption delay state after successful download");
×
2651
        m_try_again_delay_info.reset();
×
2652
    }
×
2653
}
44,260✔
2654

2655
// Validates the sync progress received from the server against the progress
// currently recorded for this session (`m_progress`) and against
// `m_last_version_available`.
//
// Returns Status::OK() if every check passes. Otherwise returns a
// SyncProtocolInvariantFailed status. All checks are always evaluated, and if
// several fail, the message of the last failing check is the one reported.
//
// NOTE(review): this function is declared noexcept, yet it builds std::string
// messages through util::format. If that can throw (e.g. on allocation
// failure), the process would terminate - confirm this is acceptable.
Status ClientImpl::Session::check_received_sync_progress(const SyncProgress& progress) noexcept
{
    const SyncProgress& current = m_progress;
    const SyncProgress& received = progress;
    std::string failure;

    // Latest server version must not decrease.
    if (received.latest_server_version.version < current.latest_server_version.version) {
        failure = util::format("Latest server version in download messages must be weakly increasing throughout a "
                               "session (current: %1, received: %2)",
                               current.latest_server_version.version, received.latest_server_version.version);
    }

    // Last integrated client version (upload cursor) must not decrease.
    if (received.upload.client_version < current.upload.client_version) {
        failure = util::format("Last integrated client version in download messages must be weakly increasing "
                               "throughout a session (current: %1, received: %2)",
                               current.upload.client_version, received.upload.client_version);
    }

    // The upload cursor cannot be ahead of the latest client version available.
    if (received.upload.client_version > m_last_version_available) {
        failure = util::format("Last integrated client version on server cannot be greater than the latest client "
                               "version in existence (current: %1, received: %2)",
                               m_last_version_available, received.upload.client_version);
    }

    // Download cursor must not decrease.
    if (received.download.server_version < current.download.server_version) {
        failure =
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
                         current.download.server_version, received.download.server_version);
    }

    // Download cursor cannot be ahead of the latest server version.
    if (received.download.server_version > received.latest_server_version.version) {
        failure = util::format(
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
            received.download.server_version, received.latest_server_version.version);
    }

    // Last integrated client version at the download cursor must not decrease.
    if (received.download.last_integrated_client_version < current.download.last_integrated_client_version) {
        failure = util::format(
            "Last integrated client version on the server at the position in the server's history of the download "
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
            current.download.last_integrated_client_version, received.download.last_integrated_client_version);
    }

    // Last integrated client version at the download cursor cannot be ahead of
    // the upload cursor.
    if (received.download.last_integrated_client_version > received.upload.client_version) {
        failure = util::format("Last integrated client version on the server in the position at the server's history "
                               "of the download cursor cannot be greater than the latest client version integrated "
                               "on the server (download: %1, upload: %2)",
                               received.download.last_integrated_client_version, received.upload.client_version);
    }

    // Download cursor cannot be behind the server version integrated in the
    // latest acknowledged client version.
    if (received.download.server_version < received.upload.last_integrated_server_version) {
        failure = util::format(
            "The server version of the download cursor cannot be less than the server version integrated in the "
            "latest client version acknowledged by the server (download: %1, upload: %2)",
            received.download.server_version, received.upload.last_integrated_server_version);
    }

    if (!failure.empty()) {
        return {ErrorCodes::SyncProtocolInvariantFailed, std::move(failure)};
    }
    return Status::OK();
}
×
2709

2710

2711
void Session::check_for_upload_completion()
2712
{
75,174✔
2713
    REALM_ASSERT_EX(m_state == Active, m_state);
75,174✔
2714
    if (!m_upload_completion_notification_requested) {
75,174✔
2715
        return;
43,888✔
2716
    }
43,888✔
2717

15,180✔
2718
    // during an ongoing client reset operation, we never upload anything
15,180✔
2719
    if (m_client_reset_operation)
31,286✔
2720
        return;
232✔
2721

15,064✔
2722
    // Upload process must have reached end of history
15,064✔
2723
    REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available);
31,054✔
2724
    bool scan_complete = (m_upload_progress.client_version == m_last_version_available);
31,054✔
2725
    if (!scan_complete)
31,054✔
2726
        return;
5,436✔
2727

12,492✔
2728
    // All uploaded changesets must have been acknowledged by the server
12,492✔
2729
    REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload);
25,618✔
2730
    bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload);
25,618✔
2731
    if (!all_uploads_accepted)
25,618✔
2732
        return;
10,948✔
2733

7,250✔
2734
    m_upload_completion_notification_requested = false;
14,670✔
2735
    on_upload_completion(); // Throws
14,670✔
2736
}
14,670✔
2737

2738

2739
void Session::check_for_download_completion()
2740
{
59,944✔
2741
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
59,944✔
2742
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
59,944✔
2743
    if (m_last_download_mark_received == m_last_triggering_download_mark)
59,944✔
2744
        return;
43,856✔
2745
    if (m_last_download_mark_received < m_target_download_mark)
16,088✔
2746
        return;
460✔
2747
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
15,628✔
2748
        return;
×
2749
    m_last_triggering_download_mark = m_target_download_mark;
15,628✔
2750
    if (REALM_UNLIKELY(!m_allow_upload)) {
15,628✔
2751
        // Activate the upload process now, and enable immediate reactivation
1,990✔
2752
        // after a subsequent fast reconnect.
1,990✔
2753
        m_allow_upload = true;
4,412✔
2754
        ensure_enlisted_to_send(); // Throws
4,412✔
2755
    }
4,412✔
2756
    on_download_completion(); // Throws
15,628✔
2757
}
15,628✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc