• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2036

14 Feb 2024 05:06PM UTC coverage: 91.881% (+0.03%) from 91.851%
2036

push

Evergreen

web-flow
Mitigate races in accessing `m_initated` and `m_finalized` in various REALM_ASSERTs (#7338)

93026 of 171514 branches covered (54.24%)

46 of 48 new or added lines in 1 file covered. (95.83%)

66 existing lines in 10 files now uncovered.

235483 of 256292 relevant lines covered (91.88%)

6194496.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.4
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/compact_changesets.hpp>
10
#include <realm/sync/noinst/client_reset_operation.hpp>
11
#include <realm/sync/noinst/sync_schema_migration.hpp>
12
#include <realm/sync/protocol.hpp>
13
#include <realm/util/assert.hpp>
14
#include <realm/util/basic_system_errors.hpp>
15
#include <realm/util/memory_stream.hpp>
16
#include <realm/util/platform_info.hpp>
17
#include <realm/util/random.hpp>
18
#include <realm/util/safe_int_ops.hpp>
19
#include <realm/util/scope_exit.hpp>
20
#include <realm/util/to_string.hpp>
21
#include <realm/util/uri.hpp>
22
#include <realm/version.hpp>
23

24
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
25

26
#include <system_error>
27
#include <sstream>
28

29
// NOTE: The protocol specification is in `/doc/protocol.md`
30

31
using namespace realm;
32
using namespace _impl;
33
using namespace realm::util;
34
using namespace realm::sync;
35
using namespace realm::sync::websocket;
36

37
// clang-format off
38
using Connection      = ClientImpl::Connection;
39
using Session         = ClientImpl::Session;
40
using UploadChangeset = ClientHistory::UploadChangeset;
41

42
// These are a work-around for a bug in MSVC. It cannot find in-class types
43
// mentioned in signature of out-of-line member function definitions.
44
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
45
using OutputBuffer                = ClientImpl::OutputBuffer;
46
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
47
// clang-format on
48

49
void ClientImpl::ReconnectInfo::reset() noexcept
50
{
1,576✔
51
    m_backoff_state.reset();
1,576✔
52
    scheduled_reset = false;
1,576✔
53
}
1,576✔
54

55

56
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
57
                                       std::optional<ResumptionDelayInfo> new_delay_info)
58
{
3,404✔
59
    m_backoff_state.update(new_reason, new_delay_info);
3,404✔
60
}
3,404✔
61

62

63
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
64
{
5,296✔
65
    if (scheduled_reset) {
5,296✔
66
        reset();
4✔
67
    }
4✔
68

2,698✔
69
    if (!m_backoff_state.triggering_error) {
5,296✔
70
        return std::chrono::milliseconds::zero();
4,132✔
71
    }
4,132✔
72

620✔
73
    switch (*m_backoff_state.triggering_error) {
1,164✔
74
        case ConnectionTerminationReason::closed_voluntarily:
86✔
75
            return std::chrono::milliseconds::zero();
86✔
76
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
77
            return std::chrono::milliseconds::max();
18✔
78
        default:
1,058✔
79
            if (m_reconnect_mode == ReconnectMode::testing) {
1,058✔
80
                return std::chrono::milliseconds::max();
818✔
81
            }
818✔
82

122✔
83
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
240✔
84
            return m_backoff_state.delay_interval();
240✔
85
    }
1,164✔
86
}
1,164✔
87

88

89
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
90
                                      port_type& port, std::string& path) const
91
{
3,772✔
92
    util::Uri uri(url); // Throws
3,772✔
93
    uri.canonicalize(); // Throws
3,772✔
94
    std::string userinfo, address_2, port_2;
3,772✔
95
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
3,772✔
96
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
3,772✔
97
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
3,772✔
98
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
3,772✔
99
    if (REALM_UNLIKELY(!good))
3,772✔
100
        return false;
1,712✔
101
    ProtocolEnvelope protocol_2;
3,772✔
102
    port_type port_3;
3,772✔
103
    if (realm_scheme) {
3,772✔
104
        if (uri.get_scheme() == "realm:") {
×
105
            protocol_2 = ProtocolEnvelope::realm;
×
106
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
107
        }
×
108
        else {
×
109
            protocol_2 = ProtocolEnvelope::realms;
×
110
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
111
        }
×
112
    }
×
113
    else {
3,772✔
114
        REALM_ASSERT(ws_scheme);
3,772✔
115
        if (uri.get_scheme() == "ws:") {
3,772✔
116
            protocol_2 = ProtocolEnvelope::ws;
3,760✔
117
            port_3 = 80;
3,760✔
118
        }
3,760✔
119
        else {
12✔
120
            protocol_2 = ProtocolEnvelope::wss;
12✔
121
            port_3 = 443;
12✔
122
        }
12✔
123
    }
3,772✔
124
    if (!port_2.empty()) {
3,772✔
125
        std::istringstream in(port_2);    // Throws
3,772✔
126
        in.imbue(std::locale::classic()); // Throws
3,772✔
127
        in >> port_3;
3,772✔
128
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,772✔
129
            return false;
1,712✔
130
    }
3,772✔
131
    std::string path_2 = uri.get_path(); // Throws (copy)
3,772✔
132

1,712✔
133
    protocol = protocol_2;
3,772✔
134
    address = std::move(address_2);
3,772✔
135
    port = port_3;
3,772✔
136
    path = std::move(path_2);
3,772✔
137
    return true;
3,772✔
138
}
3,772✔
139

140

141
ClientImpl::ClientImpl(ClientConfig config)
142
    : logger_ptr{config.logger ? std::move(config.logger) : util::Logger::get_default_logger()}
143
    , logger{*logger_ptr}
144
    , m_reconnect_mode{config.reconnect_mode}
145
    , m_connect_timeout{config.connect_timeout}
146
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
147
    , m_ping_keepalive_period{config.ping_keepalive_period}
148
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
149
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
150
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
151
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
152
    , m_dry_run{config.dry_run}
153
    , m_enable_default_port_hack{config.enable_default_port_hack}
154
    , m_disable_upload_compaction{config.disable_upload_compaction}
155
    , m_fix_up_object_ids{config.fix_up_object_ids}
156
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
157
    , m_socket_provider{std::move(config.socket_provider)}
158
    , m_client_protocol{} // Throws
159
    , m_one_connection_per_session{config.one_connection_per_session}
160
    , m_random{}
161
{
9,614✔
162
    // FIXME: Would be better if seeding was up to the application.
4,738✔
163
    util::seed_prng_nondeterministically(m_random); // Throws
9,614✔
164

4,738✔
165
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,614✔
166
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,614✔
167
                 get_current_protocol_version()); // Throws
9,614✔
168
    logger.info("Platform: %1", util::get_platform_info());
9,614✔
169
    const char* build_mode;
9,614✔
170
#if REALM_DEBUG
9,614✔
171
    build_mode = "Debug";
9,614✔
172
#else
173
    build_mode = "Release";
174
#endif
175
    logger.debug("Build mode: %1", build_mode);
9,614✔
176
    logger.debug("Config param: one_connection_per_session = %1",
9,614✔
177
                 config.one_connection_per_session); // Throws
9,614✔
178
    logger.debug("Config param: connect_timeout = %1 ms",
9,614✔
179
                 config.connect_timeout); // Throws
9,614✔
180
    logger.debug("Config param: connection_linger_time = %1 ms",
9,614✔
181
                 config.connection_linger_time); // Throws
9,614✔
182
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,614✔
183
                 config.ping_keepalive_period); // Throws
9,614✔
184
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,614✔
185
                 config.pong_keepalive_timeout); // Throws
9,614✔
186
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,614✔
187
                 config.fast_reconnect_limit); // Throws
9,614✔
188
    logger.debug("Config param: disable_upload_compaction = %1",
9,614✔
189
                 config.disable_upload_compaction); // Throws
9,614✔
190
    logger.debug("Config param: disable_sync_to_disk = %1",
9,614✔
191
                 config.disable_sync_to_disk); // Throws
9,614✔
192
    logger.debug("Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3",
9,614✔
193
                 m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,614✔
194
                 m_reconnect_backoff_info.resumption_delay_interval.count(),
9,614✔
195
                 m_reconnect_backoff_info.resumption_delay_backoff_multiplier);
9,614✔
196

4,738✔
197
    if (config.reconnect_mode != ReconnectMode::normal) {
9,614✔
198
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
772✔
199
                    "never do this in production!");
772✔
200
    }
772✔
201

4,738✔
202
    if (config.dry_run) {
9,614✔
203
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
204
                    "never do this in production!");
×
205
    }
×
206

4,738✔
207
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,614✔
208

4,738✔
209
    if (m_one_connection_per_session) {
9,614✔
210
        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
4✔
211
                    "never do this in production");
4✔
212
    }
4✔
213

4,738✔
214
    if (config.disable_upload_activation_delay) {
9,614✔
215
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
216
                    "never do this in production");
×
217
    }
×
218

4,738✔
219
    if (config.disable_sync_to_disk) {
9,614✔
220
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
221
                    "never do this in production");
×
222
    }
×
223

4,738✔
224
    m_actualize_and_finalize = create_trigger([this](Status status) {
15,800✔
225
        if (status == ErrorCodes::OperationAborted)
15,800✔
226
            return;
×
227
        else if (!status.is_ok())
15,800✔
228
            throw Exception(status);
×
229
        actualize_and_finalize_session_wrappers(); // Throws
15,800✔
230
    });
15,800✔
231
}
9,614✔
232

233

234
// Submits `handler` to the socket provider and tracks it as an outstanding
// post. The count is incremented (and `m_drained` cleared) before submission;
// it is decremented, and `m_drain_cv` notified, when the wrapped handler
// finishes, whether it returns or throws.
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    {
        std::lock_guard lock(m_drain_mutex);
        m_drained = false;
        ++m_outstanding_posts;
    }
    m_socket_provider->post([this, callback = std::move(handler)](Status status) {
        // Runs on scope exit so the bookkeeping also happens if `callback` throws.
        auto release_post = util::make_scope_exit([this]() noexcept {
            std::lock_guard lock(m_drain_mutex);
            REALM_ASSERT(m_outstanding_posts);
            --m_outstanding_posts;
            m_drain_cv.notify_all();
        });
        callback(status);
    });
}
169,098✔
252

253

254
void ClientImpl::drain_connections()
255
{
9,614✔
256
    logger.debug("Draining connections during sync client shutdown");
9,614✔
257
    for (auto& server_slot_pair : m_server_slots) {
6,074✔
258
        auto& server_slot = server_slot_pair.second;
2,536✔
259

1,200✔
260
        if (server_slot.connection) {
2,536✔
261
            auto& conn = server_slot.connection;
2,412✔
262
            conn->force_close();
2,412✔
263
        }
2,412✔
264
        else {
124✔
265
            for (auto& conn_pair : server_slot.alt_connections) {
62✔
266
                conn_pair.second->force_close();
4✔
267
            }
4✔
268
        }
124✔
269
    }
2,536✔
270
}
9,614✔
271

272

273
// Creates a timer on the socket provider that invokes `handler` after `delay`,
// and tracks it as an outstanding post in the same way post() does.
//
// The outstanding-post count is incremented (and `m_drained` cleared) before
// the timer is created. It is decremented, and `m_drain_cv` notified, when the
// wrapped handler finishes. The decrement runs from a scope guard so that it
// also happens if `handler` throws; otherwise the count would stay elevated
// and the condition variable would never be notified for this timer.
//
// `delay`   - time to wait before the handler is invoked.
// `handler` - callback receiving the timer's completion status; it is moved
//             into the timer callback.
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
                                                       SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    {
        std::lock_guard lock(m_drain_mutex);
        ++m_outstanding_posts;
        m_drained = false;
    }
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
        auto decr_guard = util::make_scope_exit([&]() noexcept {
            std::lock_guard lock(m_drain_mutex);
            REALM_ASSERT(m_outstanding_posts);
            --m_outstanding_posts;
            m_drain_cv.notify_all();
        });
        handler(status);
    });
}
16,146✔
291

292

293
// Creates a trigger bound to this client that runs `handler` when fired.
// Requires a socket provider to be present.
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    SyncTrigger trigger = std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
    return trigger;
}
12,268✔
298

299
// If a websocket sentinel is still attached, marks it as destroyed, so that
// observers holding their own reference to it can detect that this connection
// is gone, and then drops this connection's reference.
Connection::~Connection()
{
    if (!m_websocket_sentinel)
        return;

    m_websocket_sentinel->destroyed = true;
    m_websocket_sentinel.reset();
}
2,654✔
306

307
void Connection::activate()
308
{
2,652✔
309
    REALM_ASSERT(m_on_idle);
2,652✔
310
    m_activated = true;
2,652✔
311
    if (m_num_active_sessions == 0)
2,652✔
312
        m_on_idle->trigger();
×
313
    // We cannot in general connect immediately, because a prior failure to
1,258✔
314
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
1,258✔
315
    initiate_reconnect_wait(); // Throws
2,652✔
316
}
2,652✔
317

318

319
void Connection::activate_session(std::unique_ptr<Session> sess)
320
{
10,050✔
321
    REALM_ASSERT(sess);
10,050✔
322
    REALM_ASSERT(&sess->m_conn == this);
10,050✔
323
    REALM_ASSERT(!m_force_closed);
10,050✔
324
    Session& sess_2 = *sess;
10,050✔
325
    session_ident_type ident = sess->m_ident;
10,050✔
326
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
10,050✔
327
    bool was_inserted = p.second;
10,050✔
328
    REALM_ASSERT(was_inserted);
10,050✔
329
    // Save the session ident to the historical list of session idents
4,848✔
330
    m_session_history.insert(ident);
10,050✔
331
    sess_2.activate(); // Throws
10,050✔
332
    if (m_state == ConnectionState::connected) {
10,050✔
333
        bool fast_reconnect = false;
6,986✔
334
        sess_2.connection_established(fast_reconnect); // Throws
6,986✔
335
    }
6,986✔
336
    ++m_num_active_sessions;
10,050✔
337
}
10,050✔
338

339

340
// Starts deactivation of `sess`. If the session reaches the Deactivated state
// immediately, it is removed from this connection at once. The active session
// count is then decremented; when it reaches zero on an activated connection
// in the disconnected state, the idle trigger is fired.
void Connection::initiate_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess);
    REALM_ASSERT(&sess->m_conn == this);
    REALM_ASSERT(m_num_active_sessions);

    // Since the client may be waiting for m_num_active_sessions to reach 0
    // in stop_and_wait() (on a separate thread), deactivate Session before
    // decrementing the num active sessions value.
    sess->initiate_deactivation(); // Throws
    if (sess->m_state == Session::Deactivated)
        finish_session_deactivation(sess);

    const bool no_sessions_left = (--m_num_active_sessions == 0);
    if (REALM_UNLIKELY(no_sessions_left) && m_activated && m_state == ConnectionState::disconnected)
        m_on_idle->trigger();
}
10,050✔
357

358

359
void Connection::cancel_reconnect_delay()
360
{
1,788✔
361
    REALM_ASSERT(m_activated);
1,788✔
362

974✔
363
    if (m_reconnect_delay_in_progress) {
1,788✔
364
        if (m_nonzero_reconnect_delay)
1,568✔
365
            logger.detail("Canceling reconnect delay"); // Throws
788✔
366

864✔
367
        // Cancel the in-progress wait operation by destroying the timer
864✔
368
        // object. Destruction is needed in this case, because a new wait
864✔
369
        // operation might have to be initiated before the previous one
864✔
370
        // completes (its completion handler starts to execute), so the new wait
864✔
371
        // operation must be done on a new timer object.
864✔
372
        m_reconnect_disconnect_timer.reset();
1,568✔
373
        m_reconnect_delay_in_progress = false;
1,568✔
374
        m_reconnect_info.reset();
1,568✔
375
        initiate_reconnect_wait(); // Throws
1,568✔
376
        return;
1,568✔
377
    }
1,568✔
378

110✔
379
    // If we are not disconnected, then we need to make sure the next time we get disconnected
110✔
380
    // that we are allowed to re-connect as quickly as possible.
110✔
381
    //
110✔
382
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
110✔
383
    // backoff/delay state before calculating the next delay, unless a PONG message is received
110✔
384
    // for the urgent PING message we send below.
110✔
385
    //
110✔
386
    // If we get a PONG message for the urgent PING message sent below, then the connection is
110✔
387
    // healthy and we can calculate the next delay normally.
110✔
388
    if (m_state != ConnectionState::disconnected) {
220✔
389
        m_reconnect_info.scheduled_reset = true;
220✔
390
        m_ping_after_scheduled_reset_of_reconnect_info = false;
220✔
391

110✔
392
        schedule_urgent_ping(); // Throws
220✔
393
        return;
220✔
394
    }
220✔
395
    // Nothing to do in this case. The next reconnect attemp will be made as
110✔
396
    // soon as there are any sessions that are both active and unsuspended.
110✔
397
}
220✔
398

399
// Removes a session that has reached the Deactivated state from this
// connection's session map and from the session ident history.
void ClientImpl::Connection::finish_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess->m_state == Session::Deactivated);

    // Copy the ident first: `m_sessions` owns the session, so `sess` must not
    // be dereferenced once the entry has been erased.
    const auto session_ident = sess->m_ident;
    m_sessions.erase(session_ident);
    m_session_history.erase(session_ident);
}
8,298✔
406

407
void Connection::force_close()
408
{
2,416✔
409
    if (m_force_closed) {
2,416✔
410
        return;
×
411
    }
×
412

1,142✔
413
    m_force_closed = true;
2,416✔
414

1,142✔
415
    if (m_state != ConnectionState::disconnected) {
2,416✔
416
        voluntary_disconnect();
2,328✔
417
    }
2,328✔
418

1,142✔
419
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,416✔
420
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,416✔
421
        m_reconnect_disconnect_timer.reset();
86✔
422
        m_reconnect_delay_in_progress = false;
86✔
423
        m_disconnect_delay_in_progress = false;
86✔
424
    }
86✔
425

1,142✔
426
    // We must copy any session pointers we want to close to a vector because force_closing
1,142✔
427
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
1,142✔
428
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
1,142✔
429
    std::vector<Session*> to_close;
2,416✔
430
    for (auto& session_pair : m_sessions) {
1,214✔
431
        if (session_pair.second->m_state == Session::State::Active) {
142✔
432
            to_close.push_back(session_pair.second.get());
142✔
433
        }
142✔
434
    }
142✔
435

1,142✔
436
    for (auto& sess : to_close) {
1,214✔
437
        sess->force_close();
142✔
438
    }
142✔
439

1,142✔
440
    logger.debug("Force closed idle connection");
2,416✔
441
}
2,416✔
442

443

444
void Connection::websocket_connected_handler(const std::string& protocol)
445
{
3,294✔
446
    if (!protocol.empty()) {
3,294✔
447
        std::string_view expected_prefix =
3,294✔
448
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
2,960✔
449
        // FIXME: Use std::string_view::begins_with() in C++20.
1,614✔
450
        auto prefix_matches = [&](std::string_view other) {
3,294✔
451
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,294✔
452
        };
3,294✔
453
        if (prefix_matches(expected_prefix)) {
3,294✔
454
            util::MemoryInputStream in;
3,294✔
455
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,294✔
456
            in.imbue(std::locale::classic());
3,294✔
457
            in.unsetf(std::ios_base::skipws);
3,294✔
458
            int value_2 = 0;
3,294✔
459
            in >> value_2;
3,294✔
460
            if (in && in.eof() && value_2 >= 0) {
3,294✔
461
                bool good_version =
3,294✔
462
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,294✔
463
                if (good_version) {
3,294✔
464
                    logger.detail("Negotiated protocol version: %1", value_2);
3,294✔
465
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
1,614✔
466
                    // will provide the appservices connection ID via a log message.
1,614✔
467
                    // TODO: Remove once the server starts sending the connection ID
1,614✔
468
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,294✔
469
                    m_negotiated_protocol_version = value_2;
3,294✔
470
                    handle_connection_established(); // Throws
3,294✔
471
                    return;
3,294✔
472
                }
3,294✔
473
            }
×
474
        }
3,294✔
475
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
476
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
477
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
478
    }
×
479
    else {
×
480
        close_due_to_client_side_error(
×
481
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
482
            ConnectionTerminationReason::bad_headers_in_http_response);
×
483
    }
×
484
}
3,294✔
485

486

487
// Handles an incoming binary websocket message.
//
// Returns false without processing if the connection has been force closed.
// Otherwise the message is processed (or, when the simulated read failure is
// triggered, the connection is closed with a non-fatal error instead), and the
// return value reports whether `m_websocket` is still set afterwards.
bool Connection::websocket_binary_message_received(util::Span<const char> data)
{
    if (m_force_closed) {
        logger.debug("Received binary message after connection was force closed");
        return false;
    }

    using sf = SimulatedFailure;
    if (sf::check_trigger(sf::sync_client__read_head)) {
        close_due_to_client_side_error(
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
            ConnectionTerminationReason::read_or_write_error);
    }
    else {
        handle_message_received(data);
    }

    return bool(m_websocket);
}
80,936✔
505

506

507
void Connection::websocket_error_handler()
508
{
490✔
509
    m_websocket_error_received = true;
490✔
510
}
490✔
511

512
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
513
{
602✔
514
    if (m_force_closed) {
602✔
515
        logger.debug("Received websocket close message after connection was force closed");
×
516
        return false;
×
517
    }
×
518
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
602✔
519

328✔
520
    switch (error_code) {
602✔
521
        case WebSocketError::websocket_ok:
60✔
522
            break;
60✔
523
        case WebSocketError::websocket_resolve_failed:
4✔
524
            [[fallthrough]];
4✔
525
        case WebSocketError::websocket_connection_failed: {
20✔
526
            SessionErrorInfo error_info(
20✔
527
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
20✔
528
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
20✔
529
            break;
20✔
530
        }
4✔
531
        case WebSocketError::websocket_read_error:
460✔
532
            [[fallthrough]];
460✔
533
        case WebSocketError::websocket_write_error: {
460✔
534
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
460✔
535
                                         ConnectionTerminationReason::read_or_write_error);
460✔
536
            break;
460✔
537
        }
460✔
538
        case WebSocketError::websocket_going_away:
254✔
539
            [[fallthrough]];
×
540
        case WebSocketError::websocket_protocol_error:
✔
541
            [[fallthrough]];
×
542
        case WebSocketError::websocket_unsupported_data:
✔
543
            [[fallthrough]];
×
544
        case WebSocketError::websocket_invalid_payload_data:
✔
545
            [[fallthrough]];
×
546
        case WebSocketError::websocket_policy_violation:
✔
547
            [[fallthrough]];
×
548
        case WebSocketError::websocket_reserved:
✔
549
            [[fallthrough]];
×
550
        case WebSocketError::websocket_no_status_received:
✔
551
            [[fallthrough]];
×
552
        case WebSocketError::websocket_invalid_extension: {
✔
553
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
554
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
555
            break;
×
556
        }
×
557
        case WebSocketError::websocket_message_too_big: {
4✔
558
            auto message = util::format(
4✔
559
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
560
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
561
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
562
            involuntary_disconnect(std::move(error_info),
4✔
563
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
564
            break;
4✔
565
        }
×
566
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
567
            close_due_to_client_side_error(
10✔
568
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
569
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
570
            break;
10✔
571
        }
×
572
        case WebSocketError::websocket_client_too_old:
✔
573
            [[fallthrough]];
×
574
        case WebSocketError::websocket_client_too_new:
✔
575
            [[fallthrough]];
×
576
        case WebSocketError::websocket_protocol_mismatch: {
✔
577
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
578
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
579
            break;
×
580
        }
×
581
        case WebSocketError::websocket_fatal_error: {
✔
582
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{true}),
×
583
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
584
            break;
×
585
        }
×
586
        case WebSocketError::websocket_forbidden: {
✔
587
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
588
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
589
            involuntary_disconnect(std::move(error_info),
×
590
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
591
            break;
×
592
        }
×
593
        case WebSocketError::websocket_unauthorized: {
36✔
594
            SessionErrorInfo error_info(
36✔
595
                {ErrorCodes::AuthError,
36✔
596
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
36✔
597
                IsFatal{false});
36✔
598
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
36✔
599
            involuntary_disconnect(std::move(error_info),
36✔
600
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
36✔
601
            break;
36✔
602
        }
×
603
        case WebSocketError::websocket_moved_permanently: {
12✔
604
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
605
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
606
            involuntary_disconnect(std::move(error_info),
12✔
607
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
608
            break;
12✔
609
        }
×
610
        case WebSocketError::websocket_abnormal_closure: {
✔
611
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
612
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
613
            involuntary_disconnect(std::move(error_info),
×
614
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
615
            break;
×
616
        }
×
617
        case WebSocketError::websocket_internal_server_error:
✔
618
            [[fallthrough]];
×
619
        case WebSocketError::websocket_retry_error: {
✔
620
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
621
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
622
            break;
×
623
        }
602✔
624
    }
602✔
625

328✔
626
    return bool(m_websocket);
602✔
627
}
602✔
628

629
// Guarantees that handle_reconnect_wait() is never called from within the
630
// execution of initiate_reconnect_wait() (no callback reentrance).
631
void Connection::initiate_reconnect_wait()
632
{
7,624✔
633
    REALM_ASSERT(m_activated);
7,624✔
634
    REALM_ASSERT(!m_reconnect_delay_in_progress);
7,624✔
635
    REALM_ASSERT(!m_disconnect_delay_in_progress);
7,624✔
636

3,790✔
637
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
3,790✔
638
    if (m_force_closed) {
7,624✔
639
        return;
2,330✔
640
    }
2,330✔
641

2,698✔
642
    m_reconnect_delay_in_progress = true;
5,294✔
643
    auto delay = m_reconnect_info.delay_interval();
5,294✔
644
    if (delay == std::chrono::milliseconds::max()) {
5,294✔
645
        logger.detail("Reconnection delayed indefinitely"); // Throws
836✔
646
        // Not actually starting a timer corresponds to an infinite wait
458✔
647
        m_nonzero_reconnect_delay = true;
836✔
648
        return;
836✔
649
    }
836✔
650

2,240✔
651
    if (delay == std::chrono::milliseconds::zero()) {
4,458✔
652
        m_nonzero_reconnect_delay = false;
4,220✔
653
    }
4,220✔
654
    else {
238✔
655
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
238✔
656
        m_nonzero_reconnect_delay = true;
238✔
657
    }
238✔
658

2,240✔
659
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
2,240✔
660
    // we need it to be cancelable in case the connection is terminated before the timer
2,240✔
661
    // callback is run.
2,240✔
662
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
4,460✔
663
        // If the operation is aborted, the connection object may have been
2,240✔
664
        // destroyed.
2,240✔
665
        if (status != ErrorCodes::OperationAborted)
4,460✔
666
            handle_reconnect_wait(status); // Throws
3,406✔
667
    });                                    // Throws
4,460✔
668
}
4,458✔
669

670

671
// Timer callback for the reconnect wait. A failed status is rethrown as an
// Exception; otherwise the wait is marked finished and a reconnect is started
// if at least one active, unsuspended session exists.
void Connection::handle_reconnect_wait(Status status)
{
    if (!status.is_ok()) {
        // Aborted waits are filtered out by the timer lambda before we get here.
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT(m_reconnect_delay_in_progress);
    m_reconnect_delay_in_progress = false;

    const bool have_sessions = (m_num_active_unsuspended_sessions > 0);
    if (!have_sessions)
        return;

    initiate_reconnect(); // Throws
}
3,406✔
684

685
// Forwards websocket observer callbacks to a Connection. Each callback first
// consults the lifecycle sentinel captured at construction; once the sentinel
// is flagged as destroyed the callback returns without touching `conn`.
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
    explicit WebSocketObserverShim(Connection* conn)
        : conn(conn)
        , sentinel(conn->m_websocket_sentinel)
    {
    }

    Connection* conn;
    util::bind_ptr<LifecycleSentinel> sentinel;

    // Forwarded only while the sentinel is still live.
    void websocket_connected_handler(const std::string& protocol) override
    {
        if (!sentinel->destroyed)
            conn->websocket_connected_handler(protocol);
    }

    // Forwarded only while the sentinel is still live.
    void websocket_error_handler() override
    {
        if (!sentinel->destroyed)
            conn->websocket_error_handler();
    }

    // Yields false without forwarding once the sentinel is flagged destroyed.
    bool websocket_binary_message_received(util::Span<const char> data) override
    {
        return !sentinel->destroyed && conn->websocket_binary_message_received(data);
    }

    // Yields true without forwarding once the sentinel is flagged destroyed.
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
    {
        return sentinel->destroyed || conn->websocket_closed_handler(was_clean, error_code, msg);
    }
};
731

732
void Connection::initiate_reconnect()
733
{
3,404✔
734
    REALM_ASSERT(m_activated);
3,404✔
735

1,668✔
736
    m_state = ConnectionState::connecting;
3,404✔
737
    report_connection_state_change(ConnectionState::connecting); // Throws
3,404✔
738
    if (m_websocket_sentinel) {
3,404✔
739
        m_websocket_sentinel->destroyed = true;
×
740
    }
×
741
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,404✔
742
    m_websocket.reset();
3,404✔
743

1,668✔
744
    // Watchdog
1,668✔
745
    initiate_connect_wait(); // Throws
3,404✔
746

1,668✔
747
    std::vector<std::string> sec_websocket_protocol;
3,404✔
748
    {
3,404✔
749
        auto protocol_prefix =
3,404✔
750
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,056✔
751
        int min = get_oldest_supported_protocol_version();
3,404✔
752
        int max = get_current_protocol_version();
3,404✔
753
        REALM_ASSERT_3(min, <=, max);
3,404✔
754
        // List protocol version in descending order to ensure that the server
1,668✔
755
        // selects the highest possible version.
1,668✔
756
        for (int version = max; version >= min; --version) {
37,434✔
757
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
34,030✔
758
        }
34,030✔
759
    }
3,404✔
760

1,668✔
761
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,404✔
762
                m_server_endpoint.port, m_http_request_path_prefix);
3,404✔
763

1,668✔
764
    m_websocket_error_received = false;
3,404✔
765
    m_websocket =
3,404✔
766
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,404✔
767
                                            WebSocketEndpoint{
3,404✔
768
                                                m_server_endpoint.address,
3,404✔
769
                                                m_server_endpoint.port,
3,404✔
770
                                                get_http_request_path(),
3,404✔
771
                                                std::move(sec_websocket_protocol),
3,404✔
772
                                                is_ssl(m_server_endpoint.envelope),
3,404✔
773
                                                /// DEPRECATED - The following will be removed in a future release
1,668✔
774
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,404✔
775
                                                m_verify_servers_ssl_certificate,
3,404✔
776
                                                m_ssl_trust_certificate_path,
3,404✔
777
                                                m_ssl_verify_callback,
3,404✔
778
                                                m_proxy_config,
3,404✔
779
                                            });
3,404✔
780
}
3,404✔
781

782

783
void Connection::initiate_connect_wait()
784
{
3,404✔
785
    // Deploy a watchdog to enforce an upper bound on the time it can take to
1,668✔
786
    // fully establish the connection (including SSL and WebSocket
1,668✔
787
    // handshakes). Without such a watchdog, connect operations could take very
1,668✔
788
    // long, or even indefinite time.
1,668✔
789
    milliseconds_type time = m_client.m_connect_timeout;
3,404✔
790

1,668✔
791
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,404✔
792
        // If the operation is aborted, the connection object may have been
1,668✔
793
        // destroyed.
1,668✔
794
        if (status != ErrorCodes::OperationAborted)
3,404✔
795
            handle_connect_wait(status); // Throws
×
796
    });                                  // Throws
3,404✔
797
}
3,404✔
798

799

800
// Callback of the connect watchdog. A failed status is rethrown as an
// Exception; an OK status means the timeout expired while still connecting,
// so the connection is torn down with a non-fatal SyncConnectTimeout error.
void Connection::handle_connect_wait(Status status)
{
    if (!status.is_ok()) {
        // Aborted waits are filtered out by the timer lambda before we get here.
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
    logger.info("Connect timeout"); // Throws

    SessionErrorInfo timeout_info{
        Status{ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"}, IsFatal{false}};
    involuntary_disconnect(std::move(timeout_info), ConnectionTerminationReason::sync_connect_timeout); // Throws
}
×
814

815

816
void Connection::handle_connection_established()
817
{
3,294✔
818
    // Cancel connect timeout watchdog
1,614✔
819
    m_connect_timer.reset();
3,294✔
820

1,614✔
821
    m_state = ConnectionState::connected;
3,294✔
822

1,614✔
823
    milliseconds_type now = monotonic_clock_now();
3,294✔
824
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,294✔
825
    initiate_ping_delay(now);     // Throws
3,294✔
826

1,614✔
827
    bool fast_reconnect = false;
3,294✔
828
    if (m_disconnect_has_occurred) {
3,294✔
829
        milliseconds_type time = now - m_disconnect_time;
858✔
830
        if (time <= m_client.m_fast_reconnect_limit)
858✔
831
            fast_reconnect = true;
858✔
832
    }
858✔
833

1,614✔
834
    for (auto& p : m_sessions) {
4,222✔
835
        Session& sess = *p.second;
4,222✔
836
        sess.connection_established(fast_reconnect); // Throws
4,222✔
837
    }
4,222✔
838

1,614✔
839
    report_connection_state_change(ConnectionState::connected); // Throws
3,294✔
840
}
3,294✔
841

842

843
void Connection::schedule_urgent_ping()
844
{
220✔
845
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
220✔
846
    if (m_ping_delay_in_progress) {
220✔
847
        m_heartbeat_timer.reset();
178✔
848
        m_ping_delay_in_progress = false;
178✔
849
        m_minimize_next_ping_delay = true;
178✔
850
        milliseconds_type now = monotonic_clock_now();
178✔
851
        initiate_ping_delay(now); // Throws
178✔
852
        return;
178✔
853
    }
178✔
854
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
42✔
855
    if (!m_send_ping)
42✔
856
        m_minimize_next_ping_delay = true;
42✔
857
}
42✔
858

859

860
// Arms the heartbeat timer that triggers the next PING.
//
// `now` is the current monotonic time in milliseconds. When a minimized delay
// was requested the timer fires after 0 ms and the request flag is cleared.
// Otherwise the delay is the keepalive period minus a random deduction and
// minus the time already spent waiting for the previous PONG (floored at 0).
void Connection::initiate_ping_delay(milliseconds_type now)
{
    REALM_ASSERT(!m_ping_delay_in_progress);
    REALM_ASSERT(!m_waiting_for_pong);
    REALM_ASSERT(!m_send_ping);

    milliseconds_type delay = 0;
    if (m_minimize_next_ping_delay) {
        // One-shot request: consume it and keep the zero delay.
        m_minimize_next_ping_delay = false;
    }
    else {
        delay = m_client.m_ping_keepalive_period;

        // Randomly shorten the delay by up to 10%, or by up to 100% for the
        // first PING on this connection, so that many connections are less
        // likely to PING the server at the same moment.
        const milliseconds_type max_deduction = m_ping_sent ? delay / 10 : delay;
        std::uniform_int_distribution<milliseconds_type> jitter(0, max_deduction);
        const milliseconds_type randomized_deduction = jitter(m_client.get_random());
        delay -= randomized_deduction;

        // Account for the time already spent waiting for PONG.
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
        const milliseconds_type spent_time = now - m_pong_wait_started_at;
        delay = (spent_time < delay) ? (delay - spent_time) : 0;
    }

    m_ping_delay_in_progress = true;

    auto on_delay_elapsed = [this](Status status) {
        if (status == ErrorCodes::OperationAborted)
            return;
        if (!status.is_ok())
            throw Exception(status);

        handle_ping_delay(); // Throws
    };
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), std::move(on_delay_elapsed)); // Throws
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
}
3,648✔
905

906

907
void Connection::handle_ping_delay()
908
{
194✔
909
    REALM_ASSERT(m_ping_delay_in_progress);
194✔
910
    m_ping_delay_in_progress = false;
194✔
911
    m_send_ping = true;
194✔
912

68✔
913
    initiate_pong_timeout(); // Throws
194✔
914

68✔
915
    if (m_state == ConnectionState::connected && !m_sending)
194✔
916
        send_next_message(); // Throws
154✔
917
}
194✔
918

919

920
void Connection::initiate_pong_timeout()
921
{
194✔
922
    REALM_ASSERT(!m_ping_delay_in_progress);
194✔
923
    REALM_ASSERT(!m_waiting_for_pong);
194✔
924
    REALM_ASSERT(m_send_ping);
194✔
925

68✔
926
    m_waiting_for_pong = true;
194✔
927
    m_pong_wait_started_at = monotonic_clock_now();
194✔
928

68✔
929
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
194✔
930
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
194✔
931
        if (status == ErrorCodes::OperationAborted)
194✔
932
            return;
182✔
933
        else if (!status.is_ok())
12✔
934
            throw Exception(status);
×
935

6✔
936
        handle_pong_timeout(); // Throws
12✔
937
    });                        // Throws
12✔
938
}
194✔
939

940

941
void Connection::handle_pong_timeout()
942
{
12✔
943
    REALM_ASSERT(m_waiting_for_pong);
12✔
944
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
945
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
946
                                 ConnectionTerminationReason::pong_timeout);
12✔
947
}
12✔
948

949

950
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
951
{
100,532✔
952
    // Stop sending messages if an websocket error was received.
47,242✔
953
    if (m_websocket_error_received)
100,532✔
954
        return;
×
955

47,242✔
956
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
100,532✔
957
        if (sentinel->destroyed) {
100,440✔
958
            return;
1,466✔
959
        }
1,466✔
960
        if (!status.is_ok()) {
98,974✔
961
            if (status != ErrorCodes::Error::OperationAborted) {
×
962
                // Write errors will be handled by the websocket_write_error_handler() callback
963
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
964
            }
×
965
            return;
×
966
        }
×
967
        handle_write_message(); // Throws
98,974✔
968
    });                         // Throws
98,974✔
969
    m_sending_session = sess;
100,532✔
970
    m_sending = true;
100,532✔
971
}
100,532✔
972

973

974
void Connection::handle_write_message()
975
{
98,974✔
976
    m_sending_session->message_sent(); // Throws
98,974✔
977
    if (m_sending_session->m_state == Session::Deactivated) {
98,974✔
978
        finish_session_deactivation(m_sending_session);
146✔
979
    }
146✔
980
    m_sending_session = nullptr;
98,974✔
981
    m_sending = false;
98,974✔
982
    send_next_message(); // Throws
98,974✔
983
}
98,974✔
984

985

986
void Connection::send_next_message()
987
{
160,176✔
988
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
160,176✔
989
    REALM_ASSERT(!m_sending_session);
160,176✔
990
    REALM_ASSERT(!m_sending);
160,176✔
991
    if (m_send_ping) {
160,176✔
992
        send_ping(); // Throws
182✔
993
        return;
182✔
994
    }
182✔
995
    while (!m_sessions_enlisted_to_send.empty()) {
222,490✔
996
        // The state of being connected is not supposed to be able to change
76,360✔
997
        // across this loop thanks to the "no callback reentrance" guarantee
76,360✔
998
        // provided by Websocket::async_write_text(), and friends.
76,360✔
999
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
163,240✔
1000

76,360✔
1001
        Session& sess = *m_sessions_enlisted_to_send.front();
163,240✔
1002
        m_sessions_enlisted_to_send.pop_front();
163,240✔
1003
        sess.send_message(); // Throws
163,240✔
1004

76,360✔
1005
        if (sess.m_state == Session::Deactivated) {
163,240✔
1006
            finish_session_deactivation(&sess);
1,758✔
1007
        }
1,758✔
1008

76,360✔
1009
        // An enlisted session may choose to not send a message. In that case,
76,360✔
1010
        // we should pass the opportunity to the next enlisted session.
76,360✔
1011
        if (m_sending)
163,240✔
1012
            break;
100,744✔
1013
    }
163,240✔
1014
}
159,994✔
1015

1016

1017
void Connection::send_ping()
1018
{
182✔
1019
    REALM_ASSERT(!m_ping_delay_in_progress);
182✔
1020
    REALM_ASSERT(m_waiting_for_pong);
182✔
1021
    REALM_ASSERT(m_send_ping);
182✔
1022

62✔
1023
    m_send_ping = false;
182✔
1024
    if (m_reconnect_info.scheduled_reset)
182✔
1025
        m_ping_after_scheduled_reset_of_reconnect_info = true;
158✔
1026

62✔
1027
    m_last_ping_sent_at = monotonic_clock_now();
182✔
1028
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
182✔
1029
                 m_previous_ping_rtt); // Throws
182✔
1030

62✔
1031
    ClientProtocol& protocol = get_client_protocol();
182✔
1032
    OutputBuffer& out = get_output_buffer();
182✔
1033
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
182✔
1034
    initiate_write_ping(out);                                          // Throws
182✔
1035
    m_ping_sent = true;
182✔
1036
}
182✔
1037

1038

1039
void Connection::initiate_write_ping(const OutputBuffer& out)
1040
{
182✔
1041
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
182✔
1042
        if (sentinel->destroyed) {
182✔
1043
            return;
×
1044
        }
×
1045
        if (!status.is_ok()) {
182✔
1046
            if (status != ErrorCodes::Error::OperationAborted) {
×
1047
                // Write errors will be handled by the websocket_write_error_handler() callback
1048
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1049
            }
×
1050
            return;
×
1051
        }
×
1052
        handle_write_ping(); // Throws
182✔
1053
    });                      // Throws
182✔
1054
    m_sending = true;
182✔
1055
}
182✔
1056

1057

1058
void Connection::handle_write_ping()
1059
{
182✔
1060
    REALM_ASSERT(m_sending);
182✔
1061
    REALM_ASSERT(!m_sending_session);
182✔
1062
    m_sending = false;
182✔
1063
    send_next_message(); // Throws
182✔
1064
}
182✔
1065

1066

1067
// Hands a received binary message to the client protocol parser, which in turn
// invokes the matching handler on this Connection.
void Connection::handle_message_received(util::Span<const char> data)
{
    const std::string_view message(data.data(), data.size());
    get_client_protocol().parse_message_received<Connection>(*this, message);
}
80,934✔
1073

1074

1075
void Connection::initiate_disconnect_wait()
1076
{
4,380✔
1077
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,380✔
1078

2,056✔
1079
    if (m_disconnect_delay_in_progress) {
4,380✔
1080
        m_reconnect_disconnect_timer.reset();
1,988✔
1081
        m_disconnect_delay_in_progress = false;
1,988✔
1082
    }
1,988✔
1083

2,056✔
1084
    milliseconds_type time = m_client.m_connection_linger_time;
4,380✔
1085

2,056✔
1086
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,380✔
1087
        // If the operation is aborted, the connection object may have been
2,056✔
1088
        // destroyed.
2,056✔
1089
        if (status != ErrorCodes::OperationAborted)
4,380✔
1090
            handle_disconnect_wait(status); // Throws
16✔
1091
    });                                     // Throws
4,380✔
1092
    m_disconnect_delay_in_progress = true;
4,380✔
1093
}
4,380✔
1094

1095

1096
// Callback of the linger timer. A failed status is rethrown as an Exception;
// otherwise the connection is voluntarily closed, but only if no active
// unsuspended sessions remain.
void Connection::handle_disconnect_wait(Status status)
{
    if (!status.is_ok()) {
        // Aborted waits are filtered out by the timer lambda before we get here.
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    m_disconnect_delay_in_progress = false;

    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);

    // Sessions are still using the connection: keep it open.
    if (m_num_active_unsuspended_sessions != 0)
        return;

    if (m_client.m_connection_linger_time > 0)
        logger.detail("Linger time expired"); // Throws
    voluntary_disconnect();                   // Throws
    logger.info("Disconnected");              // Throws
}
16✔
1113

1114

1115
// Closes the connection because of a sync protocol violation. The error is
// marked fatal and tagged with the ProtocolViolation action.
void Connection::close_due_to_protocol_error(Status status)
{
    SessionErrorInfo violation(std::move(status), IsFatal{true});
    violation.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;

    involuntary_disconnect(std::move(violation),
                           ConnectionTerminationReason::sync_protocol_violation); // Throws
}
16✔
1122

1123

1124
// Logs and closes the connection because of an error detected on the client
// side; `is_fatal` and `reason` are passed through unchanged.
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to error: %1", status); // Throws

    SessionErrorInfo client_error{std::move(status), is_fatal};
    involuntary_disconnect(std::move(client_error), reason); // Throws
}
394✔
1130

1131

1132
// Logs and closes the connection because of a transient error. The error is
// non-fatal and tagged with the Transient action.
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to transient error: %1", status); // Throws

    SessionErrorInfo transient{std::move(status), IsFatal{false}};
    transient.server_requests_action = ProtocolErrorInfo::Action::Transient;

    involuntary_disconnect(std::move(transient), reason); // Throws
}
472✔
1140

1141

1142
// Close connection due to error discovered on the server-side, and then
1143
// reported to the client by way of a connection-level ERROR message.
1144
// The termination reason is chosen from `info.is_fatal`: fatal errors map to
// "server said do not reconnect", all others to "server said try again later".
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
{
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
                int(error_code)); // Throws

    ConnectionTerminationReason reason = ConnectionTerminationReason::server_said_try_again_later;
    if (info.is_fatal)
        reason = ConnectionTerminationReason::server_said_do_not_reconnect;

    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
                           reason); // Throws
}
68✔
1154

1155

1156
void Connection::disconnect(const SessionErrorInfo& info)
1157
{
3,404✔
1158
    // Cancel connect timeout watchdog
1,668✔
1159
    m_connect_timer.reset();
3,404✔
1160

1,668✔
1161
    if (m_state == ConnectionState::connected) {
3,404✔
1162
        m_disconnect_time = monotonic_clock_now();
3,294✔
1163
        m_disconnect_has_occurred = true;
3,294✔
1164

1,614✔
1165
        // Sessions that are in the Deactivating state at this time can be
1,614✔
1166
        // immediately discarded, in part because they are no longer enlisted to
1,614✔
1167
        // send. Such sessions will be taken to the Deactivated state by
1,614✔
1168
        // Session::connection_lost(), and then they will be removed from
1,614✔
1169
        // `m_sessions`.
1,614✔
1170
        auto i = m_sessions.begin(), end = m_sessions.end();
3,294✔
1171
        while (i != end) {
6,876✔
1172
            // Prevent invalidation of the main iterator when erasing elements
1,830✔
1173
            auto j = i++;
3,582✔
1174
            Session& sess = *j->second;
3,582✔
1175
            sess.connection_lost(); // Throws
3,582✔
1176
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
3,582✔
1177
                m_sessions.erase(j);
1,752✔
1178
        }
3,582✔
1179
    }
3,294✔
1180

1,668✔
1181
    change_state_to_disconnected();
3,404✔
1182

1,668✔
1183
    m_ping_delay_in_progress = false;
3,404✔
1184
    m_waiting_for_pong = false;
3,404✔
1185
    m_send_ping = false;
3,404✔
1186
    m_minimize_next_ping_delay = false;
3,404✔
1187
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,404✔
1188
    m_ping_sent = false;
3,404✔
1189
    m_heartbeat_timer.reset();
3,404✔
1190
    m_previous_ping_rtt = 0;
3,404✔
1191

1,668✔
1192
    m_websocket_sentinel->destroyed = true;
3,404✔
1193
    m_websocket_sentinel.reset();
3,404✔
1194
    m_websocket.reset();
3,404✔
1195
    m_input_body_buffer.reset();
3,404✔
1196
    m_sending_session = nullptr;
3,404✔
1197
    m_sessions_enlisted_to_send.clear();
3,404✔
1198
    m_sending = false;
3,404✔
1199

1,668✔
1200
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,404✔
1201
    initiate_reconnect_wait();                                           // Throws
3,404✔
1202
}
3,404✔
1203

1204
bool Connection::is_flx_sync_connection() const noexcept
1205
{
110,082✔
1206
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
110,082✔
1207
}
110,082✔
1208

1209
// Handles a PONG message. It is only legal while a PING has been sent and is
// awaiting its answer, and its timestamp must echo that of the last PING;
// otherwise the connection is closed with a protocol error. On success the
// round-trip time is recorded and the next ping delay is started.
void Connection::receive_pong(milliseconds_type timestamp)
{
    logger.debug("Received: PONG(timestamp=%1)", timestamp);

    // Illegal unless we are waiting for a PONG and the PING has left the queue.
    if (REALM_UNLIKELY(!m_waiting_for_pong || m_send_ping)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
        return;
    }

    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
                          m_last_ping_sent_at, timestamp)}); // Throws
        return;
    }

    const milliseconds_type now = monotonic_clock_now();
    const milliseconds_type round_trip_time = now - timestamp;
    logger.debug("Round trip time was %1 milliseconds", round_trip_time);
    m_previous_ping_rtt = round_trip_time;

    // If this PONG answers a PING that was sent after the last invocation of
    // cancel_reconnect_delay(), the connection is still good and the next
    // reconnect delay does not have to be skipped.
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
        m_ping_after_scheduled_reset_of_reconnect_info = false;
        m_reconnect_info.scheduled_reset = false;
    }

    // Stop the PONG timeout and schedule the next PING.
    m_heartbeat_timer.reset();
    m_waiting_for_pong = false;

    initiate_ping_delay(now); // Throws

    if (m_client.m_roundtrip_time_handler)
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
}
176✔
1250

1251
// Looks up the session that an incoming message of type `message` refers to.
// Returns the session, or nullptr when `session_ident` is 0 or no such session
// is active. An identifier that is not found in the session history either
// closes the connection with a protocol error; one that is found there is
// only logged.
// NOTE(review): this function is declared noexcept, yet it calls logging and
// close_due_to_protocol_error(), which are annotated "Throws" elsewhere in
// this file - an escaping exception would terminate the process.
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
{
    if (session_ident == 0) {
        return nullptr;
    }

    auto* active = get_session(session_ident);
    if (REALM_LIKELY(active)) {
        return active;
    }

    // Not active: consult the history to see whether the message was meant
    // for a previous session.
    const bool seen_before = (m_session_history.find(session_ident) != m_session_history.end());
    if (seen_before) {
        logger.error("Received %1 message for closed session, session_ident = %2", message,
                     session_ident); // Throws
        return nullptr;
    }

    logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
    close_due_to_protocol_error(
        {ErrorCodes::SyncProtocolInvariantFailed,
         util::format("Received message %1 for session iden %2 when that session never existed", message,
                      session_ident)});
    return nullptr;
}
2,147,483,647✔
1275

1276
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1277
{
758✔
1278
    Session* sess = nullptr;
758✔
1279
    if (session_ident != 0) {
758✔
1280
        sess = find_and_validate_session(session_ident, "ERROR");
686✔
1281
        if (REALM_UNLIKELY(!sess)) {
686✔
1282
            return;
×
1283
        }
×
1284
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
686✔
1285
            close_due_to_protocol_error(std::move(status)); // Throws
×
1286
            return;
×
1287
        }
×
1288

338✔
1289
        if (sess->m_state == Session::Deactivated) {
686✔
1290
            finish_session_deactivation(sess);
×
1291
        }
×
1292
        return;
686✔
1293
    }
686✔
1294

34✔
1295
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
72✔
1296
                info.message, info.raw_error_code, info.is_fatal, session_ident,
72✔
1297
                info.server_requests_action); // Throws
72✔
1298

34✔
1299
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
72✔
1300
    if (REALM_LIKELY(known_error_code)) {
72✔
1301
        ProtocolError error_code = ProtocolError(info.raw_error_code);
68✔
1302
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
68✔
1303
            close_due_to_server_side_error(error_code, info); // Throws
68✔
1304
            return;
68✔
1305
        }
68✔
1306
        close_due_to_protocol_error(
×
1307
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1308
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1309
                          info.raw_error_code)});
×
1310
    }
×
1311
    else {
4✔
1312
        close_due_to_protocol_error(
4✔
1313
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1314
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1315
    }
4✔
1316
}
72✔
1317

1318

1319
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1320
                                             session_ident_type session_ident)
1321
{
20✔
1322
    if (session_ident == 0) {
20✔
1323
        return close_due_to_protocol_error(
×
1324
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1325
    }
×
1326

10✔
1327
    if (!is_flx_sync_connection()) {
20✔
1328
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1329
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1330
    }
×
1331

10✔
1332
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
20✔
1333
    if (REALM_UNLIKELY(!sess)) {
20✔
1334
        return;
×
1335
    }
×
1336

10✔
1337
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
20✔
1338
        close_due_to_protocol_error(std::move(status));
×
1339
    }
×
1340
}
20✔
1341

1342

1343
// Passes an IDENT message to the addressed session; a failure status from the
// session closes the connection with a protocol error.
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
{
    Session* const sess = find_and_validate_session(session_ident, "IDENT");
    if (REALM_UNLIKELY(!sess)) {
        return;
    }

    auto status = sess->receive_ident_message(client_file_ident);
    if (!status.is_ok()) {
        close_due_to_protocol_error(std::move(status)); // Throws
    }
}
3,510✔
1353

1354
void Connection::receive_download_message(session_ident_type session_ident, const SyncProgress& progress,
1355
                                          std::uint_fast64_t downloadable_bytes, int64_t query_version,
1356
                                          DownloadBatchState batch_state,
1357
                                          const ReceivedChangesets& received_changesets)
1358
{
46,656✔
1359
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
46,656✔
1360
    if (REALM_UNLIKELY(!sess)) {
46,656✔
1361
        return;
×
1362
    }
×
1363

24,230✔
1364
    if (auto status = sess->receive_download_message(progress, downloadable_bytes, batch_state, query_version,
46,656✔
1365
                                                     received_changesets);
46,656✔
1366
        !status.is_ok()) {
46,656✔
1367
        close_due_to_protocol_error(std::move(status));
×
1368
    }
×
1369
}
46,656✔
1370

1371
// Passes a MARK message to the addressed session; a failure status from the
// session closes the connection with a protocol error.
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    Session* const sess = find_and_validate_session(session_ident, "MARK");
    if (REALM_UNLIKELY(!sess)) {
        return;
    }

    auto status = sess->receive_mark_message(request_ident);
    if (!status.is_ok()) {
        close_due_to_protocol_error(std::move(status)); // Throws
    }
}
17,524✔
1381

1382

1383
// Routes an UNBOUND message to the session identified by `session_ident`.
// Nothing happens when find_and_validate_session() returns null. A non-OK
// status reported by the session is handed to close_due_to_protocol_error().
// When the session ends up in the Deactivated state, its deactivation is
// finished via finish_session_deactivation().
void Connection::receive_unbound_message(session_ident_type session_ident)
{
    Session* const target = find_and_validate_session(session_ident, "UNBOUND");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    {
        auto result = target->receive_unbound_message();
        if (!result.is_ok()) {
            close_due_to_protocol_error(std::move(result)); // Throws
            return;
        }
    }

    if (target->m_state != Session::Deactivated)
        return;
    finish_session_deactivation(target);
}
5,384✔
1399

1400

1401
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1402
                                               std::string_view body)
1403
{
52✔
1404
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
52✔
1405
    if (REALM_UNLIKELY(!sess)) {
52✔
1406
        return;
×
1407
    }
×
1408

26✔
1409
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
52✔
1410
        close_due_to_protocol_error(std::move(status));
×
1411
    }
×
1412
}
52✔
1413

1414

1415
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1416
                                            std::string_view message)
1417
{
6,860✔
1418
    std::string prefix;
6,860✔
1419
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
6,860✔
1420
        prefix = util::format("Server[%1]", m_appservices_coid);
6,860✔
1421
    }
6,860✔
1422
    else {
×
1423
        prefix = "Server";
×
1424
    }
×
1425

2,746✔
1426
    if (session_ident != 0) {
6,860✔
1427
        if (auto sess = get_session(session_ident)) {
5,084✔
1428
            sess->logger.log(level, "%1 log: %2", prefix, message);
5,084✔
1429
            return;
5,084✔
1430
        }
5,084✔
1431

1432
        logger.log(level, "%1 log for unknown session %2: %3", prefix, session_ident, message);
×
1433
        return;
×
1434
    }
×
1435

868✔
1436
    logger.log(level, "%1 log: %2", prefix, message);
1,776✔
1437
}
1,776✔
1438

1439

1440
void Connection::receive_appservices_request_id(std::string_view coid)
1441
{
5,068✔
1442
    // Only set once per connection
2,480✔
1443
    if (!coid.empty() && m_appservices_coid.empty()) {
5,068✔
1444
        m_appservices_coid = coid;
2,416✔
1445
        logger.info("Connected to app services with request id: \"%1\"", m_appservices_coid);
2,416✔
1446
    }
2,416✔
1447
}
5,068✔
1448

1449

1450
// Thin forwarder: passes `status` straight on to
// close_due_to_protocol_error().
void Connection::handle_protocol_error(Status status)
{
    close_due_to_protocol_error(std::move(status));
}
×
1454

1455

1456
// Sessions are guaranteed to be granted the opportunity to send a message in
1457
// the order that they enlist. Note that this is important to ensure
1458
// nonoverlapping communication with the server for consecutive sessions
1459
// associated with the same Realm file.
1460
//
1461
// CAUTION: The specified session may get destroyed before this function
1462
// returns, but only if its Session::send_message() puts it into the Deactivated
1463
// state.
1464
void Connection::enlist_to_send(Session* sess)
1465
{
164,754✔
1466
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
164,754✔
1467
    m_sessions_enlisted_to_send.push_back(sess); // Throws
164,754✔
1468
    if (!m_sending)
164,754✔
1469
        send_next_message(); // Throws
60,852✔
1470
}
164,754✔
1471

1472

1473
std::string Connection::get_active_appservices_connection_id()
1474
{
72✔
1475
    return m_appservices_coid;
72✔
1476
}
72✔
1477

1478
void Session::cancel_resumption_delay()
1479
{
3,392✔
1480
    REALM_ASSERT_EX(m_state == Active, m_state);
3,392✔
1481

1,854✔
1482
    if (!m_suspended)
3,392✔
1483
        return;
3,328✔
1484

30✔
1485
    m_suspended = false;
64✔
1486

30✔
1487
    logger.debug("Resumed"); // Throws
64✔
1488

30✔
1489
    if (unbind_process_complete())
64✔
1490
        initiate_rebind(); // Throws
38✔
1491

30✔
1492
    m_conn.one_more_active_unsuspended_session(); // Throws
64✔
1493
    if (m_try_again_activation_timer) {
64✔
1494
        m_try_again_activation_timer.reset();
8✔
1495
    }
8✔
1496

30✔
1497
    on_resumed(); // Throws
64✔
1498
}
64✔
1499

1500

1501
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1502
                                                 std::vector<ProtocolErrorInfo>* out)
1503
{
21,612✔
1504
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
21,612✔
1505
        return;
21,568✔
1506
    }
21,568✔
1507

22✔
1508
#ifdef REALM_DEBUG
44✔
1509
    REALM_ASSERT_DEBUG(
44✔
1510
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
44✔
1511
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
44✔
1512
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
44✔
1513
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
44✔
1514
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
44✔
1515
                       }));
44✔
1516
#endif
44✔
1517

22✔
1518
    while (!m_pending_compensating_write_errors.empty() &&
88✔
1519
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
66✔
1520
               changesets.back().version) {
44✔
1521
        auto& cur_error = m_pending_compensating_write_errors.front();
44✔
1522
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
44✔
1523
        out->push_back(std::move(cur_error));
44✔
1524
        m_pending_compensating_write_errors.pop_front();
44✔
1525
    }
44✔
1526
}
44✔
1527

1528

1529
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1530
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1531
                                   DownloadBatchState download_batch_state)
1532
{
44,194✔
1533
    auto& history = get_history();
44,194✔
1534
    if (received_changesets.empty()) {
44,194✔
1535
        if (download_batch_state == DownloadBatchState::MoreToCome) {
22,560✔
1536
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1537
                                       "received empty download message that was not the last in batch",
×
1538
                                       ProtocolError::bad_progress);
×
1539
        }
×
1540
        history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws
22,560✔
1541
        return;
22,560✔
1542
    }
22,560✔
1543

11,856✔
1544
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
21,634✔
1545
    auto transact = get_db()->start_read();
21,634✔
1546
    history.integrate_server_changesets(
21,634✔
1547
        progress, &downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
21,634✔
1548
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
21,622✔
1549
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
21,612✔
1550
        }); // Throws
21,612✔
1551
    if (received_changesets.size() == 1) {
21,634✔
1552
        logger.debug("1 remote changeset integrated, producing client version %1",
14,680✔
1553
                     version_info.sync_version.version); // Throws
14,680✔
1554
    }
14,680✔
1555
    else {
6,954✔
1556
        logger.debug("%2 remote changesets integrated, producing client version %1",
6,954✔
1557
                     version_info.sync_version.version, received_changesets.size()); // Throws
6,954✔
1558
    }
6,954✔
1559

11,856✔
1560
    for (const auto& pending_error : pending_compensating_write_errors) {
11,878✔
1561
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
44✔
1562
                    pending_error.compensating_write_rejected_client_version,
44✔
1563
                    *pending_error.compensating_write_server_version, pending_error.message);
44✔
1564
        try {
44✔
1565
            on_connection_state_changed(
44✔
1566
                m_conn.get_state(),
44✔
1567
                SessionErrorInfo{pending_error,
44✔
1568
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
44✔
1569
                                                          pending_error.message)});
44✔
1570
        }
44✔
1571
        catch (...) {
22✔
1572
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1573
        }
×
1574
    }
44✔
1575
}
21,634✔
1576

1577

1578
void Session::on_integration_failure(const IntegrationException& error)
1579
{
48✔
1580
    REALM_ASSERT_EX(m_state == Active, m_state);
48✔
1581
    REALM_ASSERT(!m_client_error && !m_error_to_send);
48✔
1582
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
48✔
1583

24✔
1584
    m_client_error = util::make_optional<IntegrationException>(error);
48✔
1585
    m_error_to_send = true;
48✔
1586
    SessionErrorInfo error_info{error.to_status(), IsFatal{false}};
48✔
1587
    error_info.server_requests_action = ProtocolErrorInfo::Action::Warning;
48✔
1588
    // Surface the error to the user otherwise is lost.
24✔
1589
    on_connection_state_changed(m_conn.get_state(), std::move(error_info));
48✔
1590

24✔
1591
    // Since the deactivation process has not been initiated, the UNBIND
24✔
1592
    // message cannot have been sent unless an ERROR message was received.
24✔
1593
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
48✔
1594
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
48✔
1595
        ensure_enlisted_to_send(); // Throws
44✔
1596
    }
44✔
1597
}
48✔
1598

1599
// Bookkeeping performed after downloaded changesets have been integrated.
//
// Stores the new progress, advances the upload cursors when the server has
// acknowledged newer client versions, checks for upload and download
// completion, and recognizes `client_version` as the latest sync version.
// When a migration store is present and this is an FLX session, the
// migration store is asked to create subscriptions. Finally the session
// makes sure it is enlisted to send, provided the IDENT message has been
// sent, no ERROR message has been received and the session is not suspended.
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
{
    REALM_ASSERT_EX(m_state == Active, m_state);
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);

    // Must be sampled before m_progress is overwritten below.
    const bool upload_advanced = (progress.upload.client_version > m_progress.upload.client_version);
    m_download_progress = progress.download;
    m_progress = progress;

    if (upload_advanced) {
        const auto acked_version = progress.upload.client_version;
        if (acked_version > m_last_version_selected_for_upload) {
            if (acked_version > m_upload_progress.client_version) {
                m_upload_progress = progress.upload;
            }
            m_last_version_selected_for_upload = acked_version;
        }

        check_for_upload_completion();
    }

    do_recognize_sync_version(client_version); // Allows upload process to resume
    check_for_download_completion();           // Throws

    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
        auto& subscription_store = *get_flx_subscription_store();
        get_migration_store()->create_subscriptions(subscription_store);
    }

    // Since the deactivation process has not been initiated, the UNBIND
    // message cannot have been sent unless an ERROR message was received.
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);

    const bool may_send = m_ident_message_sent && !m_error_message_received && !m_suspended;
    if (may_send) {
        ensure_enlisted_to_send(); // Throws
    }
}
45,918✔
1632

1633

1634
// Destructor. Performs no explicit work; the life cycle state check below is
// present in the source only as a disabled (commented-out) assertion.
Session::~Session()
{
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
}
10,050✔
1638

1639

1640
std::string Session::make_logger_prefix(session_ident_type ident)
1641
{
10,050✔
1642
    std::ostringstream out;
10,050✔
1643
    out.imbue(std::locale::classic());
10,050✔
1644
    out << "Session[" << ident << "]: "; // Throws
10,050✔
1645
    return out.str();                    // Throws
10,050✔
1646
}
10,050✔
1647

1648

1649
void Session::activate()
1650
{
10,046✔
1651
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
10,046✔
1652

4,848✔
1653
    logger.debug("Activating"); // Throws
10,046✔
1654

4,848✔
1655
    bool has_pending_client_reset = false;
10,046✔
1656
    if (REALM_LIKELY(!get_client().is_dry_run())) {
10,050✔
1657
        bool file_exists = util::File::exists(get_realm_path());
10,050✔
1658
        m_performing_client_reset = get_client_reset_config().has_value();
10,050✔
1659

4,848✔
1660
        logger.info("client_reset_config = %1, Realm exists = %2 ", m_performing_client_reset, file_exists);
10,050✔
1661
        if (!m_performing_client_reset) {
10,050✔
1662
            get_history().get_status(m_last_version_available, m_client_file_ident, m_progress,
9,686✔
1663
                                     &has_pending_client_reset); // Throws
9,686✔
1664
        }
9,686✔
1665
    }
10,050✔
1666
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
10,046✔
1667
                 m_client_file_ident.salt); // Throws
10,046✔
1668
    m_upload_progress = m_progress.upload;
10,046✔
1669
    m_last_version_selected_for_upload = m_upload_progress.client_version;
10,046✔
1670
    m_download_progress = m_progress.download;
10,046✔
1671
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
10,046✔
1672

4,848✔
1673
    logger.debug("last_version_available  = %1", m_last_version_available);                    // Throws
10,046✔
1674
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
10,046✔
1675
    logger.debug("progress_download_client_version = %1",
10,046✔
1676
                 m_progress.download.last_integrated_client_version);                                      // Throws
10,046✔
1677
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
10,046✔
1678
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
10,046✔
1679

4,848✔
1680
    reset_protocol_state();
10,046✔
1681
    m_state = Active;
10,046✔
1682

4,848✔
1683
    call_debug_hook(SyncClientHookEvent::SessionActivating, m_progress, m_last_sent_flx_query_version,
10,046✔
1684
                    DownloadBatchState::SteadyState, 0);
10,046✔
1685

4,848✔
1686
    REALM_ASSERT(!m_suspended);
10,046✔
1687
    m_conn.one_more_active_unsuspended_session(); // Throws
10,046✔
1688

4,848✔
1689
    try {
10,046✔
1690
        process_pending_flx_bootstrap();
10,046✔
1691
    }
10,046✔
1692
    catch (const IntegrationException& error) {
4,848✔
1693
        on_integration_failure(error);
×
1694
    }
×
1695
    catch (...) {
4,850✔
1696
        on_integration_failure(IntegrationException(exception_to_status()));
4✔
1697
    }
4✔
1698

4,848✔
1699
    if (has_pending_client_reset) {
10,050✔
1700
        handle_pending_client_reset_acknowledgement();
18✔
1701
    }
18✔
1702
}
10,050✔
1703

1704

1705
// The caller (Connection) must discard the session if the session has become
1706
// deactivated upon return.
1707
void Session::initiate_deactivation()
1708
{
10,048✔
1709
    REALM_ASSERT_EX(m_state == Active, m_state);
10,048✔
1710

4,848✔
1711
    logger.debug("Initiating deactivation"); // Throws
10,048✔
1712

4,848✔
1713
    m_state = Deactivating;
10,048✔
1714

4,848✔
1715
    if (!m_suspended)
10,048✔
1716
        m_conn.one_less_active_unsuspended_session(); // Throws
9,444✔
1717

4,848✔
1718
    if (m_enlisted_to_send) {
10,048✔
1719
        REALM_ASSERT(!unbind_process_complete());
4,782✔
1720
        return;
4,782✔
1721
    }
4,782✔
1722

2,396✔
1723
    // Deactivate immediately if the BIND message has not yet been sent and the
2,396✔
1724
    // session is not enlisted to send, or if the unbinding process has already
2,396✔
1725
    // completed.
2,396✔
1726
    if (!m_bind_message_sent || unbind_process_complete()) {
5,266✔
1727
        complete_deactivation(); // Throws
1,010✔
1728
        // Life cycle state is now Deactivated
460✔
1729
        return;
1,010✔
1730
    }
1,010✔
1731

1,936✔
1732
    // Ready to send the UNBIND message, if it has not already been sent
1,936✔
1733
    if (!m_unbind_message_sent) {
4,256✔
1734
        enlist_to_send(); // Throws
4,030✔
1735
        return;
4,030✔
1736
    }
4,030✔
1737
}
4,256✔
1738

1739

1740
void Session::complete_deactivation()
1741
{
10,050✔
1742
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
10,050✔
1743
    m_state = Deactivated;
10,050✔
1744

4,848✔
1745
    logger.debug("Deactivation completed"); // Throws
10,050✔
1746
}
10,050✔
1747

1748

1749
// Called by the associated Connection object when this session is granted an
1750
// opportunity to send a message.
1751
//
1752
// The caller (Connection) must discard the session if the session has become
1753
// deactivated upon return.
1754
void Session::send_message()
1755
{
163,244✔
1756
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
163,244✔
1757
    REALM_ASSERT(m_enlisted_to_send);
163,244✔
1758
    m_enlisted_to_send = false;
163,244✔
1759
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
163,244✔
1760
        // Deactivation has been initiated. If the UNBIND message has not been
4,342✔
1761
        // sent yet, there is no point in sending it. Instead, we can let the
4,342✔
1762
        // deactivation process complete.
4,342✔
1763
        if (!m_bind_message_sent) {
9,046✔
1764
            return complete_deactivation(); // Throws
1,758✔
1765
            // Life cycle state is now Deactivated
1,360✔
1766
        }
1,758✔
1767

2,982✔
1768
        // Session life cycle state is Deactivating or the unbinding process has
2,982✔
1769
        // been initiated by a session specific ERROR message
2,982✔
1770
        if (!m_unbind_message_sent)
7,288✔
1771
            send_unbind_message(); // Throws
7,288✔
1772
        return;
7,288✔
1773
    }
7,288✔
1774

72,018✔
1775
    // Session life cycle state is Active and the unbinding process has
72,018✔
1776
    // not been initiated
72,018✔
1777
    REALM_ASSERT(!m_unbind_message_sent);
154,198✔
1778

72,018✔
1779
    if (!m_bind_message_sent)
154,198✔
1780
        return send_bind_message(); // Throws
9,472✔
1781

67,828✔
1782
    if (!m_ident_message_sent) {
144,726✔
1783
        if (have_client_file_ident())
8,198✔
1784
            send_ident_message(); // Throws
8,198✔
1785
        return;
8,198✔
1786
    }
8,198✔
1787

64,428✔
1788
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
136,528✔
1789
                                                      [](const PendingTestCommand& command) {
64,482✔
1790
                                                          return command.pending;
108✔
1791
                                                      });
108✔
1792
    if (has_pending_test_command) {
136,528✔
1793
        return send_test_command_message();
52✔
1794
    }
52✔
1795

64,402✔
1796
    if (m_error_to_send)
136,476✔
1797
        return send_json_error_message(); // Throws
36✔
1798

64,388✔
1799
    // Stop sending upload, mark and query messages when the client detects an error.
64,388✔
1800
    if (m_client_error) {
136,440✔
1801
        return;
22✔
1802
    }
22✔
1803

64,378✔
1804
    if (m_target_download_mark > m_last_download_mark_sent)
136,418✔
1805
        return send_mark_message(); // Throws
18,314✔
1806

55,996✔
1807
    auto is_upload_allowed = [&]() -> bool {
118,108✔
1808
        if (!m_is_flx_sync_session) {
118,108✔
1809
            return true;
105,018✔
1810
        }
105,018✔
1811

6,858✔
1812
        auto migration_store = get_migration_store();
13,090✔
1813
        if (!migration_store) {
13,090✔
1814
            return true;
×
1815
        }
×
1816

6,858✔
1817
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
13,090✔
1818
        if (!sentinel_query_version) {
13,090✔
1819
            return true;
13,068✔
1820
        }
13,068✔
1821

10✔
1822
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
10✔
1823
        return m_last_sent_flx_query_version != *sentinel_query_version;
22✔
1824
    };
22✔
1825

55,996✔
1826
    if (!is_upload_allowed()) {
118,104✔
1827
        return;
12✔
1828
    }
12✔
1829

55,990✔
1830
    auto check_pending_flx_version = [&]() -> bool {
118,100✔
1831
        if (!m_is_flx_sync_session) {
118,100✔
1832
            return false;
105,020✔
1833
        }
105,020✔
1834

6,854✔
1835
        if (!m_allow_upload) {
13,080✔
1836
            return false;
2,602✔
1837
        }
2,602✔
1838

5,372✔
1839
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
10,478✔
1840

5,372✔
1841
        if (!m_pending_flx_sub_set) {
10,478✔
1842
            return false;
8,634✔
1843
        }
8,634✔
1844

922✔
1845
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
1,844✔
1846
    };
1,844✔
1847

55,990✔
1848
    if (check_pending_flx_version()) {
118,092✔
1849
        return send_query_change_message(); // throws
1,056✔
1850
    }
1,056✔
1851

55,462✔
1852
    if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
117,036✔
1853
        return send_upload_message(); // Throws
56,332✔
1854
    }
56,332✔
1855
}
117,036✔
1856

1857

1858
void Session::send_bind_message()
1859
{
9,472✔
1860
    REALM_ASSERT_EX(m_state == Active, m_state);
9,472✔
1861

4,190✔
1862
    session_ident_type session_ident = m_ident;
9,472✔
1863
    bool need_client_file_ident = !have_client_file_ident();
9,472✔
1864
    const bool is_subserver = false;
9,472✔
1865

4,190✔
1866

4,190✔
1867
    ClientProtocol& protocol = m_conn.get_client_protocol();
9,472✔
1868
    int protocol_version = m_conn.get_negotiated_protocol_version();
9,472✔
1869
    OutputBuffer& out = m_conn.get_output_buffer();
9,472✔
1870
    // Discard the token since it's ignored by the server.
4,190✔
1871
    std::string empty_access_token;
9,472✔
1872
    if (m_is_flx_sync_session) {
9,472✔
1873
        nlohmann::json bind_json_data;
1,350✔
1874
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,350✔
1875
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1876
        }
60✔
1877
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,350✔
1878
        auto schema_version = get_schema_version();
1,350✔
1879
        // Send 0 if schema is not versioned.
676✔
1880
        bind_json_data["schemaVersion"] = schema_version != uint64_t(-1) ? schema_version : 0;
1,348✔
1881
        if (logger.would_log(util::Logger::Level::debug)) {
1,350✔
1882
            std::string json_data_dump;
1,350✔
1883
            if (!bind_json_data.empty()) {
1,350✔
1884
                json_data_dump = bind_json_data.dump();
1,350✔
1885
            }
1,350✔
1886
            logger.debug(
1,350✔
1887
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,350✔
1888
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,350✔
1889
        }
1,350✔
1890
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,350✔
1891
                                       need_client_file_ident, is_subserver); // Throws
1,350✔
1892
    }
1,350✔
1893
    else {
8,122✔
1894
        std::string server_path = get_virt_path();
8,122✔
1895
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
8,122✔
1896
                     session_ident, need_client_file_ident, is_subserver, server_path);
8,122✔
1897
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
8,122✔
1898
                                       need_client_file_ident, is_subserver); // Throws
8,122✔
1899
    }
8,122✔
1900
    m_conn.initiate_write_message(out, this); // Throws
9,472✔
1901

4,190✔
1902
    m_bind_message_sent = true;
9,472✔
1903
    call_debug_hook(SyncClientHookEvent::BindMessageSent, m_progress, m_last_sent_flx_query_version,
9,472✔
1904
                    DownloadBatchState::SteadyState, 0);
9,472✔
1905

4,190✔
1906
    // Ready to send the IDENT message if the file identifier pair is already
4,190✔
1907
    // available.
4,190✔
1908
    if (!need_client_file_ident)
9,472✔
1909
        enlist_to_send(); // Throws
4,870✔
1910
}
9,472✔
1911

1912

1913
void Session::send_ident_message()
1914
{
8,198✔
1915
    REALM_ASSERT_EX(m_state == Active, m_state);
8,198✔
1916
    REALM_ASSERT(m_bind_message_sent);
8,198✔
1917
    REALM_ASSERT(!m_unbind_message_sent);
8,198✔
1918
    REALM_ASSERT(have_client_file_ident());
8,198✔
1919

3,400✔
1920

3,400✔
1921
    ClientProtocol& protocol = m_conn.get_client_protocol();
8,198✔
1922
    OutputBuffer& out = m_conn.get_output_buffer();
8,198✔
1923
    session_ident_type session_ident = m_ident;
8,198✔
1924

3,400✔
1925
    if (m_is_flx_sync_session) {
8,198✔
1926
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,280✔
1927
        const auto active_query_body = active_query_set.to_ext_json();
1,280✔
1928
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,280✔
1929
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,280✔
1930
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,280✔
1931
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,280✔
1932
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,280✔
1933
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,280✔
1934
                     active_query_body); // Throws
1,280✔
1935
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,280✔
1936
                                        active_query_set.version(), active_query_body); // Throws
1,280✔
1937
        m_last_sent_flx_query_version = active_query_set.version();
1,280✔
1938
    }
1,280✔
1939
    else {
6,918✔
1940
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
6,918✔
1941
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
6,918✔
1942
                     "latest_server_version_salt=%6)",
6,918✔
1943
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
6,918✔
1944
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
6,918✔
1945
                     m_progress.latest_server_version.salt);                                  // Throws
6,918✔
1946
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
6,918✔
1947
    }
6,918✔
1948
    m_conn.initiate_write_message(out, this); // Throws
8,198✔
1949

3,400✔
1950
    m_ident_message_sent = true;
8,198✔
1951

3,400✔
1952
    // Other messages may be waiting to be sent
3,400✔
1953
    enlist_to_send(); // Throws
8,198✔
1954
}
8,198✔
1955

1956
void Session::send_query_change_message()
1957
{
1,056✔
1958
    REALM_ASSERT_EX(m_state == Active, m_state);
1,056✔
1959
    REALM_ASSERT(m_ident_message_sent);
1,056✔
1960
    REALM_ASSERT(!m_unbind_message_sent);
1,056✔
1961
    REALM_ASSERT(m_pending_flx_sub_set);
1,056✔
1962
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
1,056✔
1963

528✔
1964
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
1,056✔
1965
        return;
×
1966
    }
×
1967

528✔
1968
    auto sub_store = get_flx_subscription_store();
1,056✔
1969
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
1,056✔
1970
    auto latest_queries = latest_sub_set.to_ext_json();
1,056✔
1971
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
1,056✔
1972
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
1,056✔
1973

528✔
1974
    OutputBuffer& out = m_conn.get_output_buffer();
1,056✔
1975
    session_ident_type session_ident = get_ident();
1,056✔
1976
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,056✔
1977
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
1,056✔
1978
    m_conn.initiate_write_message(out, this);
1,056✔
1979

528✔
1980
    m_last_sent_flx_query_version = latest_sub_set.version();
1,056✔
1981

528✔
1982
    request_download_completion_notification();
1,056✔
1983
}
1,056✔
1984

1985
void Session::send_upload_message()
1986
{
56,334✔
1987
    REALM_ASSERT_EX(m_state == Active, m_state);
56,334✔
1988
    REALM_ASSERT(m_ident_message_sent);
56,334✔
1989
    REALM_ASSERT(!m_unbind_message_sent);
56,334✔
1990

27,830✔
1991
    if (REALM_UNLIKELY(get_client().is_dry_run()))
56,334✔
1992
        return;
27,830✔
1993

27,830✔
1994
    version_type target_upload_version = m_last_version_available;
56,334✔
1995
    if (m_pending_flx_sub_set) {
56,334✔
1996
        REALM_ASSERT(m_is_flx_sync_session);
788✔
1997
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
788✔
1998
    }
788✔
1999

27,830✔
2000
    std::vector<UploadChangeset> uploadable_changesets;
56,334✔
2001
    version_type locked_server_version = 0;
56,334✔
2002
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
56,334✔
2003
                                             locked_server_version); // Throws
56,334✔
2004

27,830✔
2005
    if (uploadable_changesets.empty()) {
56,334✔
2006
        // Nothing more to upload right now
14,094✔
2007
        check_for_upload_completion(); // Throws
29,850✔
2008
        // If we need to limit upload up to some version other than the last client version available and there are no
14,094✔
2009
        // changes to upload, then there is no need to send an empty message.
14,094✔
2010
        if (m_pending_flx_sub_set) {
29,850✔
2011
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
216✔
2012
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
216✔
2013
            // Other messages may be waiting to be sent
108✔
2014
            return enlist_to_send(); // Throws
216✔
2015
        }
216✔
2016
    }
26,484✔
2017
    else {
26,484✔
2018
        m_last_version_selected_for_upload = uploadable_changesets.back().progress.client_version;
26,484✔
2019
    }
26,484✔
2020

27,830✔
2021
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
56,226✔
2022
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
572✔
2023
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
572✔
2024
    }
572✔
2025

27,722✔
2026
    version_type progress_client_version = m_upload_progress.client_version;
56,118✔
2027
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
56,118✔
2028

27,722✔
2029
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
56,118✔
2030
                 "locked_server_version=%3, num_changesets=%4)",
56,118✔
2031
                 progress_client_version, progress_server_version, locked_server_version,
56,118✔
2032
                 uploadable_changesets.size()); // Throws
56,118✔
2033

27,722✔
2034
    ClientProtocol& protocol = m_conn.get_client_protocol();
56,118✔
2035
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
56,118✔
2036

27,722✔
2037
    for (const UploadChangeset& uc : uploadable_changesets) {
49,326✔
2038
        logger.debug("Fetching changeset for upload (client_version=%1, server_version=%2, "
41,750✔
2039
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
41,750✔
2040
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
41,750✔
2041
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
41,750✔
2042
        if (logger.would_log(util::Logger::Level::trace)) {
41,750✔
2043
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2044
            if (changeset_data.size() < 1024) {
×
2045
                logger.trace("Changeset: %1",
×
2046
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2047
            }
×
2048
            else {
×
2049
                logger.trace("Changeset(comp): %1 %2", changeset_data.size(),
×
2050
                             protocol.compressed_hex_dump(changeset_data));
×
2051
            }
×
2052

2053
#if REALM_DEBUG
×
2054
            ChunkedBinaryInputStream in{changeset_data};
×
2055
            Changeset log;
×
2056
            try {
×
2057
                parse_changeset(in, log);
×
2058
                std::stringstream ss;
×
2059
                log.print(ss);
×
2060
                logger.trace("Changeset (parsed):\n%1", ss.str());
×
2061
            }
×
2062
            catch (const BadChangesetError& err) {
×
2063
                logger.error("Unable to parse changeset: %1", err.what());
×
2064
            }
×
2065
#endif
×
2066
        }
×
2067

20,146✔
2068
#if 0 // Upload log compaction is currently not implemented
2069
        if (!get_client().m_disable_upload_compaction) {
2070
            ChangesetEncoder::Buffer encode_buffer;
2071

2072
            {
2073
                // Upload compaction only takes place within single changesets to
2074
                // avoid another client seeing inconsistent snapshots.
2075
                ChunkedBinaryInputStream stream{uc.changeset};
2076
                Changeset changeset;
2077
                parse_changeset(stream, changeset); // Throws
2078
                // FIXME: What is the point of setting these? How can compaction care about them?
2079
                changeset.version = uc.progress.client_version;
2080
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2081
                changeset.origin_timestamp = uc.origin_timestamp;
2082
                changeset.origin_file_ident = uc.origin_file_ident;
2083

2084
                compact_changesets(&changeset, 1);
2085
                encode_changeset(changeset, encode_buffer);
2086

2087
                logger.debug("Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2088
                             encode_buffer.size()); // Throws
2089
            }
2090

2091
            upload_message_builder.add_changeset(
2092
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
2093
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
2094
        }
2095
        else
2096
#endif
2097
        {
41,750✔
2098
            upload_message_builder.add_changeset(uc.progress.client_version,
41,750✔
2099
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
41,750✔
2100
                                                 uc.origin_file_ident,
41,750✔
2101
                                                 uc.changeset); // Throws
41,750✔
2102
        }
41,750✔
2103
    }
41,750✔
2104

27,722✔
2105
    int protocol_version = m_conn.get_negotiated_protocol_version();
56,118✔
2106
    OutputBuffer& out = m_conn.get_output_buffer();
56,118✔
2107
    session_ident_type session_ident = get_ident();
56,118✔
2108
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
56,118✔
2109
                                               progress_server_version,
56,118✔
2110
                                               locked_server_version); // Throws
56,118✔
2111
    m_conn.initiate_write_message(out, this);                          // Throws
56,118✔
2112

27,722✔
2113
    // Other messages may be waiting to be sent
27,722✔
2114
    enlist_to_send(); // Throws
56,118✔
2115
}
56,118✔
2116

2117

2118
void Session::send_mark_message()
2119
{
18,314✔
2120
    REALM_ASSERT_EX(m_state == Active, m_state);
18,314✔
2121
    REALM_ASSERT(m_ident_message_sent);
18,314✔
2122
    REALM_ASSERT(!m_unbind_message_sent);
18,314✔
2123
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
18,314✔
2124

8,382✔
2125
    request_ident_type request_ident = m_target_download_mark;
18,314✔
2126
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
18,314✔
2127

8,382✔
2128
    ClientProtocol& protocol = m_conn.get_client_protocol();
18,314✔
2129
    OutputBuffer& out = m_conn.get_output_buffer();
18,314✔
2130
    session_ident_type session_ident = get_ident();
18,314✔
2131
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
18,314✔
2132
    m_conn.initiate_write_message(out, this);                      // Throws
18,314✔
2133

8,382✔
2134
    m_last_download_mark_sent = request_ident;
18,314✔
2135

8,382✔
2136
    // Other messages may be waiting to be sent
8,382✔
2137
    enlist_to_send(); // Throws
18,314✔
2138
}
18,314✔
2139

2140

2141
void Session::send_unbind_message()
2142
{
7,288✔
2143
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
7,288✔
2144
    REALM_ASSERT(m_bind_message_sent);
7,288✔
2145
    REALM_ASSERT(!m_unbind_message_sent);
7,288✔
2146

2,982✔
2147
    logger.debug("Sending: UNBIND"); // Throws
7,288✔
2148

2,982✔
2149
    ClientProtocol& protocol = m_conn.get_client_protocol();
7,288✔
2150
    OutputBuffer& out = m_conn.get_output_buffer();
7,288✔
2151
    session_ident_type session_ident = get_ident();
7,288✔
2152
    protocol.make_unbind_message(out, session_ident); // Throws
7,288✔
2153
    m_conn.initiate_write_message(out, this);         // Throws
7,288✔
2154

2,982✔
2155
    m_unbind_message_sent = true;
7,288✔
2156
}
7,288✔
2157

2158

2159
void Session::send_json_error_message()
2160
{
36✔
2161
    REALM_ASSERT_EX(m_state == Active, m_state);
36✔
2162
    REALM_ASSERT(m_ident_message_sent);
36✔
2163
    REALM_ASSERT(!m_unbind_message_sent);
36✔
2164
    REALM_ASSERT(m_error_to_send);
36✔
2165
    REALM_ASSERT(m_client_error);
36✔
2166

14✔
2167
    ClientProtocol& protocol = m_conn.get_client_protocol();
36✔
2168
    OutputBuffer& out = m_conn.get_output_buffer();
36✔
2169
    session_ident_type session_ident = get_ident();
36✔
2170
    auto protocol_error = m_client_error->error_for_server;
36✔
2171

14✔
2172
    auto message = util::format("%1", m_client_error->to_status());
36✔
2173
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
36✔
2174
                session_ident); // Throws
36✔
2175

14✔
2176
    nlohmann::json error_body_json;
36✔
2177
    error_body_json["message"] = std::move(message);
36✔
2178
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
36✔
2179
                                     error_body_json.dump()); // Throws
36✔
2180
    m_conn.initiate_write_message(out, this);                 // Throws
36✔
2181

14✔
2182
    m_error_to_send = false;
36✔
2183
    enlist_to_send(); // Throws
36✔
2184
}
36✔
2185

2186

2187
void Session::send_test_command_message()
2188
{
52✔
2189
    REALM_ASSERT_EX(m_state == Active, m_state);
52✔
2190

26✔
2191
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
52✔
2192
                           [](const PendingTestCommand& command) {
52✔
2193
                               return command.pending;
52✔
2194
                           });
52✔
2195
    REALM_ASSERT(it != m_pending_test_commands.end());
52✔
2196

26✔
2197
    ClientProtocol& protocol = m_conn.get_client_protocol();
52✔
2198
    OutputBuffer& out = m_conn.get_output_buffer();
52✔
2199
    auto session_ident = get_ident();
52✔
2200

26✔
2201
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
52✔
2202
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
52✔
2203

26✔
2204
    m_conn.initiate_write_message(out, this); // Throws;
52✔
2205
    it->pending = false;
52✔
2206

26✔
2207
    enlist_to_send();
52✔
2208
}
52✔
2209

2210
bool Session::client_reset_if_needed()
2211
{
3,452✔
2212
    // Regardless of what happens, once we return from this function we will
1,644✔
2213
    // no longer be in the middle of a client reset
1,644✔
2214
    m_performing_client_reset = false;
3,452✔
2215

1,644✔
2216
    // Even if we end up not actually performing a client reset, consume the
1,644✔
2217
    // config to ensure that the resources it holds are released
1,644✔
2218
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
3,452✔
2219
    if (!client_reset_config) {
3,452✔
2220
        return false;
3,088✔
2221
    }
3,088✔
2222

182✔
2223
    auto on_flx_version_complete = [this](int64_t version) {
364✔
2224
        this->on_flx_sync_version_complete(version);
288✔
2225
    };
288✔
2226
    bool did_reset = client_reset::perform_client_reset(
364✔
2227
        logger, *get_db(), *client_reset_config->fresh_copy, client_reset_config->mode,
364✔
2228
        std::move(client_reset_config->notify_before_client_reset),
364✔
2229
        std::move(client_reset_config->notify_after_client_reset), m_client_file_ident, get_flx_subscription_store(),
364✔
2230
        on_flx_version_complete, client_reset_config->recovery_is_allowed);
364✔
2231
    if (!did_reset) {
364✔
2232
        return false;
×
2233
    }
×
2234

182✔
2235
    // The fresh Realm has been used to reset the state
182✔
2236
    logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
364✔
2237

182✔
2238
    SaltedFileIdent client_file_ident;
364✔
2239
    bool has_pending_client_reset = false;
364✔
2240
    get_history().get_status(m_last_version_available, client_file_ident, m_progress,
364✔
2241
                             &has_pending_client_reset); // Throws
364✔
2242
    REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
364✔
2243
    REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
364✔
2244
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
364✔
2245
                    m_progress.download.last_integrated_client_version);
364✔
2246
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
364✔
2247
    logger.trace("last_version_available  = %1", m_last_version_available); // Throws
364✔
2248

182✔
2249
    m_upload_progress = m_progress.upload;
364✔
2250
    m_download_progress = m_progress.download;
364✔
2251
    // In recovery mode, there may be new changesets to upload and nothing left to download.
182✔
2252
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
182✔
2253
    // For both, we want to allow uploads again without needing external changes to download first.
182✔
2254
    m_allow_upload = true;
364✔
2255
    REALM_ASSERT_EX(m_last_version_selected_for_upload == 0, m_last_version_selected_for_upload);
364✔
2256

182✔
2257
    if (has_pending_client_reset) {
364✔
2258
        handle_pending_client_reset_acknowledgement();
284✔
2259
    }
284✔
2260

182✔
2261
    update_subscription_version_info();
364✔
2262

182✔
2263
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
182✔
2264
    if (auto migration_store = get_migration_store()) {
364✔
2265
        migration_store->complete_migration_or_rollback();
256✔
2266
    }
256✔
2267

182✔
2268
    return true;
364✔
2269
}
364✔
2270

2271
// Handles an IDENT message from the server, which assigns this session its
// salted client file identifier.
//
// The message is ignored (OK is returned) when the session is not Active.
// A SyncProtocolInvariantFailed status is returned when the message arrives
// at an illegal time, or when the identifier is less than 1 or the salt is 0.
//
// Otherwise the identifier is stored and, unless the client is in dry-run
// mode, a client reset is attempted. If the reset throws, the session is
// suspended with a fatal AutoClientResetFailed error and OK is returned. If no
// reset took place, the identifier is written to the history and the
// client-version progress counters are zeroed. Finally the session is enlisted
// to send.
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
{
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
                 client_file_ident.salt); // Throws

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active)
        return Status::OK(); // Success

    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
                               !m_unbound_message_received);
    if (REALM_UNLIKELY(!legal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
    }
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
    }
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
    }

    m_client_file_ident = client_file_ident;

    if (REALM_UNLIKELY(get_client().is_dry_run())) {
        // Ready to send the IDENT message
        ensure_enlisted_to_send(); // Throws
        return Status::OK();       // Success
    }

    // if a client reset happens, it will take care of setting the file ident
    // and if not, we do it here
    bool did_client_reset = false;
    try {
        did_client_reset = client_reset_if_needed();
    }
    catch (const std::exception& e) {
        auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
        // The message embeds arbitrary exception text, so it must be passed as
        // an argument and never as the format string itself: a '%' in the
        // exception text would otherwise be interpreted as a placeholder.
        logger.error("%1", err_msg);
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
        suspend(err_info);
        return Status::OK();
    }
    if (!did_client_reset) {
        get_history().set_client_file_ident(client_file_ident,
                                            m_fix_up_object_ids); // Throws
        m_progress.download.last_integrated_client_version = 0;
        m_progress.upload.client_version = 0;
        m_last_version_selected_for_upload = 0;
    }

    // Ready to send the IDENT message
    ensure_enlisted_to_send(); // Throws
    return Status::OK();       // Success
}
3,372✔
2327

2328
Status Session::receive_download_message(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
2329
                                         DownloadBatchState batch_state, int64_t query_version,
2330
                                         const ReceivedChangesets& received_changesets)
2331
{
46,656✔
2332
    REALM_ASSERT_EX(query_version >= 0, query_version);
46,656✔
2333
    // Ignore the message if the deactivation process has been initiated,
24,232✔
2334
    // because in that case, the associated Realm and SessionWrapper must
24,232✔
2335
    // not be accessed any longer.
24,232✔
2336
    if (m_state != Active)
46,656✔
2337
        return Status::OK();
498✔
2338

23,926✔
2339
    if (is_steady_state_download_message(batch_state, query_version)) {
46,158✔
2340
        batch_state = DownloadBatchState::SteadyState;
44,200✔
2341
    }
44,200✔
2342

23,926✔
2343
    logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
46,158✔
2344
                 "latest_server_version=%3, latest_server_version_salt=%4, "
46,158✔
2345
                 "upload_client_version=%5, upload_server_version=%6, downloadable_bytes=%7, "
46,158✔
2346
                 "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
46,158✔
2347
                 progress.download.server_version, progress.download.last_integrated_client_version,
46,158✔
2348
                 progress.latest_server_version.version, progress.latest_server_version.salt,
46,158✔
2349
                 progress.upload.client_version, progress.upload.last_integrated_server_version, downloadable_bytes,
46,158✔
2350
                 batch_state != DownloadBatchState::MoreToCome, query_version, received_changesets.size()); // Throws
46,158✔
2351

23,926✔
2352
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
23,926✔
2353
    // changeset over and over again.
23,926✔
2354
    if (m_client_error) {
46,158✔
2355
        logger.debug("Ignoring download message because the client detected an integration error");
×
2356
        return Status::OK();
×
2357
    }
×
2358

23,926✔
2359
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
46,160✔
2360
    if (REALM_UNLIKELY(!legal_at_this_time)) {
46,158✔
2361
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
×
2362
    }
×
2363
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
46,158✔
2364
        logger.error("Bad sync progress received (%1)", status);
×
2365
        return status;
×
2366
    }
×
2367

23,926✔
2368
    version_type server_version = m_progress.download.server_version;
46,158✔
2369
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
46,158✔
2370
    for (const RemoteChangeset& changeset : received_changesets) {
44,766✔
2371
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
22,236✔
2372
        // version must be increasing, but can stay the same during bootstraps.
22,236✔
2373
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
23,252✔
2374
                                                         : (changeset.remote_version > server_version);
42,060✔
2375
        // Each server version cannot be greater than the one in the header of the download message.
22,236✔
2376
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
43,076✔
2377
        if (!good_server_version) {
43,076✔
2378
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2379
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2380
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2381
        }
×
2382
        server_version = changeset.remote_version;
43,076✔
2383
        // Check that per-changeset last integrated client version is "weakly"
22,236✔
2384
        // increasing.
22,236✔
2385
        bool good_client_version =
43,076✔
2386
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
43,076✔
2387
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
43,076✔
2388
        if (!good_client_version) {
43,076✔
2389
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2390
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2391
                                 "(%1, %2, %3)",
×
2392
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2393
                                 progress.download.last_integrated_client_version)};
×
2394
        }
×
2395
        last_integrated_client_version = changeset.last_integrated_local_version;
43,076✔
2396
        // Server shouldn't send our own changes, and zero is not a valid client
22,236✔
2397
        // file identifier.
22,236✔
2398
        bool good_file_ident =
43,076✔
2399
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
43,076✔
2400
        if (!good_file_ident) {
43,076✔
2401
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2402
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2403
                                 changeset.origin_file_ident)};
×
2404
        }
×
2405
    }
43,076✔
2406

23,926✔
2407
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
46,158✔
2408
                                       batch_state, received_changesets.size());
46,158✔
2409
    if (hook_action == SyncClientHookAction::EarlyReturn) {
46,158✔
2410
        return Status::OK();
16✔
2411
    }
16✔
2412
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
46,142✔
2413

23,918✔
2414
    if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) {
46,142✔
2415
        clear_resumption_delay_state();
1,948✔
2416
        return Status::OK();
1,948✔
2417
    }
1,948✔
2418

22,944✔
2419
    initiate_integrate_changesets(downloadable_bytes, batch_state, progress, received_changesets); // Throws
44,194✔
2420

22,944✔
2421
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
44,194✔
2422
                                  batch_state, received_changesets.size());
44,194✔
2423
    if (hook_action == SyncClientHookAction::EarlyReturn) {
44,194✔
2424
        return Status::OK();
×
2425
    }
×
2426
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
44,194✔
2427

22,944✔
2428
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
22,944✔
2429
    // after a retryable session error.
22,944✔
2430
    clear_resumption_delay_state();
44,194✔
2431
    return Status::OK();
44,194✔
2432
}
44,194✔
2433

2434
// Handles a MARK message from the server.
//
// The message is ignored (OK is returned) when the session is not Active.
// A SyncProtocolInvariantFailed status is returned when the message arrives
// at an illegal time, or when its request identifier is greater than the last
// mark sent or not greater than the last mark received.
//
// Otherwise the current download server version and the request identifier
// are recorded and download completion is re-evaluated.
Status Session::receive_mark_message(request_ident_type request_ident)
{
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

    // Once deactivation has been initiated, the associated Realm and
    // SessionWrapper must not be touched any more, so the message is dropped.
    if (m_state != Active) {
        return Status::OK(); // Success
    }

    const bool illegal_at_this_time =
        !m_ident_message_sent || m_error_message_received || m_unbound_message_received;
    if (REALM_UNLIKELY(illegal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
    }

    // A valid identifier lies in the range (last received, last sent].
    const bool bad_request_ident =
        request_ident > m_last_download_mark_sent || request_ident <= m_last_download_mark_received;
    if (REALM_UNLIKELY(bad_request_ident)) {
        auto reason = util::format(
            "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
            m_last_download_mark_sent, m_last_download_mark_received);
        return {ErrorCodes::SyncProtocolInvariantFailed, reason};
    }

    m_server_version_at_last_download_mark = m_progress.download.server_version;
    m_last_download_mark_received = request_ident;
    check_for_download_completion(); // Throws

    return Status::OK(); // Success
}
17,106✔
2464

2465

2466
// The caller (Connection) must discard the session if the session has become
2467
// deactivated upon return.
2468
Status Session::receive_unbound_message()
2469
{
5,384✔
2470
    logger.debug("Received: UNBOUND");
5,384✔
2471

2,120✔
2472
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
5,384✔
2473
    if (REALM_UNLIKELY(!legal_at_this_time)) {
5,384✔
2474
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2475
    }
×
2476

2,120✔
2477
    // The fact that the UNBIND message has been sent, but an ERROR message has
2,120✔
2478
    // not been received, implies that the deactivation process must have been
2,120✔
2479
    // initiated, so this session must be in the Deactivating state or the session
2,120✔
2480
    // has been suspended because of a client side error.
2,120✔
2481
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
5,384!
2482

2,120✔
2483
    m_unbound_message_received = true;
5,384✔
2484

2,120✔
2485
    // Detect completion of the unbinding process
2,120✔
2486
    if (m_unbind_message_send_complete && m_state == Deactivating) {
5,384✔
2487
        // The deactivation process completes when the unbinding process
2,120✔
2488
        // completes.
2,120✔
2489
        complete_deactivation(); // Throws
5,384✔
2490
        // Life cycle state is now Deactivated
2,120✔
2491
    }
5,384✔
2492

2,120✔
2493
    return Status::OK(); // Success
5,384✔
2494
}
5,384✔
2495

2496

2497
// Handles a QUERY_ERROR message from the server by forwarding the query
// version and message to on_flx_sync_error(). The message is logged in all
// cases but only forwarded while the session is Active. Always returns OK.
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
{
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);

    // Once deactivation has been initiated, the associated Realm and
    // SessionWrapper must not be touched any more, so the message is dropped.
    if (m_state != Active) {
        return Status::OK();
    }

    on_flx_sync_error(query_version, message); // throws
    return Status::OK();
}
20✔
2508

2509
// The caller (Connection) must discard the session if the session has become
2510
// deactivated upon return.
2511
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2512
{
698✔
2513
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
698✔
2514
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
698✔
2515

344✔
2516
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
698✔
2517
    if (REALM_UNLIKELY(!legal_at_this_time)) {
698✔
2518
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2519
    }
×
2520

344✔
2521
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
698✔
2522
    auto status = protocol_error_to_status(protocol_error, info.message);
698✔
2523
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
698✔
2524
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2525
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2526
                             info.raw_error_code)};
×
2527
    }
×
2528

344✔
2529
    // Can't process debug hook actions once the Session is undergoing deactivation, since
344✔
2530
    // the SessionWrapper may not be available
344✔
2531
    if (m_state == Active) {
698✔
2532
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
696✔
2533
        if (debug_action == SyncClientHookAction::EarlyReturn) {
696✔
2534
            return Status::OK();
8✔
2535
        }
8✔
2536
    }
690✔
2537

340✔
2538
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
340✔
2539
    // containing the compensating write has appeared in a download message.
340✔
2540
    if (status == ErrorCodes::SyncCompensatingWrite) {
690✔
2541
        // If the client is not active, the compensating writes will not be processed now, but will be
22✔
2542
        // sent again the next time the client connects
22✔
2543
        if (m_state == Active) {
44✔
2544
            REALM_ASSERT(info.compensating_write_server_version.has_value());
44✔
2545
            m_pending_compensating_write_errors.push_back(info);
44✔
2546
        }
44✔
2547
        return Status::OK();
44✔
2548
    }
44✔
2549

318✔
2550
    if (protocol_error == ProtocolError::schema_version_changed) {
646✔
2551
        // Enable upload immediately if the session is still active.
26✔
2552
        if (m_state == Active) {
54✔
2553
            auto wt = get_db()->start_write();
54✔
2554
            _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
54✔
2555
            wt->commit();
54✔
2556
            // Notify SyncSession a schema migration is required.
26✔
2557
            on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
54✔
2558
        }
54✔
2559
        // Keep the session active to upload any unsynced changes.
26✔
2560
        return Status::OK();
54✔
2561
    }
54✔
2562

292✔
2563
    m_error_message_received = true;
592✔
2564
    suspend(SessionErrorInfo{info, std::move(status)});
592✔
2565
    return Status::OK();
592✔
2566
}
592✔
2567

2568
void Session::suspend(const SessionErrorInfo& info)
2569
{
672✔
2570
    REALM_ASSERT(!m_suspended);
672✔
2571
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
672!
2572
    logger.debug("Suspended"); // Throws
672✔
2573

332✔
2574
    m_suspended = true;
672✔
2575

332✔
2576
    // Detect completion of the unbinding process
332✔
2577
    if (m_unbind_message_send_complete && m_error_message_received) {
672!
2578
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2579
        // we received an ERROR message implies that the deactivation process must
2580
        // have been initiated, so this session must be in the Deactivating state.
2581
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
×
2582

2583
        // The deactivation process completes when the unbinding process
2584
        // completes.
2585
        complete_deactivation(); // Throws
×
2586
        // Life cycle state is now Deactivated
2587
    }
×
2588

332✔
2589
    // Notify the application of the suspension of the session if the session is
332✔
2590
    // still in the Active state
332✔
2591
    if (m_state == Active) {
672✔
2592
        call_debug_hook(SyncClientHookEvent::SessionSuspended, info);
670✔
2593
        m_conn.one_less_active_unsuspended_session(); // Throws
670✔
2594
        on_suspended(info);                           // Throws
670✔
2595
    }
670✔
2596

332✔
2597
    if (!info.is_fatal) {
672✔
2598
        begin_resumption_delay(info);
60✔
2599
    }
60✔
2600

332✔
2601
    // Ready to send the UNBIND message, if it has not been sent already
332✔
2602
    if (!m_unbind_message_sent)
672✔
2603
        ensure_enlisted_to_send(); // Throws
670✔
2604
}
672✔
2605

2606
// Handles the server's response to a test command: fulfils the promise of the
// pending command whose id equals `ident` with a copy of `body`, then removes
// that command from m_pending_test_commands.
//
// Returns SyncProtocolInvariantFailed if no pending command has that id, and
// OK otherwise.
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
{
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);

    // Locate the first pending command that this response belongs to.
    auto command = m_pending_test_commands.begin();
    const auto commands_end = m_pending_test_commands.end();
    while (command != commands_end && !(command->id == ident)) {
        ++command;
    }

    if (command == commands_end) {
        auto reason = util::format("Received test command response for a non-existent ident %1", ident);
        return {ErrorCodes::SyncProtocolInvariantFailed, reason};
    }

    command->promise.emplace_value(std::string{body});
    m_pending_test_commands.erase(command);

    return Status::OK();
}
52✔
2623

2624
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2625
{
60✔
2626
    REALM_ASSERT(!m_try_again_activation_timer);
60✔
2627

28✔
2628
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
60✔
2629
                                  error_info.resumption_delay_interval);
60✔
2630
    auto try_again_interval = m_try_again_delay_info.delay_interval();
60✔
2631
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
60✔
2632
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
14✔
2633
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
14✔
2634
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
14✔
2635
        try_again_interval = std::chrono::milliseconds{1000};
32✔
2636
    }
32✔
2637
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
60✔
2638
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
60✔
2639
        if (status == ErrorCodes::OperationAborted)
60✔
2640
            return;
12✔
2641
        else if (!status.is_ok())
48✔
2642
            throw Exception(status);
×
2643

22✔
2644
        m_try_again_activation_timer.reset();
48✔
2645
        cancel_resumption_delay();
48✔
2646
    });
48✔
2647
}
60✔
2648

2649
void Session::clear_resumption_delay_state()
2650
{
46,140✔
2651
    if (m_try_again_activation_timer) {
46,140✔
2652
        logger.debug("Clearing resumption delay state after successful download");
×
2653
        m_try_again_delay_info.reset();
×
2654
    }
×
2655
}
46,140✔
2656

2657
// Validates the sync progress received from the server against the progress
// this session has already recorded (`m_progress`) and against
// `m_last_version_available`.
//
// Every invariant is evaluated in turn; when more than one is violated, the
// message of the last violated invariant is the one reported. Returns
// Status::OK() when all invariants hold, otherwise SyncProtocolInvariantFailed
// carrying the description of the violation.
Status ClientImpl::Session::check_received_sync_progress(const SyncProgress& progress) noexcept
{
    const SyncProgress& known = m_progress;
    const SyncProgress& incoming = progress;
    std::string violation;

    // --- Latest server version ---
    if (incoming.latest_server_version.version < known.latest_server_version.version) {
        violation = util::format("Latest server version in download messages must be weakly increasing throughout a "
                                 "session (current: %1, received: %2)",
                                 known.latest_server_version.version, incoming.latest_server_version.version);
    }

    // --- Upload cursor ---
    if (incoming.upload.client_version < known.upload.client_version) {
        violation = util::format("Last integrated client version in download messages must be weakly increasing "
                                 "throughout a session (current: %1, received: %2)",
                                 known.upload.client_version, incoming.upload.client_version);
    }
    if (incoming.upload.client_version > m_last_version_available) {
        violation = util::format("Last integrated client version on server cannot be greater than the latest client "
                                 "version in existence (current: %1, received: %2)",
                                 m_last_version_available, incoming.upload.client_version);
    }

    // --- Download cursor ---
    if (incoming.download.server_version < known.download.server_version) {
        violation = util::format(
            "Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
            known.download.server_version, incoming.download.server_version);
    }
    if (incoming.download.server_version > incoming.latest_server_version.version) {
        violation = util::format(
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
            incoming.download.server_version, incoming.latest_server_version.version);
    }
    if (incoming.download.last_integrated_client_version < known.download.last_integrated_client_version) {
        violation = util::format(
            "Last integrated client version on the server at the position in the server's history of the download "
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
            known.download.last_integrated_client_version, incoming.download.last_integrated_client_version);
    }

    // --- Consistency between the download and upload cursors ---
    if (incoming.download.last_integrated_client_version > incoming.upload.client_version) {
        violation = util::format("Last integrated client version on the server in the position at the server's history "
                                 "of the download cursor cannot be greater than the latest client version integrated "
                                 "on the server (download: %1, upload: %2)",
                                 incoming.download.last_integrated_client_version, incoming.upload.client_version);
    }
    if (incoming.download.server_version < incoming.upload.last_integrated_server_version) {
        violation = util::format(
            "The server version of the download cursor cannot be less than the server version integrated in the "
            "latest client version acknowledged by the server (download: %1, upload: %2)",
            incoming.download.server_version, incoming.upload.last_integrated_server_version);
    }

    if (!violation.empty()) {
        return {ErrorCodes::SyncProtocolInvariantFailed, std::move(violation)};
    }
    return Status::OK();
}
×
2711

2712

2713
void Session::check_for_upload_completion()
2714
{
78,510✔
2715
    REALM_ASSERT_EX(m_state == Active, m_state);
78,510✔
2716
    if (!m_upload_completion_notification_requested) {
78,510✔
2717
        return;
45,940✔
2718
    }
45,940✔
2719

15,202✔
2720
    // during an ongoing client reset operation, we never upload anything
15,202✔
2721
    if (m_performing_client_reset)
32,570✔
2722
        return;
258✔
2723

15,074✔
2724
    // Upload process must have reached end of history
15,074✔
2725
    REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available);
32,312✔
2726
    bool scan_complete = (m_upload_progress.client_version == m_last_version_available);
32,312✔
2727
    if (!scan_complete)
32,312✔
2728
        return;
6,776✔
2729

12,522✔
2730
    // All uploaded changesets must have been acknowledged by the server
12,522✔
2731
    REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload);
25,536✔
2732
    bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload);
25,536✔
2733
    if (!all_uploads_accepted)
25,536✔
2734
        return;
10,708✔
2735

7,328✔
2736
    m_upload_completion_notification_requested = false;
14,828✔
2737
    on_upload_completion(); // Throws
14,828✔
2738
}
14,828✔
2739

2740

2741
void Session::check_for_download_completion()
2742
{
63,026✔
2743
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
63,026✔
2744
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
63,026✔
2745
    if (m_last_download_mark_received == m_last_triggering_download_mark)
63,026✔
2746
        return;
45,724✔
2747
    if (m_last_download_mark_received < m_target_download_mark)
17,302✔
2748
        return;
344✔
2749
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
16,958✔
2750
        return;
×
2751
    m_last_triggering_download_mark = m_target_download_mark;
16,958✔
2752
    if (REALM_UNLIKELY(!m_allow_upload)) {
16,958✔
2753
        // Activate the upload process now, and enable immediate reactivation
2,134✔
2754
        // after a subsequent fast reconnect.
2,134✔
2755
        m_allow_upload = true;
5,504✔
2756
        ensure_enlisted_to_send(); // Throws
5,504✔
2757
    }
5,504✔
2758
    on_download_completion(); // Throws
16,958✔
2759
}
16,958✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc