• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1704

25 Sep 2023 01:32PM UTC coverage: 91.198% (-0.02%) from 91.215%
1704

push

Evergreen

web-flow
Merge pull request #7001 from realm/release/13.21.0

Release 13.21

95872 of 175714 branches covered (0.0%)

232403 of 254833 relevant lines covered (91.2%)

7022166.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.64
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <system_error>
2
#include <sstream>
3

4
#include <realm/util/assert.hpp>
5
#include <realm/util/safe_int_ops.hpp>
6
#include <realm/util/random.hpp>
7
#include <realm/util/memory_stream.hpp>
8
#include <realm/util/basic_system_errors.hpp>
9
#include <realm/util/scope_exit.hpp>
10
#include <realm/util/to_string.hpp>
11
#include <realm/util/uri.hpp>
12
#include <realm/util/platform_info.hpp>
13
#include <realm/sync/impl/clock.hpp>
14
#include <realm/impl/simulated_failure.hpp>
15
#include <realm/sync/network/http.hpp>
16
#include <realm/sync/network/websocket.hpp>
17
#include <realm/sync/noinst/client_history_impl.hpp>
18
#include <realm/sync/noinst/client_impl_base.hpp>
19
#include <realm/sync/noinst/compact_changesets.hpp>
20
#include <realm/sync/protocol.hpp>
21
#include <realm/version.hpp>
22
#include <realm/sync/changeset_parser.hpp>
23

24
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
25

26
// NOTE: The protocol specification is in `/doc/protocol.md`
27

28
using namespace realm;
29
using namespace _impl;
30
using namespace realm::util;
31
using namespace realm::sync;
32
using namespace realm::sync::websocket;
33

34
// clang-format off
35
using Connection      = ClientImpl::Connection;
36
using Session         = ClientImpl::Session;
37
using UploadChangeset = ClientHistory::UploadChangeset;
38

39
// These are a work-around for a bug in MSVC. It cannot find in-class types
40
// mentioned in signature of out-of-line member function definitions.
41
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
42
using OutputBuffer                = ClientImpl::OutputBuffer;
43
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
44
// clang-format on
45

46
void ClientImpl::ReconnectInfo::reset() noexcept
47
{
1,740✔
48
    m_backoff_state.reset();
1,740✔
49
    scheduled_reset = false;
1,740✔
50
}
1,740✔
51

52

53
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
54
                                       std::optional<ResumptionDelayInfo> new_delay_info)
55
{
3,286✔
56
    m_backoff_state.update(new_reason, new_delay_info);
3,286✔
57
}
3,286✔
58

59

60
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
61
{
5,312✔
62
    if (scheduled_reset) {
5,312✔
63
        reset();
4✔
64
    }
4✔
65

2,818✔
66
    if (!m_backoff_state.triggering_error) {
5,312✔
67
        return std::chrono::milliseconds::zero();
4,090✔
68
    }
4,090✔
69

688✔
70
    switch (*m_backoff_state.triggering_error) {
1,222✔
71
        case ConnectionTerminationReason::closed_voluntarily:
80✔
72
            return std::chrono::milliseconds::zero();
80✔
73
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
74
            return std::chrono::milliseconds::max();
18✔
75
        default:
1,124✔
76
            if (m_reconnect_mode == ReconnectMode::testing) {
1,124✔
77
                return std::chrono::milliseconds::max();
900✔
78
            }
900✔
79

112✔
80
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
224✔
81
            return m_backoff_state.delay_interval();
224✔
82
    }
1,222✔
83
}
1,222✔
84

85

86
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
87
                                      port_type& port, std::string& path) const
88
{
3,322✔
89
    util::Uri uri(url); // Throws
3,322✔
90
    uri.canonicalize(); // Throws
3,322✔
91
    std::string userinfo, address_2, port_2;
3,322✔
92
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
3,322✔
93
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
3,322✔
94
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
3,322✔
95
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
3,322✔
96
    if (REALM_UNLIKELY(!good))
3,322✔
97
        return false;
1,486✔
98
    ProtocolEnvelope protocol_2;
3,322✔
99
    port_type port_3;
3,322✔
100
    if (realm_scheme) {
3,322✔
101
        if (uri.get_scheme() == "realm:") {
×
102
            protocol_2 = ProtocolEnvelope::realm;
×
103
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
104
        }
×
105
        else {
×
106
            protocol_2 = ProtocolEnvelope::realms;
×
107
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
108
        }
×
109
    }
×
110
    else {
3,322✔
111
        REALM_ASSERT(ws_scheme);
3,322✔
112
        if (uri.get_scheme() == "ws:") {
3,322✔
113
            protocol_2 = ProtocolEnvelope::ws;
3,318✔
114
            port_3 = 80;
3,318✔
115
        }
3,318✔
116
        else {
4✔
117
            protocol_2 = ProtocolEnvelope::wss;
4✔
118
            port_3 = 443;
4✔
119
        }
4✔
120
    }
3,322✔
121
    if (!port_2.empty()) {
3,322✔
122
        std::istringstream in(port_2);    // Throws
3,322✔
123
        in.imbue(std::locale::classic()); // Throws
3,322✔
124
        in >> port_3;
3,322✔
125
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,322✔
126
            return false;
1,486✔
127
    }
3,322✔
128
    std::string path_2 = uri.get_path(); // Throws (copy)
3,322✔
129

1,486✔
130
    protocol = protocol_2;
3,322✔
131
    address = std::move(address_2);
3,322✔
132
    port = port_3;
3,322✔
133
    path = std::move(path_2);
3,322✔
134
    return true;
3,322✔
135
}
3,322✔
136

137

138
ClientImpl::ClientImpl(ClientConfig config)
139
    : logger_ptr{config.logger ? std::move(config.logger) : std::make_shared<util::StderrLogger>()}
140
    , logger{*logger_ptr}
141
    , m_reconnect_mode{config.reconnect_mode}
142
    , m_connect_timeout{config.connect_timeout}
143
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
144
    , m_ping_keepalive_period{config.ping_keepalive_period}
145
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
146
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
147
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
148
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
149
    , m_dry_run{config.dry_run}
150
    , m_enable_default_port_hack{config.enable_default_port_hack}
151
    , m_disable_upload_compaction{config.disable_upload_compaction}
152
    , m_fix_up_object_ids{config.fix_up_object_ids}
153
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
154
    , m_socket_provider{std::move(config.socket_provider)}
155
    , m_client_protocol{} // Throws
156
    , m_one_connection_per_session{config.one_connection_per_session}
157
    , m_random{}
158
{
8,950✔
159
    // FIXME: Would be better if seeding was up to the application.
4,406✔
160
    util::seed_prng_nondeterministically(m_random); // Throws
8,950✔
161

4,406✔
162
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
8,950✔
163
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
8,950✔
164
                 get_current_protocol_version()); // Throws
8,950✔
165
    logger.info("Platform: %1", util::get_platform_info());
8,950✔
166
    const char* build_mode;
8,950✔
167
#if REALM_DEBUG
8,950✔
168
    build_mode = "Debug";
8,950✔
169
#else
170
    build_mode = "Release";
171
#endif
172
    logger.debug("Build mode: %1", build_mode);
8,950✔
173
    logger.debug("Config param: one_connection_per_session = %1",
8,950✔
174
                 config.one_connection_per_session); // Throws
8,950✔
175
    logger.debug("Config param: connect_timeout = %1 ms",
8,950✔
176
                 config.connect_timeout); // Throws
8,950✔
177
    logger.debug("Config param: connection_linger_time = %1 ms",
8,950✔
178
                 config.connection_linger_time); // Throws
8,950✔
179
    logger.debug("Config param: ping_keepalive_period = %1 ms",
8,950✔
180
                 config.ping_keepalive_period); // Throws
8,950✔
181
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
8,950✔
182
                 config.pong_keepalive_timeout); // Throws
8,950✔
183
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
8,950✔
184
                 config.fast_reconnect_limit); // Throws
8,950✔
185
    logger.debug("Config param: disable_upload_compaction = %1",
8,950✔
186
                 config.disable_upload_compaction); // Throws
8,950✔
187
    logger.debug("Config param: disable_sync_to_disk = %1",
8,950✔
188
                 config.disable_sync_to_disk); // Throws
8,950✔
189
    logger.debug("Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3",
8,950✔
190
                 m_reconnect_backoff_info.max_resumption_delay_interval.count(),
8,950✔
191
                 m_reconnect_backoff_info.resumption_delay_interval.count(),
8,950✔
192
                 m_reconnect_backoff_info.resumption_delay_backoff_multiplier);
8,950✔
193

4,406✔
194
    if (config.reconnect_mode != ReconnectMode::normal) {
8,950✔
195
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
768✔
196
                    "never do this in production!");
768✔
197
    }
768✔
198

4,406✔
199
    if (config.dry_run) {
8,950✔
200
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
201
                    "never do this in production!");
×
202
    }
×
203

4,406✔
204
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
8,950✔
205

4,406✔
206
    if (m_one_connection_per_session) {
8,950✔
207
        // FIXME: Re-enable this warning when the load balancer is able to handle
2✔
208
        // multiplexing.
2✔
209
        //        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
2✔
210
        //            "never do this in production");
2✔
211
    }
4✔
212

4,406✔
213
    if (config.disable_upload_activation_delay) {
8,950✔
214
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
215
                    "never do this in production");
×
216
    }
×
217

4,406✔
218
    if (config.disable_sync_to_disk) {
8,950✔
219
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
220
                    "never do this in production");
×
221
    }
×
222

4,406✔
223
    m_actualize_and_finalize = create_trigger([this](Status status) {
13,870✔
224
        if (status == ErrorCodes::OperationAborted)
13,870✔
225
            return;
×
226
        else if (!status.is_ok())
13,870✔
227
            throw Exception(status);
×
228
        actualize_and_finalize_session_wrappers(); // Throws
13,870✔
229
    });
13,870✔
230
}
8,950✔
231

232

233
// Hands `handler` to the socket provider for later execution, counting it as
// an outstanding post until it has run.
//
// The counter is decremented (and m_drain_cv notified) by a scope guard, so
// this happens even if the handler exits by throwing.
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);

    {
        std::lock_guard lock(m_drain_mutex);
        m_drained = false;
        ++m_outstanding_posts;
    }

    auto counted_handler = [this, handler = std::move(handler)](Status status) {
        auto decr_guard = util::make_scope_exit([this]() noexcept {
            std::lock_guard lock(m_drain_mutex);
            REALM_ASSERT(m_outstanding_posts);
            --m_outstanding_posts;
            m_drain_cv.notify_all();
        });
        handler(status);
    };
    m_socket_provider->post(std::move(counted_handler));
}
98,074✔
251

252

253
void ClientImpl::drain_connections()
254
{
8,950✔
255
    logger.debug("Draining connections during sync client shutdown");
8,950✔
256
    for (auto& server_slot_pair : m_server_slots) {
5,638✔
257
        auto& server_slot = server_slot_pair.second;
2,328✔
258

1,096✔
259
        if (server_slot.connection) {
2,328✔
260
            auto& conn = server_slot.connection;
2,232✔
261
            conn->force_close();
2,232✔
262
        }
2,232✔
263
        else {
96✔
264
            for (auto& conn_pair : server_slot.alt_connections) {
48✔
265
                conn_pair.second->force_close();
2✔
266
            }
2✔
267
        }
96✔
268
    }
2,328✔
269
}
8,950✔
270

271

272
// Creates a timer on the socket provider that invokes `handler` after `delay`,
// counting it as an outstanding post until the handler has run.
//
// The outstanding-post counter is released by a scope guard rather than by
// code placed after the handler call. Handlers passed to this function may
// throw (e.g. when they receive a non-OK status), and without the guard an
// exception would skip the decrement and the m_drain_cv notification, leaving
// the counter permanently non-zero. This mirrors what post() does.
//
// Returns the timer object produced by the socket provider.
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
                                                       SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    {
        std::lock_guard lock(m_drain_mutex);
        ++m_outstanding_posts;
        m_drained = false;
    }
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
        // Runs on every exit path of this callback, including exceptions.
        auto decr_guard = util::make_scope_exit([&]() noexcept {
            std::lock_guard lock(m_drain_mutex);
            REALM_ASSERT(m_outstanding_posts);
            --m_outstanding_posts;
            m_drain_cv.notify_all();
        });
        handler(status);
    });
}
16,114✔
290

291

292
// Wraps `handler` in a new Trigger bound to this client. Requires that a
// socket provider is present.
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    using TriggerType = Trigger<ClientImpl>;
    return std::make_unique<TriggerType>(this, std::move(handler));
}
11,392✔
297

298
// Marks the lifecycle sentinel (if any) as destroyed before releasing this
// object's reference to it, so that other holders of the sentinel can detect
// that the connection is gone.
Connection::~Connection()
{
    if (!m_websocket_sentinel)
        return;

    m_websocket_sentinel->destroyed = true;
    m_websocket_sentinel.reset();
}
2,442✔
305

306
void Connection::activate()
307
{
2,442✔
308
    REALM_ASSERT(m_on_idle);
2,442✔
309
    m_activated = true;
2,442✔
310
    if (m_num_active_sessions == 0)
2,442✔
311
        m_on_idle->trigger();
×
312
    // We cannot in general connect immediately, because a prior failure to
1,152✔
313
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
1,152✔
314
    initiate_reconnect_wait(); // Throws
2,442✔
315
}
2,442✔
316

317

318
void Connection::activate_session(std::unique_ptr<Session> sess)
319
{
9,596✔
320
    REALM_ASSERT(sess);
9,596✔
321
    REALM_ASSERT(&sess->m_conn == this);
9,596✔
322
    REALM_ASSERT(!m_force_closed);
9,596✔
323
    Session& sess_2 = *sess;
9,596✔
324
    session_ident_type ident = sess->m_ident;
9,596✔
325
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
9,596✔
326
    bool was_inserted = p.second;
9,596✔
327
    REALM_ASSERT(was_inserted);
9,596✔
328
    // Save the session ident to the historical list of session idents
4,622✔
329
    m_session_history.insert(ident);
9,596✔
330
    sess_2.activate(); // Throws
9,596✔
331
    if (m_state == ConnectionState::connected) {
9,596✔
332
        bool fast_reconnect = false;
6,728✔
333
        sess_2.connection_established(fast_reconnect); // Throws
6,728✔
334
    }
6,728✔
335
    ++m_num_active_sessions;
9,596✔
336
}
9,596✔
337

338

339
// Starts deactivating `sess`; if it reaches the Deactivated state at once it
// is also removed from this connection. Then the active-session count is
// decremented and, when it hits zero on an activated, disconnected connection,
// the on-idle trigger is fired.
void Connection::initiate_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess);
    REALM_ASSERT(&sess->m_conn == this);
    REALM_ASSERT(m_num_active_sessions);

    // The client may be waiting in stop_and_wait() (on another thread) for
    // m_num_active_sessions to become 0, so the session has to be deactivated
    // before the counter is touched.
    sess->initiate_deactivation(); // Throws
    if (sess->m_state == Session::Deactivated)
        finish_session_deactivation(sess);

    --m_num_active_sessions;
    const bool last_session_gone = (m_num_active_sessions == 0);
    if (REALM_UNLIKELY(last_session_gone) && m_activated && m_state == ConnectionState::disconnected)
        m_on_idle->trigger();
}
9,596✔
356

357

358
void Connection::cancel_reconnect_delay()
359
{
1,944✔
360
    REALM_ASSERT(m_activated);
1,944✔
361

1,126✔
362
    if (m_reconnect_delay_in_progress) {
1,944✔
363
        if (m_nonzero_reconnect_delay)
1,732✔
364
            logger.detail("Canceling reconnect delay"); // Throws
870✔
365

1,020✔
366
        // Cancel the in-progress wait operation by destroying the timer
1,020✔
367
        // object. Destruction is needed in this case, because a new wait
1,020✔
368
        // operation might have to be initiated before the previous one
1,020✔
369
        // completes (its completion handler starts to execute), so the new wait
1,020✔
370
        // operation must be done on a new timer object.
1,020✔
371
        m_reconnect_disconnect_timer.reset();
1,732✔
372
        m_reconnect_delay_in_progress = false;
1,732✔
373
        m_reconnect_info.reset();
1,732✔
374
        initiate_reconnect_wait(); // Throws
1,732✔
375
        return;
1,732✔
376
    }
1,732✔
377

106✔
378
    // If we are not disconnected, then we need to make sure the next time we get disconnected
106✔
379
    // that we are allowed to re-connect as quickly as possible.
106✔
380
    //
106✔
381
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
106✔
382
    // backoff/delay state before calculating the next delay, unless a PONG message is received
106✔
383
    // for the urgent PING message we send below.
106✔
384
    //
106✔
385
    // If we get a PONG message for the urgent PING message sent below, then the connection is
106✔
386
    // healthy and we can calculate the next delay normally.
106✔
387
    if (m_state != ConnectionState::disconnected) {
212✔
388
        m_reconnect_info.scheduled_reset = true;
212✔
389
        m_ping_after_scheduled_reset_of_reconnect_info = false;
212✔
390

106✔
391
        schedule_urgent_ping(); // Throws
212✔
392
        return;
212✔
393
    }
212✔
394
    // Nothing to do in this case. The next reconnect attemp will be made as
106✔
395
    // soon as there are any sessions that are both active and unsuspended.
106✔
396
}
212✔
397

398
// Removes a fully deactivated session from this connection's session map and
// from the historical list of session idents.
void ClientImpl::Connection::finish_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess->m_state == Session::Deactivated);

    // Take a copy of the ident before erasing. NOTE(review): erasing from
    // m_sessions presumably destroys *sess - confirm.
    const auto session_ident = sess->m_ident;
    m_sessions.erase(session_ident);
    m_session_history.erase(session_ident);
}
7,692✔
405

406
void Connection::force_close()
407
{
2,234✔
408
    if (m_force_closed) {
2,234✔
409
        return;
×
410
    }
×
411

1,050✔
412
    m_force_closed = true;
2,234✔
413

1,050✔
414
    if (m_state != ConnectionState::disconnected) {
2,234✔
415
        voluntary_disconnect();
2,148✔
416
    }
2,148✔
417

1,050✔
418
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,234✔
419
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,234✔
420
        m_reconnect_disconnect_timer.reset();
86✔
421
        m_reconnect_delay_in_progress = false;
86✔
422
        m_disconnect_delay_in_progress = false;
86✔
423
    }
86✔
424

1,050✔
425
    // We must copy any session pointers we want to close to a vector because force_closing
1,050✔
426
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
1,050✔
427
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
1,050✔
428
    std::vector<Session*> to_close;
2,234✔
429
    for (auto& session_pair : m_sessions) {
1,122✔
430
        if (session_pair.second->m_state == Session::State::Active) {
142✔
431
            to_close.push_back(session_pair.second.get());
142✔
432
        }
142✔
433
    }
142✔
434

1,050✔
435
    for (auto& sess : to_close) {
1,122✔
436
        sess->force_close();
142✔
437
    }
142✔
438

1,050✔
439
    logger.debug("Force closed idle connection");
2,234✔
440
}
2,234✔
441

442

443
void Connection::websocket_connected_handler(const std::string& protocol)
444
{
3,176✔
445
    if (!protocol.empty()) {
3,176✔
446
        std::string_view expected_prefix =
3,176✔
447
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
2,914✔
448
        // FIXME: Use std::string_view::begins_with() in C++20.
1,596✔
449
        auto prefix_matches = [&](std::string_view other) {
3,176✔
450
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,176✔
451
        };
3,176✔
452
        if (prefix_matches(expected_prefix)) {
3,176✔
453
            util::MemoryInputStream in;
3,176✔
454
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,176✔
455
            in.imbue(std::locale::classic());
3,176✔
456
            in.unsetf(std::ios_base::skipws);
3,176✔
457
            int value_2 = 0;
3,176✔
458
            in >> value_2;
3,176✔
459
            if (in && in.eof() && value_2 >= 0) {
3,176✔
460
                bool good_version =
3,176✔
461
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,176✔
462
                if (good_version) {
3,176✔
463
                    logger.detail("Negotiated protocol version: %1", value_2);
3,176✔
464
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
1,596✔
465
                    // will provide the appservices connection ID via a log message.
1,596✔
466
                    // TODO: Remove once the server starts sending the connection ID
1,596✔
467
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,176✔
468
                    m_negotiated_protocol_version = value_2;
3,176✔
469
                    handle_connection_established(); // Throws
3,176✔
470
                    return;
3,176✔
471
                }
3,176✔
472
            }
×
473
        }
3,176✔
474
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
475
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
476
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
477
    }
×
478
    else {
×
479
        close_due_to_client_side_error(
×
480
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
481
            ConnectionTerminationReason::bad_headers_in_http_response);
×
482
    }
×
483
}
3,176✔
484

485

486
// Handles a binary message arriving on the websocket.
//
// Returns false when the connection has been force closed (the message is
// ignored). Otherwise returns whether m_websocket is still set after the
// message - or a simulated read failure - has been processed.
bool Connection::websocket_binary_message_received(util::Span<const char> data)
{
    if (m_force_closed) {
        logger.debug("Received binary message after connection was force closed");
        return false;
    }

    using sf = SimulatedFailure;
    const bool simulate_read_failure = sf::check_trigger(sf::sync_client__read_head);
    if (simulate_read_failure) {
        close_due_to_client_side_error(
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
            ConnectionTerminationReason::read_or_write_error);
    }
    else {
        handle_message_received(data);
    }

    return bool(m_websocket);
}
72,682✔
504

505

506
void Connection::websocket_error_handler()
507
{
566✔
508
    m_websocket_error_received = true;
566✔
509
}
566✔
510

511
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
512
{
694✔
513
    if (m_force_closed) {
694✔
514
        logger.debug("Received websocket close message after connection was force closed");
×
515
        return false;
×
516
    }
×
517
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
694✔
518

374✔
519
    switch (error_code) {
694✔
520
        case WebSocketError::websocket_ok:
76✔
521
            break;
76✔
522
        case WebSocketError::websocket_resolve_failed:
4✔
523
            [[fallthrough]];
4✔
524
        case WebSocketError::websocket_connection_failed: {
4✔
525
            SessionErrorInfo error_info(
4✔
526
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
4✔
527
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
4✔
528
            break;
4✔
529
        }
4✔
530
        case WebSocketError::websocket_read_error:
552✔
531
            [[fallthrough]];
552✔
532
        case WebSocketError::websocket_write_error: {
552✔
533
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
552✔
534
                                         ConnectionTerminationReason::read_or_write_error);
552✔
535
            break;
552✔
536
        }
552✔
537
        case WebSocketError::websocket_going_away:
302✔
538
            [[fallthrough]];
×
539
        case WebSocketError::websocket_protocol_error:
✔
540
            [[fallthrough]];
×
541
        case WebSocketError::websocket_unsupported_data:
✔
542
            [[fallthrough]];
×
543
        case WebSocketError::websocket_invalid_payload_data:
✔
544
            [[fallthrough]];
×
545
        case WebSocketError::websocket_policy_violation:
✔
546
            [[fallthrough]];
×
547
        case WebSocketError::websocket_reserved:
✔
548
            [[fallthrough]];
×
549
        case WebSocketError::websocket_no_status_received:
✔
550
            [[fallthrough]];
×
551
        case WebSocketError::websocket_invalid_extension: {
✔
552
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
553
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
554
            break;
×
555
        }
×
556
        case WebSocketError::websocket_message_too_big: {
4✔
557
            auto message = util::format(
4✔
558
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
559
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
560
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
561
            involuntary_disconnect(std::move(error_info),
4✔
562
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
563
            break;
4✔
564
        }
×
565
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
566
            close_due_to_client_side_error(
10✔
567
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
568
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
569
            break;
10✔
570
        }
×
571
        case WebSocketError::websocket_client_too_old:
✔
572
            [[fallthrough]];
×
573
        case WebSocketError::websocket_client_too_new:
✔
574
            [[fallthrough]];
×
575
        case WebSocketError::websocket_protocol_mismatch: {
✔
576
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
577
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
578
            break;
×
579
        }
×
580
        case WebSocketError::websocket_fatal_error: {
✔
581
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{true}),
×
582
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
583
            break;
×
584
        }
×
585
        case WebSocketError::websocket_forbidden: {
✔
586
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
587
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
588
            involuntary_disconnect(std::move(error_info),
×
589
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
590
            break;
×
591
        }
×
592
        case WebSocketError::websocket_unauthorized: {
36✔
593
            SessionErrorInfo error_info(
36✔
594
                {ErrorCodes::AuthError,
36✔
595
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
36✔
596
                IsFatal{false});
36✔
597
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
36✔
598
            involuntary_disconnect(std::move(error_info),
36✔
599
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
36✔
600
            break;
36✔
601
        }
×
602
        case WebSocketError::websocket_moved_permanently: {
12✔
603
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
604
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
605
            involuntary_disconnect(std::move(error_info),
12✔
606
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
607
            break;
12✔
608
        }
×
609
        case WebSocketError::websocket_abnormal_closure: {
✔
610
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
611
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
612
            involuntary_disconnect(std::move(error_info),
×
613
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
614
            break;
×
615
        }
×
616
        case WebSocketError::websocket_internal_server_error:
✔
617
            [[fallthrough]];
×
618
        case WebSocketError::websocket_retry_error: {
✔
619
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
620
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
621
            break;
×
622
        }
694✔
623
    }
694✔
624

374✔
625
    return bool(m_websocket);
694✔
626
}
694✔
627

628
// Guarantees that handle_reconnect_wait() is never called from within the
629
// execution of initiate_reconnect_wait() (no callback reentrance).
630
void Connection::initiate_reconnect_wait()
631
{
7,458✔
632
    REALM_ASSERT(m_activated);
7,458✔
633
    REALM_ASSERT(!m_reconnect_delay_in_progress);
7,458✔
634
    REALM_ASSERT(!m_disconnect_delay_in_progress);
7,458✔
635

3,824✔
636
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
3,824✔
637
    if (m_force_closed) {
7,458✔
638
        return;
2,146✔
639
    }
2,146✔
640

2,818✔
641
    m_reconnect_delay_in_progress = true;
5,312✔
642
    auto delay = m_reconnect_info.delay_interval();
5,312✔
643
    if (delay == std::chrono::milliseconds::max()) {
5,312✔
644
        logger.detail("Reconnection delayed indefinitely"); // Throws
918✔
645
        // Not actually starting a timer corresponds to an infinite wait
536✔
646
        m_nonzero_reconnect_delay = true;
918✔
647
        return;
918✔
648
    }
918✔
649

2,282✔
650
    if (delay == std::chrono::milliseconds::zero()) {
4,394✔
651
        m_nonzero_reconnect_delay = false;
4,170✔
652
    }
4,170✔
653
    else {
224✔
654
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
224✔
655
        m_nonzero_reconnect_delay = true;
224✔
656
    }
224✔
657

2,282✔
658
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
2,282✔
659
    // we need it to be cancelable in case the connection is terminated before the timer
2,282✔
660
    // callback is run.
2,282✔
661
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
4,394✔
662
        // If the operation is aborted, the connection object may have been
2,282✔
663
        // destroyed.
2,282✔
664
        if (status != ErrorCodes::OperationAborted)
4,394✔
665
            handle_reconnect_wait(status); // Throws
3,286✔
666
    });                                    // Throws
4,394✔
667
}
4,394✔
668

669

670
// Timer callback for the reconnect delay started by initiate_reconnect_wait().
// Any non-OK status is rethrown as an Exception (an aborted wait must never
// reach this function). Otherwise the delay is marked as finished and a new
// connection attempt is started if at least one active, unsuspended session
// remains.
void Connection::handle_reconnect_wait(Status status)
{
    const bool wait_failed = !status.is_ok();
    if (wait_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT(m_reconnect_delay_in_progress);
    m_reconnect_delay_in_progress = false;

    const bool have_sessions = (m_num_active_unsuspended_sessions > 0);
    if (have_sessions) {
        initiate_reconnect(); // Throws
    }
}
3,286✔
683

684
// Observer handed to the socket provider for a single websocket. Every event
// is forwarded to the owning Connection unless the lifecycle sentinel captured
// at construction has been flagged as destroyed, in which case the event is
// dropped without touching `conn`.
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
    explicit WebSocketObserverShim(Connection* connection)
        : conn(connection)
        , sentinel(connection->m_websocket_sentinel)
    {
    }

    Connection* conn;
    util::bind_ptr<LifecycleSentinel> sentinel;

    void websocket_connected_handler(const std::string& protocol) override
    {
        if (!sentinel->destroyed) {
            conn->websocket_connected_handler(protocol);
        }
    }

    void websocket_error_handler() override
    {
        if (!sentinel->destroyed) {
            conn->websocket_error_handler();
        }
    }

    // Returns false without forwarding once the sentinel is destroyed.
    bool websocket_binary_message_received(util::Span<const char> data) override
    {
        return sentinel->destroyed ? false : conn->websocket_binary_message_received(data);
    }

    // Returns true without forwarding once the sentinel is destroyed.
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
    {
        return sentinel->destroyed ? true : conn->websocket_closed_handler(was_clean, error_code, msg);
    }
};
730

731
void Connection::initiate_reconnect()
732
{
3,286✔
733
    REALM_ASSERT(m_activated);
3,286✔
734

1,652✔
735
    m_state = ConnectionState::connecting;
3,286✔
736
    report_connection_state_change(ConnectionState::connecting); // Throws
3,286✔
737
    if (m_websocket_sentinel) {
3,286✔
738
        m_websocket_sentinel->destroyed = true;
×
739
    }
×
740
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,286✔
741
    m_websocket.reset();
3,286✔
742

1,652✔
743
    // Watchdog
1,652✔
744
    initiate_connect_wait(); // Throws
3,286✔
745

1,652✔
746
    std::vector<std::string> sec_websocket_protocol;
3,286✔
747
    {
3,286✔
748
        auto protocol_prefix =
3,286✔
749
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,020✔
750
        int min = get_oldest_supported_protocol_version();
3,286✔
751
        int max = get_current_protocol_version();
3,286✔
752
        REALM_ASSERT_3(min, <=, max);
3,286✔
753
        // List protocol version in descending order to ensure that the server
1,652✔
754
        // selects the highest possible version.
1,652✔
755
        for (int version = max; version >= min; --version) {
32,852✔
756
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
29,566✔
757
        }
29,566✔
758
    }
3,286✔
759

1,652✔
760
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,286✔
761
                m_server_endpoint.port, m_http_request_path_prefix);
3,286✔
762

1,652✔
763
    m_websocket_error_received = false;
3,286✔
764
    m_websocket =
3,286✔
765
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,286✔
766
                                            WebSocketEndpoint{
3,286✔
767
                                                m_server_endpoint.address,
3,286✔
768
                                                m_server_endpoint.port,
3,286✔
769
                                                get_http_request_path(),
3,286✔
770
                                                std::move(sec_websocket_protocol),
3,286✔
771
                                                is_ssl(m_server_endpoint.envelope),
3,286✔
772
                                                /// DEPRECATED - The following will be removed in a future release
1,652✔
773
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,286✔
774
                                                m_verify_servers_ssl_certificate,
3,286✔
775
                                                m_ssl_trust_certificate_path,
3,286✔
776
                                                m_ssl_verify_callback,
3,286✔
777
                                                m_proxy_config,
3,286✔
778
                                            });
3,286✔
779
}
3,286✔
780

781

782
void Connection::initiate_connect_wait()
783
{
3,286✔
784
    // Deploy a watchdog to enforce an upper bound on the time it can take to
1,652✔
785
    // fully establish the connection (including SSL and WebSocket
1,652✔
786
    // handshakes). Without such a watchdog, connect operations could take very
1,652✔
787
    // long, or even indefinite time.
1,652✔
788
    milliseconds_type time = m_client.m_connect_timeout;
3,286✔
789

1,652✔
790
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,286✔
791
        // If the operation is aborted, the connection object may have been
1,652✔
792
        // destroyed.
1,652✔
793
        if (status != ErrorCodes::OperationAborted)
3,286✔
794
            handle_connect_wait(status); // Throws
×
795
    });                                  // Throws
3,286✔
796
}
3,286✔
797

798

799
// Callback of the connect watchdog. A non-OK status is rethrown as an
// Exception (an aborted wait must never reach this function). An OK status
// means the timeout expired while still connecting, so the connection is torn
// down with a non-fatal SyncConnectTimeout error.
void Connection::handle_connect_wait(Status status)
{
    const bool wait_failed = !status.is_ok();
    if (wait_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
    logger.info("Connect timeout"); // Throws

    SessionErrorInfo timeout_error{
        Status{ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
        IsFatal{false}};
    involuntary_disconnect(std::move(timeout_error),
                           ConnectionTerminationReason::sync_connect_timeout); // Throws
}
×
813

814

815
void Connection::handle_connection_established()
816
{
3,176✔
817
    // Cancel connect timeout watchdog
1,596✔
818
    m_connect_timer.reset();
3,176✔
819

1,596✔
820
    m_state = ConnectionState::connected;
3,176✔
821

1,596✔
822
    milliseconds_type now = monotonic_clock_now();
3,176✔
823
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,176✔
824
    initiate_ping_delay(now);     // Throws
3,176✔
825

1,596✔
826
    bool fast_reconnect = false;
3,176✔
827
    if (m_disconnect_has_occurred) {
3,176✔
828
        milliseconds_type time = now - m_disconnect_time;
926✔
829
        if (time <= m_client.m_fast_reconnect_limit)
926✔
830
            fast_reconnect = true;
926✔
831
    }
926✔
832

1,596✔
833
    for (auto& p : m_sessions) {
4,214✔
834
        Session& sess = *p.second;
4,214✔
835
        sess.connection_established(fast_reconnect); // Throws
4,214✔
836
    }
4,214✔
837

1,596✔
838
    report_connection_state_change(ConnectionState::connected); // Throws
3,176✔
839
}
3,176✔
840

841

842
void Connection::schedule_urgent_ping()
843
{
212✔
844
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
212✔
845
    if (m_ping_delay_in_progress) {
212✔
846
        m_heartbeat_timer.reset();
196✔
847
        m_ping_delay_in_progress = false;
196✔
848
        m_minimize_next_ping_delay = true;
196✔
849
        milliseconds_type now = monotonic_clock_now();
196✔
850
        initiate_ping_delay(now); // Throws
196✔
851
        return;
196✔
852
    }
196✔
853
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
16✔
854
    if (!m_send_ping)
16✔
855
        m_minimize_next_ping_delay = true;
16✔
856
}
16✔
857

858

859
// Starts the wait that precedes the next PING. `now` is the current monotonic
// time as sampled by the caller. The wait is zero when a minimized delay was
// requested; otherwise it is the keepalive period minus a random deduction and
// minus the time already spent waiting for the previous PONG.
void Connection::initiate_ping_delay(milliseconds_type now)
{
    REALM_ASSERT(!m_ping_delay_in_progress);
    REALM_ASSERT(!m_waiting_for_pong);
    REALM_ASSERT(!m_send_ping);

    milliseconds_type delay = 0;
    if (m_minimize_next_ping_delay) {
        // One-shot request: consume it and keep the zero delay.
        m_minimize_next_ping_delay = false;
    }
    else {
        delay = m_client.m_ping_keepalive_period;

        // Randomly deduct up to 10% of the period, or up to 100% for the first
        // PING since the connection was established, to lower the risk of many
        // connections pinging the server at the same moment.
        const milliseconds_type deduction_cap = m_ping_sent ? delay / 10 : delay;
        std::uniform_int_distribution<milliseconds_type> pick_deduction(0, deduction_cap);
        delay -= pick_deduction(m_client.get_random());

        // Time already spent waiting for PONG counts towards the delay.
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
        const milliseconds_type already_waited = now - m_pong_wait_started_at;
        delay = (already_waited < delay) ? delay - already_waited : 0;
    }

    m_ping_delay_in_progress = true;

    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
        if (status == ErrorCodes::OperationAborted)
            return;
        if (!status.is_ok())
            throw Exception(status);

        handle_ping_delay(); // Throws
    });                      // Throws
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
}
3,550✔
904

905

906
void Connection::handle_ping_delay()
907
{
196✔
908
    REALM_ASSERT(m_ping_delay_in_progress);
196✔
909
    m_ping_delay_in_progress = false;
196✔
910
    m_send_ping = true;
196✔
911

66✔
912
    initiate_pong_timeout(); // Throws
196✔
913

66✔
914
    if (m_state == ConnectionState::connected && !m_sending)
196✔
915
        send_next_message(); // Throws
170✔
916
}
196✔
917

918

919
void Connection::initiate_pong_timeout()
920
{
196✔
921
    REALM_ASSERT(!m_ping_delay_in_progress);
196✔
922
    REALM_ASSERT(!m_waiting_for_pong);
196✔
923
    REALM_ASSERT(m_send_ping);
196✔
924

66✔
925
    m_waiting_for_pong = true;
196✔
926
    m_pong_wait_started_at = monotonic_clock_now();
196✔
927

66✔
928
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
196✔
929
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
196✔
930
        if (status == ErrorCodes::OperationAborted)
196✔
931
            return;
184✔
932
        else if (!status.is_ok())
12✔
933
            throw Exception(status);
×
934

6✔
935
        handle_pong_timeout(); // Throws
12✔
936
    });                        // Throws
12✔
937
}
196✔
938

939

940
void Connection::handle_pong_timeout()
941
{
12✔
942
    REALM_ASSERT(m_waiting_for_pong);
12✔
943
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
944
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
945
                                 ConnectionTerminationReason::pong_timeout);
12✔
946
}
12✔
947

948

949
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
950
{
92,152✔
951
    // Stop sending messages if an websocket error was received.
46,060✔
952
    if (m_websocket_error_received)
92,152✔
953
        return;
×
954

46,060✔
955
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
92,152✔
956
        if (sentinel->destroyed) {
90,666✔
957
            return;
×
958
        }
×
959
        if (status == ErrorCodes::OperationAborted)
90,666✔
960
            return;
×
961
        else if (!status.is_ok())
90,666✔
962
            throw Exception(status);
×
963

45,120✔
964
        handle_write_message(); // Throws
90,666✔
965
    });                         // Throws
90,666✔
966
    m_sending_session = sess;
92,152✔
967
    m_sending = true;
92,152✔
968
}
92,152✔
969

970

971
void Connection::handle_write_message()
972
{
90,666✔
973
    m_sending_session->message_sent(); // Throws
90,666✔
974
    if (m_sending_session->m_state == Session::Deactivated) {
90,666✔
975
        finish_session_deactivation(m_sending_session);
96✔
976
    }
96✔
977
    m_sending_session = nullptr;
90,666✔
978
    m_sending = false;
90,666✔
979
    send_next_message(); // Throws
90,666✔
980
}
90,666✔
981

982

983
void Connection::send_next_message()
984
{
149,774✔
985
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
149,774✔
986
    REALM_ASSERT(!m_sending_session);
149,774✔
987
    REALM_ASSERT(!m_sending);
149,774✔
988
    if (m_send_ping) {
149,774✔
989
        send_ping(); // Throws
184✔
990
        return;
184✔
991
    }
184✔
992
    while (!m_sessions_enlisted_to_send.empty()) {
210,400✔
993
        // The state of being connected is not supposed to be able to change
75,692✔
994
        // across this loop thanks to the "no callback reentrance" guarantee
75,692✔
995
        // provided by Websocket::async_write_text(), and friends.
75,692✔
996
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
153,284✔
997

75,692✔
998
        Session& sess = *m_sessions_enlisted_to_send.front();
153,284✔
999
        m_sessions_enlisted_to_send.pop_front();
153,284✔
1000
        sess.send_message(); // Throws
153,284✔
1001

75,692✔
1002
        if (sess.m_state == Session::Deactivated) {
153,284✔
1003
            finish_session_deactivation(&sess);
2,662✔
1004
        }
2,662✔
1005

75,692✔
1006
        // An enlisted session may choose to not send a message. In that case,
75,692✔
1007
        // we should pass the opportunity to the next enlisted session.
75,692✔
1008
        if (m_sending)
153,284✔
1009
            break;
92,474✔
1010
    }
153,284✔
1011
}
149,590✔
1012

1013

1014
void Connection::send_ping()
1015
{
184✔
1016
    REALM_ASSERT(!m_ping_delay_in_progress);
184✔
1017
    REALM_ASSERT(m_waiting_for_pong);
184✔
1018
    REALM_ASSERT(m_send_ping);
184✔
1019

60✔
1020
    m_send_ping = false;
184✔
1021
    if (m_reconnect_info.scheduled_reset)
184✔
1022
        m_ping_after_scheduled_reset_of_reconnect_info = true;
152✔
1023

60✔
1024
    m_last_ping_sent_at = monotonic_clock_now();
184✔
1025
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
184✔
1026
                 m_previous_ping_rtt); // Throws
184✔
1027

60✔
1028
    ClientProtocol& protocol = get_client_protocol();
184✔
1029
    OutputBuffer& out = get_output_buffer();
184✔
1030
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
184✔
1031
    initiate_write_ping(out);                                          // Throws
184✔
1032
    m_ping_sent = true;
184✔
1033
}
184✔
1034

1035

1036
void Connection::initiate_write_ping(const OutputBuffer& out)
1037
{
184✔
1038
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
184✔
1039
        if (sentinel->destroyed) {
184✔
1040
            return;
×
1041
        }
×
1042
        if (status == ErrorCodes::OperationAborted)
184✔
1043
            return;
×
1044
        else if (!status.is_ok())
184✔
1045
            throw Exception(status);
×
1046

60✔
1047
        handle_write_ping(); // Throws
184✔
1048
    });                      // Throws
184✔
1049
    m_sending = true;
184✔
1050
}
184✔
1051

1052

1053
void Connection::handle_write_ping()
1054
{
184✔
1055
    REALM_ASSERT(m_sending);
184✔
1056
    REALM_ASSERT(!m_sending_session);
184✔
1057
    m_sending = false;
184✔
1058
    send_next_message(); // Throws
184✔
1059
}
184✔
1060

1061

1062
// Hands a received binary message to the client protocol parser, which decodes
// it and invokes the matching handler on this Connection.
void Connection::handle_message_received(util::Span<const char> data)
{
    const std::string_view message(data.data(), data.size());
    get_client_protocol().parse_message_received<Connection>(*this, message);
}
72,678✔
1068

1069

1070
void Connection::initiate_disconnect_wait()
1071
{
4,290✔
1072
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,290✔
1073

2,020✔
1074
    if (m_disconnect_delay_in_progress) {
4,290✔
1075
        m_reconnect_disconnect_timer.reset();
2,080✔
1076
        m_disconnect_delay_in_progress = false;
2,080✔
1077
    }
2,080✔
1078

2,020✔
1079
    milliseconds_type time = m_client.m_connection_linger_time;
4,290✔
1080

2,020✔
1081
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,290✔
1082
        // If the operation is aborted, the connection object may have been
2,020✔
1083
        // destroyed.
2,020✔
1084
        if (status != ErrorCodes::OperationAborted)
4,290✔
1085
            handle_disconnect_wait(status); // Throws
12✔
1086
    });                                     // Throws
4,290✔
1087
    m_disconnect_delay_in_progress = true;
4,290✔
1088
}
4,290✔
1089

1090

1091
// Callback of the linger timer. A non-OK status is rethrown as an Exception
// (an aborted wait must never reach this function). Otherwise the connection
// is voluntarily closed, but only if no active, unsuspended session is left.
void Connection::handle_disconnect_wait(Status status)
{
    const bool wait_failed = !status.is_ok();
    if (wait_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    m_disconnect_delay_in_progress = false;

    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
    // Sessions are using the connection again; keep it open.
    if (m_num_active_unsuspended_sessions != 0)
        return;

    // Only mention the linger time when one was actually configured.
    if (m_client.m_connection_linger_time > 0) {
        logger.detail("Linger time expired"); // Throws
    }
    voluntary_disconnect();      // Throws
    logger.info("Disconnected"); // Throws
}
12✔
1108

1109

1110
// Closes the connection because of a sync protocol violation. The error is
// reported as fatal and tagged with the ProtocolViolation action.
void Connection::close_due_to_protocol_error(Status status)
{
    SessionErrorInfo violation(std::move(status), IsFatal{true});
    violation.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;

    const auto reason = ConnectionTerminationReason::sync_protocol_violation;
    involuntary_disconnect(std::move(violation), reason); // Throws
}
4✔
1117

1118

1119
// Closes the connection because of an error detected on the client side. The
// status is logged and then forwarded, with the caller-supplied fatality and
// termination reason, to involuntary_disconnect().
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to error: %1", status); // Throws

    SessionErrorInfo error_info{std::move(status), is_fatal};
    involuntary_disconnect(std::move(error_info), reason); // Throws
}
400✔
1125

1126

1127
// Closes the connection because of a transient error. The error is reported as
// non-fatal and tagged with the Transient action.
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to transient error: %1", status); // Throws

    SessionErrorInfo transient{std::move(status), IsFatal{false}};
    transient.server_requests_action = ProtocolErrorInfo::Action::Transient;
    involuntary_disconnect(std::move(transient), reason); // Throws
}
564✔
1135

1136

1137
// Close connection due to error discovered on the server-side, and then
1138
// reported to the client by way of a connection-level ERROR message.
1139
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
{
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
                int(error_code)); // Throws

    // The termination reason follows the fatality flag carried by the server's
    // error info.
    ConnectionTerminationReason reason = ConnectionTerminationReason::server_said_try_again_later;
    if (info.is_fatal) {
        reason = ConnectionTerminationReason::server_said_do_not_reconnect;
    }

    SessionErrorInfo error_info{info, protocol_error_to_status(error_code, info.message)};
    involuntary_disconnect(std::move(error_info), reason); // Throws
}
66✔
1149

1150

1151
void Connection::disconnect(const SessionErrorInfo& info)
1152
{
3,286✔
1153
    // Cancel connect timeout watchdog
1,652✔
1154
    m_connect_timer.reset();
3,286✔
1155

1,652✔
1156
    if (m_state == ConnectionState::connected) {
3,286✔
1157
        m_disconnect_time = monotonic_clock_now();
3,176✔
1158
        m_disconnect_has_occurred = true;
3,176✔
1159

1,596✔
1160
        // Sessions that are in the Deactivating state at this time can be
1,596✔
1161
        // immediately discarded, in part because they are no longer enlisted to
1,596✔
1162
        // send. Such sessions will be taken to the Deactivated state by
1,596✔
1163
        // Session::connection_lost(), and then they will be removed from
1,596✔
1164
        // `m_sessions`.
1,596✔
1165
        auto i = m_sessions.begin(), end = m_sessions.end();
3,176✔
1166
        while (i != end) {
7,048✔
1167
            // Prevent invalidation of the main iterator when erasing elements
2,210✔
1168
            auto j = i++;
3,872✔
1169
            Session& sess = *j->second;
3,872✔
1170
            sess.connection_lost(); // Throws
3,872✔
1171
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
3,872✔
1172
                m_sessions.erase(j);
1,904✔
1173
        }
3,872✔
1174
    }
3,176✔
1175

1,652✔
1176
    change_state_to_disconnected();
3,286✔
1177

1,652✔
1178
    m_ping_delay_in_progress = false;
3,286✔
1179
    m_waiting_for_pong = false;
3,286✔
1180
    m_send_ping = false;
3,286✔
1181
    m_minimize_next_ping_delay = false;
3,286✔
1182
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,286✔
1183
    m_ping_sent = false;
3,286✔
1184
    m_heartbeat_timer.reset();
3,286✔
1185
    m_previous_ping_rtt = 0;
3,286✔
1186

1,652✔
1187
    m_websocket_sentinel->destroyed = true;
3,286✔
1188
    m_websocket_sentinel.reset();
3,286✔
1189
    m_websocket.reset();
3,286✔
1190
    m_input_body_buffer.reset();
3,286✔
1191
    m_sending_session = nullptr;
3,286✔
1192
    m_sessions_enlisted_to_send.clear();
3,286✔
1193
    m_sending = false;
3,286✔
1194

1,652✔
1195
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,286✔
1196
    initiate_reconnect_wait();                                           // Throws
3,286✔
1197
}
3,286✔
1198

1199
bool Connection::is_flx_sync_connection() const noexcept
1200
{
101,610✔
1201
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
101,610✔
1202
}
101,610✔
1203

1204
// Handles a PONG message. `timestamp` must echo the timestamp of the last PING
// sent. A PONG that arrives when none is expected, or that carries a different
// timestamp, closes the connection as a protocol error. Otherwise the round
// trip time is recorded and the next ping delay is started.
void Connection::receive_pong(milliseconds_type timestamp)
{
    logger.debug("Received: PONG(timestamp=%1)", timestamp);

    // A PONG is only legal while one is awaited and its PING has been sent.
    if (REALM_UNLIKELY(!m_waiting_for_pong || m_send_ping)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
        return;
    }

    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
                          m_last_ping_sent_at, timestamp)}); // Throws
        return;
    }

    const milliseconds_type now = monotonic_clock_now();
    const milliseconds_type rtt = now - timestamp;
    logger.debug("Round trip time was %1 milliseconds", rtt);
    m_previous_ping_rtt = rtt;

    // If this PONG message answers a PING message that was sent after the last
    // invocation of cancel_reconnect_delay(), then the connection is still
    // good, and the next reconnect delay does not have to be skipped.
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
        m_ping_after_scheduled_reset_of_reconnect_info = false;
        m_reconnect_info.scheduled_reset = false;
    }

    // Stop the PONG timeout and schedule the next PING.
    m_heartbeat_timer.reset();
    m_waiting_for_pong = false;

    initiate_ping_delay(now); // Throws

    if (m_client.m_roundtrip_time_handler) {
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
    }
}
178✔
1245

1246
// Looks up the session that a received message is addressed to. `message` is
// the message name used in log and error text.
//
// Returns the session when it exists, and nullptr otherwise:
//  - session_ident 0 yields nullptr without any logging;
//  - an ident found in the session history is only logged;
//  - an ident that never existed is logged and closes the connection as a
//    protocol error.
//
// NOTE(review): this function is declared noexcept, yet the logging and close
// calls below are annotated as throwing elsewhere in this file. An exception
// escaping from here would terminate the process. Confirm this is intended.
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
{
    if (session_ident == 0)
        return nullptr;

    if (Session* const found = get_session(session_ident); REALM_LIKELY(found))
        return found;

    // Consult the history to tell a closed session from one that never existed.
    const bool was_known = (m_session_history.find(session_ident) != m_session_history.end());
    if (was_known) {
        logger.error("Received %1 message for closed session, session_ident = %2", message,
                     session_ident); // Throws
    }
    else {
        logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received message %1 for session iden %2 when that session never existed", message,
                          session_ident)});
    }
    return nullptr;
}
2,147,483,649✔
1270

1271
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1272
{
972✔
1273
    Session* sess = nullptr;
972✔
1274
    if (session_ident != 0) {
972✔
1275
        sess = find_and_validate_session(session_ident, "ERROR");
902✔
1276
        if (REALM_UNLIKELY(!sess)) {
902✔
1277
            return;
×
1278
        }
×
1279
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
902✔
1280
            close_due_to_protocol_error(std::move(status)); // Throws
×
1281
            return;
×
1282
        }
×
1283

452✔
1284
        if (sess->m_state == Session::Deactivated) {
902✔
1285
            finish_session_deactivation(sess);
×
1286
        }
×
1287
        return;
902✔
1288
    }
902✔
1289

34✔
1290
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
70✔
1291
                info.message, info.raw_error_code, info.is_fatal, session_ident,
70✔
1292
                info.server_requests_action); // Throws
70✔
1293

34✔
1294
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
70✔
1295
    if (REALM_LIKELY(known_error_code)) {
70✔
1296
        ProtocolError error_code = ProtocolError(info.raw_error_code);
66✔
1297
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
66✔
1298
            close_due_to_server_side_error(error_code, info); // Throws
66✔
1299
            return;
66✔
1300
        }
66✔
1301
        close_due_to_protocol_error(
×
1302
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1303
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1304
                          info.raw_error_code)});
×
1305
    }
×
1306
    else {
4✔
1307
        close_due_to_protocol_error(
4✔
1308
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1309
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1310
    }
4✔
1311
}
70✔
1312

1313

1314
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1315
                                             session_ident_type session_ident)
1316
{
16✔
1317
    if (session_ident == 0) {
16✔
1318
        return close_due_to_protocol_error(
×
1319
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1320
    }
×
1321

8✔
1322
    if (!is_flx_sync_connection()) {
16✔
1323
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1324
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1325
    }
×
1326

8✔
1327
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
16✔
1328
    if (REALM_UNLIKELY(!sess)) {
16✔
1329
        return;
×
1330
    }
×
1331

8✔
1332
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
16✔
1333
        close_due_to_protocol_error(std::move(status));
×
1334
    }
×
1335
}
16✔
1336

1337

1338
// Handles an IDENT message by forwarding the client file identifier to the
// addressed session; a non-OK result closes the connection as a protocol
// error.
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
{
    Session* const sess = find_and_validate_session(session_ident, "IDENT");
    if (REALM_UNLIKELY(!sess))
        return;

    auto status = sess->receive_ident_message(client_file_ident);
    if (!status.is_ok()) {
        close_due_to_protocol_error(std::move(status)); // Throws
    }
}
3,268✔
1348

1349
void Connection::receive_download_message(session_ident_type session_ident, const SyncProgress& progress,
1350
                                          std::uint_fast64_t downloadable_bytes, int64_t query_version,
1351
                                          DownloadBatchState batch_state,
1352
                                          const ReceivedChangesets& received_changesets)
1353
{
42,770✔
1354
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
42,770✔
1355
    if (REALM_UNLIKELY(!sess)) {
42,770✔
1356
        return;
×
1357
    }
×
1358

24,042✔
1359
    if (auto status = sess->receive_download_message(progress, downloadable_bytes, batch_state, query_version,
42,770✔
1360
                                                     received_changesets);
42,770✔
1361
        !status.is_ok()) {
42,770✔
1362
        close_due_to_protocol_error(std::move(status));
×
1363
    }
×
1364
}
42,770✔
1365

1366
// Handles a MARK message by forwarding the request identifier to the addressed
// session; a non-OK result closes the connection as a protocol error.
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    Session* const sess = find_and_validate_session(session_ident, "MARK");
    if (REALM_UNLIKELY(!sess))
        return;

    auto status = sess->receive_mark_message(request_ident);
    if (!status.is_ok()) {
        close_due_to_protocol_error(std::move(status)); // Throws
    }
}
15,686✔
1376

1377

1378
// Routes an incoming UNBOUND message to the session registered under
// `session_ident`. On a non-OK status the connection is closed as a protocol
// error. Otherwise, if the session ended up in the Deactivated state, its
// deactivation is finished here.
void Connection::receive_unbound_message(session_ident_type session_ident)
{
    Session* const target = find_and_validate_session(session_ident, "UNBOUND");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto result = target->receive_unbound_message();
    if (!result.is_ok()) {
        close_due_to_protocol_error(std::move(result)); // Throws
        return;
    }

    const bool now_deactivated = (target->m_state == Session::Deactivated);
    if (now_deactivated)
        finish_session_deactivation(target);
}
3,992✔
1394

1395

1396
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1397
                                               std::string_view body)
1398
{
44✔
1399
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
44✔
1400
    if (REALM_UNLIKELY(!sess)) {
44✔
1401
        return;
×
1402
    }
×
1403

22✔
1404
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
44✔
1405
        close_due_to_protocol_error(std::move(status));
×
1406
    }
×
1407
}
44✔
1408

1409

1410
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1411
                                            std::string_view message)
1412
{
5,762✔
1413
    std::string prefix;
5,762✔
1414
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
5,762✔
1415
        prefix = util::format("Server[%1]", m_appservices_coid);
5,762✔
1416
    }
5,762✔
1417
    else {
×
1418
        prefix = "Server";
×
1419
    }
×
1420

2,976✔
1421
    if (session_ident != 0) {
5,762✔
1422
        if (auto sess = get_session(session_ident)) {
3,878✔
1423
            sess->logger.log(level, "%1 log: %2", prefix, message);
3,878✔
1424
            return;
3,878✔
1425
        }
3,878✔
1426

1427
        logger.log(level, "%1 log for unknown session %2: %3", prefix, session_ident, message);
×
1428
        return;
×
1429
    }
×
1430

964✔
1431
    logger.log(level, "%1 log: %2", prefix, message);
1,884✔
1432
}
1,884✔
1433

1434

1435
void Connection::receive_appservices_request_id(std::string_view coid)
1436
{
5,060✔
1437
    // Only set once per connection
2,560✔
1438
    if (!coid.empty() && m_appservices_coid.empty()) {
5,060✔
1439
        m_appservices_coid = coid;
2,230✔
1440
        logger.info("Connected to app services with request id: \"%1\"", m_appservices_coid);
2,230✔
1441
    }
2,230✔
1442
}
5,060✔
1443

1444

1445
// Forwards `status` to close_due_to_protocol_error(), transferring ownership
// of the status object.
void Connection::handle_protocol_error(Status status)
{
    Status failure = std::move(status);
    close_due_to_protocol_error(std::move(failure));
}
×
1449

1450

1451
// Sessions are guaranteed to be granted the opportunity to send a message in
1452
// the order that they enlist. Note that this is important to ensure
1453
// nonoverlapping communication with the server for consecutive sessions
1454
// associated with the same Realm file.
1455
//
1456
// CAUTION: The specified session may get destroyed before this function
1457
// returns, but only if its Session::send_message() puts it into the Deactivated
1458
// state.
1459
void Connection::enlist_to_send(Session* sess)
1460
{
154,776✔
1461
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
154,776✔
1462
    m_sessions_enlisted_to_send.push_back(sess); // Throws
154,776✔
1463
    if (!m_sending)
154,776✔
1464
        send_next_message(); // Throws
58,756✔
1465
}
154,776✔
1466

1467

1468
std::string Connection::get_active_appservices_connection_id()
1469
{
72✔
1470
    return m_appservices_coid;
72✔
1471
}
72✔
1472

1473
void Session::cancel_resumption_delay()
1474
{
4,058✔
1475
    REALM_ASSERT_EX(m_state == Active, m_state);
4,058✔
1476

2,340✔
1477
    if (!m_suspended)
4,058✔
1478
        return;
3,656✔
1479

204✔
1480
    m_suspended = false;
402✔
1481

204✔
1482
    logger.debug("Resumed"); // Throws
402✔
1483

204✔
1484
    if (unbind_process_complete())
402✔
1485
        initiate_rebind(); // Throws
394✔
1486

204✔
1487
    m_conn.one_more_active_unsuspended_session(); // Throws
402✔
1488

204✔
1489
    on_resumed(); // Throws
402✔
1490
}
402✔
1491

1492

1493
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1494
                                                 std::vector<ProtocolErrorInfo>* out)
1495
{
20,814✔
1496
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
20,814✔
1497
        return;
20,776✔
1498
    }
20,776✔
1499

20✔
1500
#ifdef REALM_DEBUG
38✔
1501
    REALM_ASSERT_DEBUG(
38✔
1502
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
38✔
1503
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
38✔
1504
                           return lhs.compensating_write_server_version < rhs.compensating_write_server_version;
38✔
1505
                       }));
38✔
1506
#endif
38✔
1507

20✔
1508
    while (!m_pending_compensating_write_errors.empty() &&
78✔
1509
           m_pending_compensating_write_errors.front().compensating_write_server_version <=
60✔
1510
               changesets.back().version) {
40✔
1511
        auto& cur_error = m_pending_compensating_write_errors.front();
40✔
1512
        REALM_ASSERT_3(cur_error.compensating_write_server_version, >=, changesets.front().version);
40✔
1513
        out->push_back(std::move(cur_error));
40✔
1514
        m_pending_compensating_write_errors.pop_front();
40✔
1515
    }
40✔
1516
}
38✔
1517

1518

1519
void Session::integrate_changesets(ClientReplication& repl, const SyncProgress& progress,
1520
                                   std::uint_fast64_t downloadable_bytes,
1521
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1522
                                   DownloadBatchState download_batch_state)
1523
{
40,740✔
1524
    auto& history = repl.get_history();
40,740✔
1525
    if (received_changesets.empty()) {
40,740✔
1526
        if (download_batch_state == DownloadBatchState::MoreToCome) {
19,904✔
1527
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1528
                                       "received empty download message that was not the last in batch",
×
1529
                                       ProtocolError::bad_progress);
×
1530
        }
×
1531
        history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws
19,904✔
1532
        return;
19,904✔
1533
    }
19,904✔
1534

11,896✔
1535
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
20,836✔
1536
    auto transact = get_db()->start_read();
20,836✔
1537
    history.integrate_server_changesets(
20,836✔
1538
        progress, &downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
20,836✔
1539
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
20,826✔
1540
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
20,814✔
1541
        },
20,814✔
1542
        get_transact_reporter()); // Throws
20,836✔
1543
    if (received_changesets.size() == 1) {
20,836✔
1544
        logger.debug("1 remote changeset integrated, producing client version %1",
13,958✔
1545
                     version_info.sync_version.version); // Throws
13,958✔
1546
    }
13,958✔
1547
    else {
6,878✔
1548
        logger.debug("%2 remote changesets integrated, producing client version %1",
6,878✔
1549
                     version_info.sync_version.version, received_changesets.size()); // Throws
6,878✔
1550
    }
6,878✔
1551

11,896✔
1552
    for (const auto& pending_error : pending_compensating_write_errors) {
11,916✔
1553
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
40✔
1554
                    pending_error.compensating_write_rejected_client_version,
40✔
1555
                    pending_error.compensating_write_server_version, pending_error.message);
40✔
1556
        try {
40✔
1557
            on_connection_state_changed(
40✔
1558
                m_conn.get_state(),
40✔
1559
                SessionErrorInfo{pending_error,
40✔
1560
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
40✔
1561
                                                          pending_error.message)});
40✔
1562
        }
40✔
1563
        catch (...) {
20✔
1564
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1565
        }
×
1566
    }
40✔
1567
}
20,836✔
1568

1569

1570
void Session::on_integration_failure(const IntegrationException& error)
1571
{
32✔
1572
    REALM_ASSERT_EX(m_state == Active, m_state);
32✔
1573
    REALM_ASSERT(!m_client_error && !m_error_to_send);
32✔
1574
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
32✔
1575

16✔
1576
    m_client_error = util::make_optional<IntegrationException>(error);
32✔
1577
    m_error_to_send = true;
32✔
1578

16✔
1579
    // Surface the error to the user otherwise is lost.
16✔
1580
    on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{error.to_status(), IsFatal{false}});
32✔
1581

16✔
1582
    // Since the deactivation process has not been initiated, the UNBIND
16✔
1583
    // message cannot have been sent unless an ERROR message was received.
16✔
1584
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
32✔
1585
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
32✔
1586
        ensure_enlisted_to_send(); // Throws
32✔
1587
    }
32✔
1588
}
32✔
1589

1590
// Updates the session's progress bookkeeping after a batch of downloaded
// changesets has been integrated, producing `client_version`.
//
// - Stores the new download progress and overall progress.
// - If the upload cursor reported by the server advanced, moves the local
//   upload bookkeeping forward where it lags behind and checks for upload
//   completion.
// - Recognizes `client_version` and checks for download completion.
// - For FLX sessions that have a migration store, asks the migration store to
//   create subscriptions.
// - Enlists the session to send, provided the IDENT message has been sent, no
//   ERROR message has been received and the session is not suspended.
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
{
    REALM_ASSERT_EX(m_state == Active, m_state);
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
    m_download_progress = progress.download;
    bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version);
    m_progress = progress;
    if (upload_progressed) {
        if (progress.upload.client_version > m_last_version_selected_for_upload) {
            if (progress.upload.client_version > m_upload_progress.client_version)
                m_upload_progress = progress.upload;
            m_last_version_selected_for_upload = progress.upload.client_version;
        }

        check_for_upload_completion();
    }

    do_recognize_sync_version(client_version); // Allows upload process to resume
    check_for_download_completion();           // Throws

    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
        auto& flx_subscription_store = *get_flx_subscription_store();
        // Use the handle that was just null-checked rather than looking the
        // migration store up a second time.
        migration_store->create_subscriptions(flx_subscription_store);
    }

    // Since the deactivation process has not been initiated, the UNBIND
    // message cannot have been sent unless an ERROR message was received.
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
        ensure_enlisted_to_send(); // Throws
    }
}
42,106✔
1623

1624

1625
// Out-of-line destructor; performs no work of its own. The life cycle state
// assertion below is currently disabled.
Session::~Session()
{
    // REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
}
9,596✔
1629

1630

1631
std::string Session::make_logger_prefix(session_ident_type ident)
1632
{
9,596✔
1633
    std::ostringstream out;
9,596✔
1634
    out.imbue(std::locale::classic());
9,596✔
1635
    out << "Session[" << ident << "]: "; // Throws
9,596✔
1636
    return out.str();                    // Throws
9,596✔
1637
}
9,596✔
1638

1639

1640
void Session::activate()
1641
{
9,596✔
1642
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
9,596✔
1643

4,622✔
1644
    logger.debug("Activating"); // Throws
9,596✔
1645

4,622✔
1646
    bool has_pending_client_reset = false;
9,596✔
1647
    if (REALM_LIKELY(!get_client().is_dry_run())) {
9,596✔
1648
        // The reason we need a mutable reference from get_client_reset_config() is because we
4,622✔
1649
        // don't want the session to keep a strong reference to the client_reset_config->fresh_copy
4,622✔
1650
        // DB. If it did, then the fresh DB would stay alive for the duration of this sync session
4,622✔
1651
        // and we want to clean it up once the reset is finished. Additionally, the fresh copy will
4,622✔
1652
        // be set to a new copy on every reset so there is no reason to keep a reference to it.
4,622✔
1653
        // The modification to the client reset config happens via std::move(client_reset_config->fresh_copy).
4,622✔
1654
        // If the client reset config were a `const &` then this std::move would create another strong
4,622✔
1655
        // reference which we don't want to happen.
4,622✔
1656
        util::Optional<ClientReset>& client_reset_config = get_client_reset_config();
9,596✔
1657

4,622✔
1658
        bool file_exists = util::File::exists(get_realm_path());
9,596✔
1659

4,622✔
1660
        logger.info("client_reset_config = %1, Realm exists = %2, "
9,596✔
1661
                    "client reset = %3",
9,596✔
1662
                    client_reset_config ? "true" : "false", file_exists ? "true" : "false",
9,588✔
1663
                    (client_reset_config && file_exists) ? "true" : "false"); // Throws
9,596✔
1664
        if (client_reset_config && !m_client_reset_operation) {
9,596✔
1665
            m_client_reset_operation = std::make_unique<_impl::ClientResetOperation>(
336✔
1666
                logger, get_db(), std::move(client_reset_config->fresh_copy), client_reset_config->mode,
336✔
1667
                std::move(client_reset_config->notify_before_client_reset),
336✔
1668
                std::move(client_reset_config->notify_after_client_reset),
336✔
1669
                client_reset_config->recovery_is_allowed); // Throws
336✔
1670
        }
336✔
1671

4,622✔
1672
        if (!m_client_reset_operation) {
9,596✔
1673
            const ClientReplication& repl = access_realm(); // Throws
9,256✔
1674
            repl.get_history().get_status(m_last_version_available, m_client_file_ident, m_progress,
9,256✔
1675
                                          &has_pending_client_reset); // Throws
9,256✔
1676
        }
9,256✔
1677
    }
9,596✔
1678
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
9,596✔
1679
                 m_client_file_ident.salt); // Throws
9,596✔
1680
    m_upload_target_version = m_last_version_available;
9,596✔
1681
    m_upload_progress = m_progress.upload;
9,596✔
1682
    m_last_version_selected_for_upload = m_upload_progress.client_version;
9,596✔
1683
    m_download_progress = m_progress.download;
9,596✔
1684
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
9,596✔
1685

4,622✔
1686
    logger.debug("last_version_available  = %1", m_last_version_available);           // Throws
9,596✔
1687
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
9,596✔
1688
    logger.debug("progress_download_client_version = %1",
9,596✔
1689
                 m_progress.download.last_integrated_client_version);                                      // Throws
9,596✔
1690
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
9,596✔
1691
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
9,596✔
1692

4,622✔
1693
    reset_protocol_state();
9,596✔
1694
    m_state = Active;
9,596✔
1695

4,622✔
1696
    REALM_ASSERT(!m_suspended);
9,596✔
1697
    m_conn.one_more_active_unsuspended_session(); // Throws
9,596✔
1698

4,622✔
1699
    try {
9,596✔
1700
        process_pending_flx_bootstrap();
9,596✔
1701
    }
9,596✔
1702
    catch (const IntegrationException& error) {
4,624✔
1703
        logger.error("Error integrating bootstrap changesets: %1", error.what());
4✔
1704
        m_suspended = true;
4✔
1705
        m_conn.one_less_active_unsuspended_session(); // Throws
4✔
1706
        on_suspended(SessionErrorInfo{Status{error.code(), error.what()}, IsFatal{true}});
4✔
1707
    }
4✔
1708

4,622✔
1709
    if (has_pending_client_reset) {
9,596✔
1710
        handle_pending_client_reset_acknowledgement();
24✔
1711
    }
24✔
1712
}
9,596✔
1713

1714

1715
// The caller (Connection) must discard the session if the session has become
1716
// deactivated upon return.
1717
void Session::initiate_deactivation()
1718
{
9,596✔
1719
    REALM_ASSERT_EX(m_state == Active, m_state);
9,596✔
1720

4,622✔
1721
    logger.debug("Initiating deactivation"); // Throws
9,596✔
1722

4,622✔
1723
    m_state = Deactivating;
9,596✔
1724

4,622✔
1725
    if (!m_suspended)
9,596✔
1726
        m_conn.one_less_active_unsuspended_session(); // Throws
9,072✔
1727

4,622✔
1728
    if (m_enlisted_to_send) {
9,596✔
1729
        REALM_ASSERT(!unbind_process_complete());
4,644✔
1730
        return;
4,644✔
1731
    }
4,644✔
1732

2,456✔
1733
    // Deactivate immediately if the BIND message has not yet been sent and the
2,456✔
1734
    // session is not enlisted to send, or if the unbinding process has already
2,456✔
1735
    // completed.
2,456✔
1736
    if (!m_bind_message_sent || unbind_process_complete()) {
4,952✔
1737
        complete_deactivation(); // Throws
942✔
1738
        // Life cycle state is now Deactivated
430✔
1739
        return;
942✔
1740
    }
942✔
1741

2,026✔
1742
    // Ready to send the UNBIND message, if it has not already been sent
2,026✔
1743
    if (!m_unbind_message_sent) {
4,010✔
1744
        enlist_to_send(); // Throws
3,842✔
1745
        return;
3,842✔
1746
    }
3,842✔
1747
}
4,010✔
1748

1749

1750
void Session::complete_deactivation()
1751
{
9,596✔
1752
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
9,596✔
1753
    m_state = Deactivated;
9,596✔
1754

4,622✔
1755
    logger.debug("Deactivation completed"); // Throws
9,596✔
1756
}
9,596✔
1757

1758

1759
// Called by the associated Connection object when this session is granted an
1760
// opportunity to send a message.
1761
//
1762
// The caller (Connection) must discard the session if the session has become
1763
// deactivated upon return.
1764
void Session::send_message()
1765
{
153,286✔
1766
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
153,286✔
1767
    REALM_ASSERT(m_enlisted_to_send);
153,286✔
1768
    m_enlisted_to_send = false;
153,286✔
1769
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
153,286✔
1770
        // Deactivation has been initiated. If the UNBIND message has not been
4,324✔
1771
        // sent yet, there is no point in sending it. Instead, we can let the
4,324✔
1772
        // deactivation process complete.
4,324✔
1773
        if (!m_bind_message_sent) {
8,964✔
1774
            return complete_deactivation(); // Throws
2,662✔
1775
            // Life cycle state is now Deactivated
1,212✔
1776
        }
2,662✔
1777

3,112✔
1778
        // Session life cycle state is Deactivating or the unbinding process has
3,112✔
1779
        // been initiated by a session specific ERROR message
3,112✔
1780
        if (!m_unbind_message_sent)
6,302✔
1781
            send_unbind_message(); // Throws
6,302✔
1782
        return;
6,302✔
1783
    }
6,302✔
1784

71,368✔
1785
    // Session life cycle state is Active and the unbinding process has
71,368✔
1786
    // not been initiated
71,368✔
1787
    REALM_ASSERT(!m_unbind_message_sent);
144,322✔
1788

71,368✔
1789
    if (!m_bind_message_sent)
144,322✔
1790
        return send_bind_message(); // Throws
8,626✔
1791

66,912✔
1792
    if (!m_ident_message_sent) {
135,696✔
1793
        if (have_client_file_ident())
6,602✔
1794
            send_ident_message(); // Throws
6,602✔
1795
        return;
6,602✔
1796
    }
6,602✔
1797

63,548✔
1798
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
129,094✔
1799
                                                      [](const PendingTestCommand& command) {
63,620✔
1800
                                                          return command.pending;
144✔
1801
                                                      });
144✔
1802
    if (has_pending_test_command) {
129,094✔
1803
        return send_test_command_message();
44✔
1804
    }
44✔
1805

63,526✔
1806
    if (m_error_to_send)
129,050✔
1807
        return send_json_error_message(); // Throws
28✔
1808

63,514✔
1809
    // Stop sending upload, mark and query messages when the client detects an error.
63,514✔
1810
    if (m_client_error) {
129,022✔
1811
        return;
16✔
1812
    }
16✔
1813

63,506✔
1814
    if (m_target_download_mark > m_last_download_mark_sent)
129,006✔
1815
        return send_mark_message(); // Throws
16,252✔
1816

55,464✔
1817
    auto is_upload_allowed = [&]() -> bool {
112,760✔
1818
        if (!m_is_flx_sync_session) {
112,756✔
1819
            return true;
103,550✔
1820
        }
103,550✔
1821

4,678✔
1822
        auto migration_store = get_migration_store();
9,206✔
1823
        if (!migration_store) {
9,206✔
1824
            return true;
×
1825
        }
×
1826

4,678✔
1827
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
9,206✔
1828
        if (!sentinel_query_version) {
9,206✔
1829
            return true;
9,180✔
1830
        }
9,180✔
1831

14✔
1832
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
14✔
1833
        return m_last_sent_flx_query_version != *sentinel_query_version;
26✔
1834
    };
26✔
1835

55,464✔
1836
    if (!is_upload_allowed()) {
112,754✔
1837
        return;
12✔
1838
    }
12✔
1839

55,458✔
1840
    auto check_pending_flx_version = [&]() -> bool {
112,748✔
1841
        if (!m_is_flx_sync_session) {
112,744✔
1842
            return false;
103,550✔
1843
        }
103,550✔
1844

4,672✔
1845
        if (!m_allow_upload) {
9,194✔
1846
            return false;
1,560✔
1847
        }
1,560✔
1848

3,888✔
1849
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(
7,634✔
1850
            m_last_sent_flx_query_version, m_upload_progress.client_version);
7,634✔
1851

3,888✔
1852
        if (!m_pending_flx_sub_set) {
7,634✔
1853
            return false;
6,032✔
1854
        }
6,032✔
1855

800✔
1856
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
1,602✔
1857
    };
1,602✔
1858

55,458✔
1859
    if (check_pending_flx_version()) {
112,742✔
1860
        return send_query_change_message(); // throws
838✔
1861
    }
838✔
1862

55,040✔
1863
    REALM_ASSERT_3(m_upload_progress.client_version, <=, m_upload_target_version);
111,904✔
1864
    REALM_ASSERT_3(m_upload_target_version, <=, m_last_version_available);
111,904✔
1865
    if (m_allow_upload && (m_upload_target_version > m_upload_progress.client_version)) {
111,904✔
1866
        return send_upload_message(); // Throws
53,784✔
1867
    }
53,784✔
1868
}
111,904✔
1869

1870

1871
void Session::send_bind_message()
1872
{
8,626✔
1873
    REALM_ASSERT_EX(m_state == Active, m_state);
8,626✔
1874

4,456✔
1875
    session_ident_type session_ident = m_ident;
8,626✔
1876
    bool need_client_file_ident = !have_client_file_ident();
8,626✔
1877
    const bool is_subserver = false;
8,626✔
1878

4,456✔
1879

4,456✔
1880
    ClientProtocol& protocol = m_conn.get_client_protocol();
8,626✔
1881
    int protocol_version = m_conn.get_negotiated_protocol_version();
8,626✔
1882
    OutputBuffer& out = m_conn.get_output_buffer();
8,626✔
1883
    // Discard the token since it's ignored by the server.
4,456✔
1884
    std::string empty_access_token;
8,626✔
1885
    if (m_is_flx_sync_session) {
8,626✔
1886
        nlohmann::json bind_json_data;
1,366✔
1887
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,366✔
1888
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1889
        }
60✔
1890
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,366✔
1891
        if (logger.would_log(util::Logger::Level::debug)) {
1,366✔
1892
            std::string json_data_dump;
1,366✔
1893
            if (!bind_json_data.empty()) {
1,366✔
1894
                json_data_dump = bind_json_data.dump();
1,366✔
1895
            }
1,366✔
1896
            logger.debug(
1,366✔
1897
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,366✔
1898
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,366✔
1899
        }
1,366✔
1900
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,366✔
1901
                                       need_client_file_ident, is_subserver); // Throws
1,366✔
1902
    }
1,366✔
1903
    else {
7,260✔
1904
        std::string server_path = get_virt_path();
7,260✔
1905
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
7,260✔
1906
                     session_ident, need_client_file_ident, is_subserver, server_path);
7,260✔
1907
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
7,260✔
1908
                                       need_client_file_ident, is_subserver); // Throws
7,260✔
1909
    }
7,260✔
1910
    m_conn.initiate_write_message(out, this); // Throws
8,626✔
1911

4,456✔
1912
    m_bind_message_sent = true;
8,626✔
1913

4,456✔
1914
    // Ready to send the IDENT message if the file identifier pair is already
4,456✔
1915
    // available.
4,456✔
1916
    if (!need_client_file_ident)
8,626✔
1917
        enlist_to_send(); // Throws
3,514✔
1918
}
8,626✔
1919

1920

1921
void Session::send_ident_message()
1922
{
6,602✔
1923
    REALM_ASSERT_EX(m_state == Active, m_state);
6,602✔
1924
    REALM_ASSERT(m_bind_message_sent);
6,602✔
1925
    REALM_ASSERT(!m_unbind_message_sent);
6,602✔
1926
    REALM_ASSERT(have_client_file_ident());
6,602✔
1927

3,364✔
1928

3,364✔
1929
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,602✔
1930
    OutputBuffer& out = m_conn.get_output_buffer();
6,602✔
1931
    session_ident_type session_ident = m_ident;
6,602✔
1932

3,364✔
1933
    if (m_is_flx_sync_session) {
6,602✔
1934
        const auto active_query_set = get_flx_subscription_store()->get_active();
956✔
1935
        const auto active_query_body = active_query_set.to_ext_json();
956✔
1936
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
956✔
1937
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
956✔
1938
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
956✔
1939
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
956✔
1940
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
956✔
1941
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
956✔
1942
                     active_query_body); // Throws
956✔
1943
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
956✔
1944
                                        active_query_set.version(), active_query_body); // Throws
956✔
1945
        m_last_sent_flx_query_version = active_query_set.version();
956✔
1946
    }
956✔
1947
    else {
5,646✔
1948
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
5,646✔
1949
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
5,646✔
1950
                     "latest_server_version_salt=%6)",
5,646✔
1951
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
5,646✔
1952
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
5,646✔
1953
                     m_progress.latest_server_version.salt);                                  // Throws
5,646✔
1954
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
5,646✔
1955
    }
5,646✔
1956
    m_conn.initiate_write_message(out, this); // Throws
6,602✔
1957

3,364✔
1958
    m_ident_message_sent = true;
6,602✔
1959

3,364✔
1960
    // Other messages may be waiting to be sent
3,364✔
1961
    enlist_to_send(); // Throws
6,602✔
1962
}
6,602✔
1963

1964
void Session::send_query_change_message()
1965
{
838✔
1966
    REALM_ASSERT_EX(m_state == Active, m_state);
838✔
1967
    REALM_ASSERT(m_ident_message_sent);
838✔
1968
    REALM_ASSERT(!m_unbind_message_sent);
838✔
1969
    REALM_ASSERT(m_pending_flx_sub_set);
838✔
1970
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
838✔
1971

418✔
1972
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
838✔
1973
        return;
×
1974
    }
×
1975

418✔
1976
    auto sub_store = get_flx_subscription_store();
838✔
1977
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
838✔
1978
    auto latest_queries = latest_sub_set.to_ext_json();
838✔
1979
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
838✔
1980
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
838✔
1981

418✔
1982
    OutputBuffer& out = m_conn.get_output_buffer();
838✔
1983
    session_ident_type session_ident = get_ident();
838✔
1984
    ClientProtocol& protocol = m_conn.get_client_protocol();
838✔
1985
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
838✔
1986
    m_conn.initiate_write_message(out, this);
838✔
1987

418✔
1988
    m_last_sent_flx_query_version = latest_sub_set.version();
838✔
1989

418✔
1990
    request_download_completion_notification();
838✔
1991
}
838✔
1992

1993
void Session::send_upload_message()
1994
{
53,786✔
1995
    REALM_ASSERT_EX(m_state == Active, m_state);
53,786✔
1996
    REALM_ASSERT(m_ident_message_sent);
53,786✔
1997
    REALM_ASSERT(!m_unbind_message_sent);
53,786✔
1998
    REALM_ASSERT_3(m_upload_target_version, >, m_upload_progress.client_version);
53,786✔
1999

26,798✔
2000
    if (REALM_UNLIKELY(get_client().is_dry_run()))
53,786✔
2001
        return;
26,798✔
2002

26,798✔
2003
    auto target_upload_version = m_upload_target_version;
53,786✔
2004
    if (m_is_flx_sync_session) {
53,786✔
2005
        if (!m_pending_flx_sub_set || m_pending_flx_sub_set->snapshot_version < m_upload_progress.client_version) {
3,926✔
2006
            m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(
3,162✔
2007
                m_last_sent_flx_query_version, m_upload_progress.client_version);
3,162✔
2008
        }
3,162✔
2009
        if (m_pending_flx_sub_set && m_pending_flx_sub_set->snapshot_version < m_upload_target_version) {
3,926✔
2010
            target_upload_version = m_pending_flx_sub_set->snapshot_version;
764✔
2011
        }
764✔
2012
    }
3,926✔
2013

26,798✔
2014
    const ClientReplication& repl = access_realm(); // Throws
53,786✔
2015

26,798✔
2016
    std::vector<UploadChangeset> uploadable_changesets;
53,786✔
2017
    version_type locked_server_version = 0;
53,786✔
2018
    repl.get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
53,786✔
2019
                                                  locked_server_version); // Throws
53,786✔
2020

26,798✔
2021
    if (uploadable_changesets.empty()) {
53,786✔
2022
        // Nothing more to upload right now
13,022✔
2023
        check_for_upload_completion(); // Throws
26,526✔
2024
        // If we need to limit upload up to some version other than the last client version available and there are no
13,022✔
2025
        // changes to upload, then there is no need to send an empty message.
13,022✔
2026
        if (target_upload_version != m_upload_target_version) {
26,526✔
2027
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
324✔
2028
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
324✔
2029
            // Other messages may be waiting to be sent
162✔
2030
            return enlist_to_send(); // Throws
324✔
2031
        }
324✔
2032
    }
27,260✔
2033
    else {
27,260✔
2034
        m_last_version_selected_for_upload = uploadable_changesets.back().progress.client_version;
27,260✔
2035
    }
27,260✔
2036

26,798✔
2037
    if (m_is_flx_sync_session && m_pending_flx_sub_set && target_upload_version != m_upload_target_version) {
53,624✔
2038
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
440✔
2039
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
440✔
2040
    }
440✔
2041

26,636✔
2042
    version_type progress_client_version = m_upload_progress.client_version;
53,462✔
2043
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
53,462✔
2044

26,636✔
2045
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
53,462✔
2046
                 "locked_server_version=%3, num_changesets=%4)",
53,462✔
2047
                 progress_client_version, progress_server_version, locked_server_version,
53,462✔
2048
                 uploadable_changesets.size()); // Throws
53,462✔
2049

26,636✔
2050
    ClientProtocol& protocol = m_conn.get_client_protocol();
53,462✔
2051
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
53,462✔
2052

26,636✔
2053
    for (const UploadChangeset& uc : uploadable_changesets) {
48,464✔
2054
        logger.debug("Fetching changeset for upload (client_version=%1, server_version=%2, "
42,172✔
2055
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
42,172✔
2056
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
42,172✔
2057
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
42,172✔
2058
        if (logger.would_log(util::Logger::Level::trace)) {
42,172✔
2059
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2060
            if (changeset_data.size() < 1024) {
×
2061
                logger.trace("Changeset: %1",
×
2062
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2063
            }
×
2064
            else {
×
2065
                logger.trace("Changeset(comp): %1 %2", changeset_data.size(),
×
2066
                             protocol.compressed_hex_dump(changeset_data));
×
2067
            }
×
2068

2069
#if REALM_DEBUG
×
2070
            ChunkedBinaryInputStream in{changeset_data};
×
2071
            Changeset log;
×
2072
            try {
×
2073
                parse_changeset(in, log);
×
2074
                std::stringstream ss;
×
2075
                log.print(ss);
×
2076
                logger.trace("Changeset (parsed):\n%1", ss.str());
×
2077
            }
×
2078
            catch (const BadChangesetError& err) {
×
2079
                logger.error("Unable to parse changeset: %1", err.what());
×
2080
            }
×
2081
#endif
×
2082
        }
×
2083

20,344✔
2084
#if 0 // Upload log compaction is currently not implemented
2085
        if (!get_client().m_disable_upload_compaction) {
2086
            ChangesetEncoder::Buffer encode_buffer;
2087

2088
            {
2089
                // Upload compaction only takes place within single changesets to
2090
                // avoid another client seeing inconsistent snapshots.
2091
                ChunkedBinaryInputStream stream{uc.changeset};
2092
                Changeset changeset;
2093
                parse_changeset(stream, changeset); // Throws
2094
                // FIXME: What is the point of setting these? How can compaction care about them?
2095
                changeset.version = uc.progress.client_version;
2096
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2097
                changeset.origin_timestamp = uc.origin_timestamp;
2098
                changeset.origin_file_ident = uc.origin_file_ident;
2099

2100
                compact_changesets(&changeset, 1);
2101
                encode_changeset(changeset, encode_buffer);
2102

2103
                logger.debug("Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2104
                             encode_buffer.size()); // Throws
2105
            }
2106

2107
            upload_message_builder.add_changeset(
2108
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
2109
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
2110
        }
2111
        else
2112
#endif
2113
        {
42,172✔
2114
            upload_message_builder.add_changeset(uc.progress.client_version,
42,172✔
2115
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
42,172✔
2116
                                                 uc.origin_file_ident,
42,172✔
2117
                                                 uc.changeset); // Throws
42,172✔
2118
        }
42,172✔
2119
    }
42,172✔
2120

26,636✔
2121
    int protocol_version = m_conn.get_negotiated_protocol_version();
53,462✔
2122
    OutputBuffer& out = m_conn.get_output_buffer();
53,462✔
2123
    session_ident_type session_ident = get_ident();
53,462✔
2124
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
53,462✔
2125
                                               progress_server_version,
53,462✔
2126
                                               locked_server_version); // Throws
53,462✔
2127
    m_conn.initiate_write_message(out, this);                          // Throws
53,462✔
2128

26,636✔
2129
    // Other messages may be waiting to be sent
26,636✔
2130
    enlist_to_send(); // Throws
53,462✔
2131
}
53,462✔
2132

2133

2134
void Session::send_mark_message()
2135
{
16,252✔
2136
    REALM_ASSERT_EX(m_state == Active, m_state);
16,252✔
2137
    REALM_ASSERT(m_ident_message_sent);
16,252✔
2138
    REALM_ASSERT(!m_unbind_message_sent);
16,252✔
2139
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
16,252✔
2140

8,042✔
2141
    request_ident_type request_ident = m_target_download_mark;
16,252✔
2142
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
16,252✔
2143

8,042✔
2144
    ClientProtocol& protocol = m_conn.get_client_protocol();
16,252✔
2145
    OutputBuffer& out = m_conn.get_output_buffer();
16,252✔
2146
    session_ident_type session_ident = get_ident();
16,252✔
2147
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
16,252✔
2148
    m_conn.initiate_write_message(out, this);                      // Throws
16,252✔
2149

8,042✔
2150
    m_last_download_mark_sent = request_ident;
16,252✔
2151

8,042✔
2152
    // Other messages may be waiting to be sent
8,042✔
2153
    enlist_to_send(); // Throws
16,252✔
2154
}
16,252✔
2155

2156

2157
void Session::send_unbind_message()
2158
{
6,302✔
2159
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,302✔
2160
    REALM_ASSERT(m_bind_message_sent);
6,302✔
2161
    REALM_ASSERT(!m_unbind_message_sent);
6,302✔
2162

3,112✔
2163
    logger.debug("Sending: UNBIND"); // Throws
6,302✔
2164

3,112✔
2165
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,302✔
2166
    OutputBuffer& out = m_conn.get_output_buffer();
6,302✔
2167
    session_ident_type session_ident = get_ident();
6,302✔
2168
    protocol.make_unbind_message(out, session_ident); // Throws
6,302✔
2169
    m_conn.initiate_write_message(out, this);         // Throws
6,302✔
2170

3,112✔
2171
    m_unbind_message_sent = true;
6,302✔
2172
}
6,302✔
2173

2174

2175
void Session::send_json_error_message()
2176
{
28✔
2177
    REALM_ASSERT_EX(m_state == Active, m_state);
28✔
2178
    REALM_ASSERT(m_ident_message_sent);
28✔
2179
    REALM_ASSERT(!m_unbind_message_sent);
28✔
2180
    REALM_ASSERT(m_error_to_send);
28✔
2181
    REALM_ASSERT(m_client_error);
28✔
2182

12✔
2183
    ClientProtocol& protocol = m_conn.get_client_protocol();
28✔
2184
    OutputBuffer& out = m_conn.get_output_buffer();
28✔
2185
    session_ident_type session_ident = get_ident();
28✔
2186
    auto protocol_error = m_client_error->error_for_server;
28✔
2187

12✔
2188
    auto message = util::format("%1", m_client_error->to_status());
28✔
2189
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
28✔
2190
                session_ident); // Throws
28✔
2191

12✔
2192
    nlohmann::json error_body_json;
28✔
2193
    error_body_json["message"] = std::move(message);
28✔
2194
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
28✔
2195
                                     error_body_json.dump()); // Throws
28✔
2196
    m_conn.initiate_write_message(out, this);                 // Throws
28✔
2197

12✔
2198
    m_error_to_send = false;
28✔
2199
    enlist_to_send(); // Throws
28✔
2200
}
28✔
2201

2202

2203
void Session::send_test_command_message()
2204
{
44✔
2205
    REALM_ASSERT_EX(m_state == Active, m_state);
44✔
2206

22✔
2207
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
44✔
2208
                           [](const PendingTestCommand& command) {
44✔
2209
                               return command.pending;
44✔
2210
                           });
44✔
2211
    REALM_ASSERT(it != m_pending_test_commands.end());
44✔
2212

22✔
2213
    ClientProtocol& protocol = m_conn.get_client_protocol();
44✔
2214
    OutputBuffer& out = m_conn.get_output_buffer();
44✔
2215
    auto session_ident = get_ident();
44✔
2216

22✔
2217
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
44✔
2218
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
44✔
2219

22✔
2220
    m_conn.initiate_write_message(out, this); // Throws;
44✔
2221
    it->pending = false;
44✔
2222

22✔
2223
    enlist_to_send();
44✔
2224
}
44✔
2225

2226

2227
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
2228
{
3,268✔
2229
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
3,268✔
2230
                 client_file_ident.salt); // Throws
3,268✔
2231

1,518✔
2232
    // Ignore the message if the deactivation process has been initiated,
1,518✔
2233
    // because in that case, the associated Realm and SessionWrapper must
1,518✔
2234
    // not be accessed any longer.
1,518✔
2235
    if (m_state != Active)
3,268✔
2236
        return Status::OK(); // Success
102✔
2237

1,500✔
2238
    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
3,166✔
2239
                               !m_unbound_message_received);
3,166✔
2240
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,166✔
2241
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
×
2242
    }
×
2243
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
3,166✔
2244
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
×
2245
    }
×
2246
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
3,166✔
2247
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
×
2248
    }
×
2249

1,500✔
2250
    m_client_file_ident = client_file_ident;
3,166✔
2251

1,500✔
2252
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
3,166✔
2253
        // Ready to send the IDENT message
2254
        ensure_enlisted_to_send(); // Throws
×
2255
        return Status::OK();       // Success
×
2256
    }
×
2257

1,500✔
2258
    // access before the client reset (if applicable) because
1,500✔
2259
    // the reset can take a while and the sync session might have died
1,500✔
2260
    // by the time the reset finishes.
1,500✔
2261
    ClientReplication& repl = access_realm(); // Throws
3,166✔
2262

1,500✔
2263
    auto client_reset_if_needed = [&]() -> bool {
3,166✔
2264
        if (!m_client_reset_operation) {
3,166✔
2265
            return false;
2,830✔
2266
        }
2,830✔
2267

168✔
2268
        // ClientResetOperation::finalize() will return true only if the operation actually did
168✔
2269
        // a client reset. It may choose not to do a reset if the local Realm does not exist
168✔
2270
        // at this point (in that case there is nothing to reset). But in any case, we must
168✔
2271
        // clean up m_client_reset_operation at this point as sync should be able to continue from
168✔
2272
        // this point forward.
168✔
2273
        auto client_reset_operation = std::move(m_client_reset_operation);
336✔
2274
        util::UniqueFunction<void(int64_t)> on_flx_subscription_complete = [this](int64_t version) {
220✔
2275
            this->on_flx_sync_version_complete(version);
104✔
2276
        };
104✔
2277
        if (!client_reset_operation->finalize(client_file_ident, get_flx_subscription_store(),
336✔
2278
                                              std::move(on_flx_subscription_complete))) {
168✔
2279
            return false;
×
2280
        }
×
2281
        realm::VersionID client_reset_old_version = client_reset_operation->get_client_reset_old_version();
336✔
2282
        realm::VersionID client_reset_new_version = client_reset_operation->get_client_reset_new_version();
336✔
2283

168✔
2284
        // The fresh Realm has been used to reset the state
168✔
2285
        logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
336✔
2286

168✔
2287
        SaltedFileIdent client_file_ident;
336✔
2288
        bool has_pending_client_reset = false;
336✔
2289
        repl.get_history().get_status(m_last_version_available, client_file_ident, m_progress,
336✔
2290
                                      &has_pending_client_reset); // Throws
336✔
2291
        REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
336✔
2292
        REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
336✔
2293
        REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
336✔
2294
                        m_progress.download.last_integrated_client_version);
336✔
2295
        REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
336✔
2296
        REALM_ASSERT_EX(m_progress.upload.last_integrated_server_version == 0,
336✔
2297
                        m_progress.upload.last_integrated_server_version);
336✔
2298
        logger.trace("last_version_available  = %1", m_last_version_available); // Throws
336✔
2299

168✔
2300
        m_upload_target_version = m_last_version_available;
336✔
2301
        m_upload_progress = m_progress.upload;
336✔
2302
        m_download_progress = m_progress.download;
336✔
2303
        // In recovery mode, there may be new changesets to upload and nothing left to download.
168✔
2304
        // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
168✔
2305
        // For both, we want to allow uploads again without needing external changes to download first.
168✔
2306
        m_allow_upload = true;
336✔
2307
        REALM_ASSERT_EX(m_last_version_selected_for_upload == 0, m_last_version_selected_for_upload);
336✔
2308

168✔
2309
        get_transact_reporter()->report_sync_transact(client_reset_old_version, client_reset_new_version);
336✔
2310

168✔
2311
        if (has_pending_client_reset) {
336✔
2312
            handle_pending_client_reset_acknowledgement();
268✔
2313
        }
268✔
2314

168✔
2315
        // If a migration or rollback is in progress, mark it complete when client reset is completed.
168✔
2316
        if (auto migration_store = get_migration_store()) {
336✔
2317
            migration_store->complete_migration_or_rollback();
240✔
2318
        }
240✔
2319

168✔
2320
        return true;
336✔
2321
    };
336✔
2322
    // if a client reset happens, it will take care of setting the file ident
1,500✔
2323
    // and if not, we do it here
1,500✔
2324
    bool did_client_reset = false;
3,166✔
2325
    try {
3,166✔
2326
        did_client_reset = client_reset_if_needed();
3,166✔
2327
    }
3,166✔
2328
    catch (const std::exception& e) {
1,534✔
2329
        auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
68✔
2330
        logger.error(err_msg.c_str());
68✔
2331
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
68✔
2332
        suspend(err_info);
68✔
2333
        return Status::OK();
68✔
2334
    }
68✔
2335
    if (!did_client_reset) {
3,098✔
2336
        repl.get_history().set_client_file_ident(client_file_ident,
2,830✔
2337
                                                 m_fix_up_object_ids); // Throws
2,830✔
2338
        m_progress.download.last_integrated_client_version = 0;
2,830✔
2339
        m_progress.upload.client_version = 0;
2,830✔
2340
        m_last_version_selected_for_upload = 0;
2,830✔
2341
    }
2,830✔
2342

1,466✔
2343
    // Ready to send the IDENT message
1,466✔
2344
    ensure_enlisted_to_send(); // Throws
3,098✔
2345
    return Status::OK();       // Success
3,098✔
2346
}
3,098✔
2347

2348
Status Session::receive_download_message(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
2349
                                         DownloadBatchState batch_state, int64_t query_version,
2350
                                         const ReceivedChangesets& received_changesets)
2351
{
42,768✔
2352
    REALM_ASSERT_EX(query_version >= 0, query_version);
42,768✔
2353
    // Ignore the message if the deactivation process has been initiated,
24,044✔
2354
    // because in that case, the associated Realm and SessionWrapper must
24,044✔
2355
    // not be accessed any longer.
24,044✔
2356
    if (m_state != Active)
42,768✔
2357
        return Status::OK();
458✔
2358

23,788✔
2359
    if (is_steady_state_download_message(batch_state, query_version)) {
42,310✔
2360
        batch_state = DownloadBatchState::SteadyState;
40,746✔
2361
    }
40,746✔
2362

23,788✔
2363
    logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
42,310✔
2364
                 "latest_server_version=%3, latest_server_version_salt=%4, "
42,310✔
2365
                 "upload_client_version=%5, upload_server_version=%6, downloadable_bytes=%7, "
42,310✔
2366
                 "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
42,310✔
2367
                 progress.download.server_version, progress.download.last_integrated_client_version,
42,310✔
2368
                 progress.latest_server_version.version, progress.latest_server_version.salt,
42,310✔
2369
                 progress.upload.client_version, progress.upload.last_integrated_server_version, downloadable_bytes,
42,310✔
2370
                 batch_state != DownloadBatchState::MoreToCome, query_version, received_changesets.size()); // Throws
42,310✔
2371

23,788✔
2372
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
23,788✔
2373
    // changeset over and over again.
23,788✔
2374
    if (m_client_error) {
42,310✔
2375
        logger.debug("Ignoring download message because the client detected an integration error");
×
2376
        return Status::OK();
×
2377
    }
×
2378

23,788✔
2379
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
42,314✔
2380
    if (REALM_UNLIKELY(!legal_at_this_time)) {
42,310✔
2381
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
×
2382
    }
×
2383
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
42,310✔
2384
        logger.error("Bad sync progress received (%1)", status);
×
2385
        return status;
×
2386
    }
×
2387

23,788✔
2388
    version_type server_version = m_progress.download.server_version;
42,310✔
2389
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
42,310✔
2390
    for (const Transformer::RemoteChangeset& changeset : received_changesets) {
44,510✔
2391
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
22,548✔
2392
        // version must be increasing, but can stay the same during bootstraps.
22,548✔
2393
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
23,364✔
2394
                                                         : (changeset.remote_version > server_version);
42,454✔
2395
        // Each server version cannot be greater than the one in the header of the download message.
22,548✔
2396
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
43,270✔
2397
        if (!good_server_version) {
43,270✔
2398
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2399
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2400
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2401
        }
×
2402
        server_version = changeset.remote_version;
43,270✔
2403
        // Check that per-changeset last integrated client version is "weakly"
22,548✔
2404
        // increasing.
22,548✔
2405
        bool good_client_version =
43,270✔
2406
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
43,270✔
2407
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
43,266✔
2408
        if (!good_client_version) {
43,270✔
2409
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2410
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2411
                                 "(%1, %2, %3)",
×
2412
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2413
                                 progress.download.last_integrated_client_version)};
×
2414
        }
×
2415
        last_integrated_client_version = changeset.last_integrated_local_version;
43,270✔
2416
        // Server shouldn't send our own changes, and zero is not a valid client
22,548✔
2417
        // file identifier.
22,548✔
2418
        bool good_file_ident =
43,270✔
2419
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
43,270✔
2420
        if (!good_file_ident) {
43,270✔
2421
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2422
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2423
                                 changeset.origin_file_ident)};
×
2424
        }
×
2425
    }
43,270✔
2426

23,788✔
2427
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
42,310✔
2428
                                       batch_state, received_changesets.size());
42,310✔
2429
    if (hook_action == SyncClientHookAction::EarlyReturn) {
42,310✔
2430
        return Status::OK();
12✔
2431
    }
12✔
2432
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
42,298✔
2433

23,782✔
2434
    if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) {
42,298✔
2435
        clear_resumption_delay_state();
1,558✔
2436
        return Status::OK();
1,558✔
2437
    }
1,558✔
2438

23,002✔
2439
    initiate_integrate_changesets(downloadable_bytes, batch_state, progress, received_changesets); // Throws
40,740✔
2440

23,002✔
2441
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
40,740✔
2442
                                  batch_state, received_changesets.size());
40,740✔
2443
    if (hook_action == SyncClientHookAction::EarlyReturn) {
40,740✔
2444
        return Status::OK();
×
2445
    }
×
2446
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
40,740✔
2447

23,002✔
2448
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
23,002✔
2449
    // after a retryable session error.
23,002✔
2450
    clear_resumption_delay_state();
40,740✔
2451
    return Status::OK();
40,740✔
2452
}
40,740✔
2453

2454
// Handles a MARK message from the server.
//
// Returns Status::OK() when the message was handled, or ignored because the
// session is no longer Active. Returns SyncProtocolInvariantFailed when the
// message is not legal at this time, or when the request identifier is not
// greater than the last mark received and at most the last mark sent.
//
// On success the current download server version is recorded as the version
// at the last download mark, and download completion is re-checked.
Status Session::receive_mark_message(request_ident_type request_ident)
{
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active)
        return Status::OK(); // Success

    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
    if (REALM_UNLIKELY(!legal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
    }
    bool good_request_ident =
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
    if (REALM_UNLIKELY(!good_request_ident)) {
        return {
            ErrorCodes::SyncProtocolInvariantFailed,
            util::format(
                "Received MARK message with invalid request identifier (last mark sent: %1 last mark received: %2)",
                m_last_download_mark_sent, m_last_download_mark_received)};
    }

    m_server_version_at_last_download_mark = m_progress.download.server_version;
    m_last_download_mark_received = request_ident;
    check_for_download_completion(); // Throws

    return Status::OK(); // Success
}
15,640✔
2484

2485

2486
// The caller (Connection) must discard the session if the session has become
2487
// deactivated upon return.
2488
// Handles an UNBOUND message. Returns SyncProtocolInvariantFailed unless UNBIND
// has been sent and neither ERROR nor UNBOUND has been received before;
// otherwise records the message and, if the UNBIND send has completed while
// the session is Deactivating, completes the deactivation.
Status Session::receive_unbound_message()
{
    logger.debug("Received: UNBOUND");

    const bool illegal_now = (!m_unbind_message_sent || m_error_message_received || m_unbound_message_received);
    if (REALM_UNLIKELY(illegal_now)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
    }

    // UNBIND was sent and no ERROR message has arrived, so either the
    // deactivation process has been initiated (state is Deactivating) or the
    // session has been suspended because of a client side error.
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);

    m_unbound_message_received = true;

    // The unbinding process, and with it the deactivation process, is complete
    // once the UNBIND message has also been fully sent.
    const bool unbinding_done = (m_unbind_message_send_complete && m_state == Deactivating);
    if (unbinding_done) {
        complete_deactivation(); // Throws
        // Life cycle state is now Deactivated
    }

    return Status::OK(); // Success
}
3,992✔
2515

2516

2517
// Handles a QUERY_ERROR message: logs it and, while the session is Active,
// forwards the query version and message to on_flx_sync_error(). Always
// returns Status::OK().
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
{
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);

    // Once the deactivation process has been initiated the associated Realm
    // and SessionWrapper must not be accessed any longer, so the message is
    // dropped.
    if (m_state != Active)
        return Status::OK();

    on_flx_sync_error(query_version, message); // throws
    return Status::OK();
}
16✔
2528

2529
// The caller (Connection) must discard the session if the session has become
2530
// deactivated upon return.
2531
// Handles a session-level ERROR message received from the server.
//
// Returns SyncProtocolInvariantFailed if the message arrives at a point where
// it is not legal, or if it carries a known error code that is not a
// session-level error. Otherwise returns Status::OK() after either deferring
// the error (compensating writes), returning early on request of the debug
// hook, or suspending the session.
Status Session::receive_error_message(const ProtocolErrorInfo& info)
{
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws

    // An ERROR message is only acceptable after BIND has been sent and before
    // any earlier ERROR or UNBOUND message has been received.
    const bool illegal_at_this_time =
        !m_bind_message_sent || m_error_message_received || m_unbound_message_received;
    if (REALM_UNLIKELY(illegal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
    }

    const auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
    auto status = protocol_error_to_status(protocol_error, info.message);
    // Error codes that map to UnknownError are let through; every other code
    // must be a session-level error.
    if (status != ErrorCodes::UnknownError) {
        if (REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
            return {ErrorCodes::SyncProtocolInvariantFailed,
                    util::format("Received ERROR message for session with non-session-level error code %1",
                                 info.raw_error_code)};
        }
    }

    // The debug hook is only consulted while the session is Active: once
    // deactivation is under way the SessionWrapper may not be available.
    if (m_state == Active &&
        call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info) == SyncClientHookAction::EarlyReturn) {
        return Status::OK();
    }

    // Compensating write errors are not raised to the SDK right away; they are
    // deferred until the server version containing the compensating write has
    // appeared in a download message.
    const bool is_compensating_write = (status == ErrorCodes::SyncCompensatingWrite);
    if (is_compensating_write) {
        // When the session is not Active the error is dropped here; the server
        // will send it again the next time the client connects.
        if (m_state == Active) {
            m_pending_compensating_write_errors.push_back(info);
        }
        return Status::OK();
    }

    m_error_message_received = true;
    suspend(SessionErrorInfo{info, std::move(status)});
    return Status::OK();
}
858✔
2573

2574
void Session::suspend(const SessionErrorInfo& info)
2575
{
926✔
2576
    REALM_ASSERT(!m_suspended);
926✔
2577
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
926✔
2578
    logger.debug("Suspended"); // Throws
926✔
2579

464✔
2580
    m_suspended = true;
926✔
2581

464✔
2582
    // Detect completion of the unbinding process
464✔
2583
    if (m_unbind_message_send_complete && m_error_message_received) {
926!
2584
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2585
        // we received an ERROR message implies that the deactivation process must
2586
        // have been initiated, so this session must be in the Deactivating state.
2587
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
×
2588

2589
        // The deactivation process completes when the unbinding process
2590
        // completes.
2591
        complete_deactivation(); // Throws
×
2592
        // Life cycle state is now Deactivated
2593
    }
×
2594

464✔
2595
    // Notify the application of the suspension of the session if the session is
464✔
2596
    // still in the Active state
464✔
2597
    if (m_state == Active) {
926✔
2598
        m_conn.one_less_active_unsuspended_session(); // Throws
922✔
2599
        on_suspended(info);                           // Throws
922✔
2600
    }
922✔
2601

464✔
2602
    if (!info.is_fatal) {
926✔
2603
        begin_resumption_delay(info);
398✔
2604
    }
398✔
2605

464✔
2606
    // Ready to send the UNBIND message, if it has not been sent already
464✔
2607
    if (!m_unbind_message_sent)
926✔
2608
        ensure_enlisted_to_send(); // Throws
922✔
2609
}
926✔
2610

2611
// Handles the response to a previously issued test command.
//
// Looks up the pending test command whose id equals `ident`, fulfills its
// promise with a copy of `body` and removes it from the pending list. Returns
// SyncProtocolInvariantFailed if no pending command has that id.
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
{
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);

    // Locate the first pending command carrying the received request ident.
    auto pending = m_pending_test_commands.begin();
    while (pending != m_pending_test_commands.end() && !(pending->id == ident)) {
        ++pending;
    }

    if (pending != m_pending_test_commands.end()) {
        std::string response_body{body};
        pending->promise.emplace_value(std::move(response_body));
        m_pending_test_commands.erase(pending);
        return Status::OK();
    }

    return {ErrorCodes::SyncProtocolInvariantFailed,
            util::format("Received test command response for a non-existent ident %1", ident)};
}
44✔
2628

2629
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2630
{
398✔
2631
    REALM_ASSERT(!m_try_again_activation_timer);
398✔
2632

202✔
2633
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
398✔
2634
                                  error_info.resumption_delay_interval);
398✔
2635
    auto try_again_interval = m_try_again_delay_info.delay_interval();
398✔
2636
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
398✔
2637
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
10✔
2638
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
10✔
2639
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
10✔
2640
        try_again_interval = std::chrono::milliseconds{1000};
20✔
2641
    }
20✔
2642
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
398✔
2643
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
398✔
2644
        if (status == ErrorCodes::OperationAborted)
398✔
2645
            return;
4✔
2646
        else if (!status.is_ok())
394✔
2647
            throw Exception(status);
×
2648

200✔
2649
        m_try_again_activation_timer.reset();
394✔
2650
        cancel_resumption_delay();
394✔
2651
    });
394✔
2652
}
398✔
2653

2654
void Session::clear_resumption_delay_state()
2655
{
42,300✔
2656
    if (m_try_again_activation_timer) {
42,300✔
2657
        logger.debug("Clearing resumption delay state after successful download");
×
2658
        m_try_again_delay_info.reset();
×
2659
    }
×
2660
}
42,300✔
2661

2662
// Validates the sync progress received from the server against the progress
// currently recorded for this session (m_progress) and against
// m_last_version_available.
//
// Returns Status::OK() when every invariant holds, otherwise
// SyncProtocolInvariantFailed. All checks are always evaluated; when several
// of them fail, the message of the last failing check is the one reported.
//
// NOTE(review): the function is declared noexcept, yet building the message
// allocates - confirm that terminating on allocation failure is acceptable.
Status ClientImpl::Session::check_received_sync_progress(const SyncProgress& progress) noexcept
{
    const SyncProgress& current = m_progress;
    const SyncProgress& received = progress;
    std::string violation;

    // Checks against the previously recorded progress and the local history.
    if (received.latest_server_version.version < current.latest_server_version.version) {
        violation = util::format("Latest server version in download messages must be weakly increasing throughout a "
                                 "session (current: %1, received: %2)",
                                 current.latest_server_version.version, received.latest_server_version.version);
    }
    if (received.upload.client_version < current.upload.client_version) {
        violation = util::format("Last integrated client version in download messages must be weakly increasing "
                                 "throughout a session (current: %1, received: %2)",
                                 current.upload.client_version, received.upload.client_version);
    }
    if (received.upload.client_version > m_last_version_available) {
        violation = util::format("Last integrated client version on server cannot be greater than the latest client "
                                 "version in existence (current: %1, received: %2)",
                                 m_last_version_available, received.upload.client_version);
    }
    if (received.download.server_version < current.download.server_version) {
        violation =
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
                         current.download.server_version, received.download.server_version);
    }
    if (received.download.server_version > received.latest_server_version.version) {
        violation = util::format(
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
            received.download.server_version, received.latest_server_version.version);
    }
    if (received.download.last_integrated_client_version < current.download.last_integrated_client_version) {
        violation = util::format(
            "Last integrated client version on the server at the position in the server's history of the download "
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
            current.download.last_integrated_client_version, received.download.last_integrated_client_version);
    }

    // Checks for internal consistency of the received progress itself.
    if (received.download.last_integrated_client_version > received.upload.client_version) {
        violation = util::format("Last integrated client version on the server in the position at the server's history "
                                 "of the download cursor cannot be greater than the latest client version integrated "
                                 "on the server (download: %1, upload: %2)",
                                 received.download.last_integrated_client_version, received.upload.client_version);
    }
    if (received.download.server_version < received.upload.last_integrated_server_version) {
        violation = util::format(
            "The server version of the download cursor cannot be less than the server version integrated in the "
            "latest client version acknowledged by the server (download: %1, upload: %2)",
            received.download.server_version, received.upload.last_integrated_server_version);
    }

    if (!violation.empty()) {
        return {ErrorCodes::SyncProtocolInvariantFailed, std::move(violation)};
    }
    return Status::OK();
}
4✔
2716

2717

2718
void Session::check_for_upload_completion()
2719
{
72,342✔
2720
    REALM_ASSERT_EX(m_state == Active, m_state);
72,342✔
2721
    if (!m_upload_completion_notification_requested) {
72,342✔
2722
        return;
41,716✔
2723
    }
41,716✔
2724

15,100✔
2725
    // during an ongoing client reset operation, we never upload anything
15,100✔
2726
    if (m_client_reset_operation)
30,626✔
2727
        return;
232✔
2728

14,984✔
2729
    // Upload process must have reached end of history
14,984✔
2730
    REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available);
30,394✔
2731
    bool scan_complete = (m_upload_progress.client_version == m_last_version_available);
30,394✔
2732
    if (!scan_complete)
30,394✔
2733
        return;
4,930✔
2734

12,446✔
2735
    // All uploaded changesets must have been acknowledged by the server
12,446✔
2736
    REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload);
25,464✔
2737
    bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload);
25,464✔
2738
    if (!all_uploads_accepted)
25,464✔
2739
        return;
10,806✔
2740

7,244✔
2741
    m_upload_completion_notification_requested = false;
14,658✔
2742
    on_upload_completion(); // Throws
14,658✔
2743
}
14,658✔
2744

2745

2746
void Session::check_for_download_completion()
2747
{
57,744✔
2748
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
57,744✔
2749
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
57,744✔
2750
    if (m_last_download_mark_received == m_last_triggering_download_mark)
57,744✔
2751
        return;
41,902✔
2752
    if (m_last_download_mark_received < m_target_download_mark)
15,842✔
2753
        return;
430✔
2754
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
15,412✔
2755
        return;
×
2756
    m_last_triggering_download_mark = m_target_download_mark;
15,412✔
2757
    if (REALM_UNLIKELY(!m_allow_upload)) {
15,412✔
2758
        // Activate the upload process now, and enable immediate reactivation
1,980✔
2759
        // after a subsequent fast reconnect.
1,980✔
2760
        m_allow_upload = true;
4,156✔
2761
        ensure_enlisted_to_send(); // Throws
4,156✔
2762
    }
4,156✔
2763
    on_download_completion(); // Throws
15,412✔
2764
}
15,412✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc