• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2214

10 Apr 2024 11:21PM UTC coverage: 91.813% (-0.8%) from 92.623%
2214

push

Evergreen

web-flow
Add missing availability checks for SecCopyErrorMessageString (#7577)

This requires iOS 11.3 and we currently target iOS 11.

94848 of 175770 branches covered (53.96%)

7 of 22 new or added lines in 2 files covered. (31.82%)

1815 existing lines in 77 files now uncovered.

242945 of 264608 relevant lines covered (91.81%)

6136478.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.58
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/compact_changesets.hpp>
10
#include <realm/sync/noinst/client_reset_operation.hpp>
11
#include <realm/sync/noinst/sync_schema_migration.hpp>
12
#include <realm/sync/protocol.hpp>
13
#include <realm/util/assert.hpp>
14
#include <realm/util/basic_system_errors.hpp>
15
#include <realm/util/memory_stream.hpp>
16
#include <realm/util/platform_info.hpp>
17
#include <realm/util/random.hpp>
18
#include <realm/util/safe_int_ops.hpp>
19
#include <realm/util/scope_exit.hpp>
20
#include <realm/util/to_string.hpp>
21
#include <realm/util/uri.hpp>
22
#include <realm/version.hpp>
23

24
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
25

26
#include <system_error>
27
#include <sstream>
28

29
// NOTE: The protocol specification is in `/doc/protocol.md`
30

31
using namespace realm;
32
using namespace _impl;
33
using namespace realm::util;
34
using namespace realm::sync;
35
using namespace realm::sync::websocket;
36

37
// clang-format off
38
using Connection      = ClientImpl::Connection;
39
using Session         = ClientImpl::Session;
40
using UploadChangeset = ClientHistory::UploadChangeset;
41

42
// These are a work-around for a bug in MSVC. It cannot find in-class types
43
// mentioned in signature of out-of-line member function definitions.
44
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
45
using OutputBuffer                = ClientImpl::OutputBuffer;
46
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
47
// clang-format on
48

49
void ClientImpl::ReconnectInfo::reset() noexcept
50
{
1,798✔
51
    m_backoff_state.reset();
1,798✔
52
    scheduled_reset = false;
1,798✔
53
}
1,798✔
54

55

56
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
57
                                       std::optional<ResumptionDelayInfo> new_delay_info)
58
{
3,606✔
59
    m_backoff_state.update(new_reason, new_delay_info);
3,606✔
60
}
3,606✔
61

62

63
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
64
{
5,744✔
65
    if (scheduled_reset) {
5,744✔
66
        reset();
4✔
67
    }
4✔
68

2,982✔
69
    if (!m_backoff_state.triggering_error) {
5,744✔
70
        return std::chrono::milliseconds::zero();
4,420✔
71
    }
4,420✔
72

722✔
73
    switch (*m_backoff_state.triggering_error) {
1,324✔
74
        case ConnectionTerminationReason::closed_voluntarily:
80✔
75
            return std::chrono::milliseconds::zero();
80✔
76
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
77
            return std::chrono::milliseconds::max();
18✔
78
        default:
1,226✔
79
            if (m_reconnect_mode == ReconnectMode::testing) {
1,226✔
80
                return std::chrono::milliseconds::max();
926✔
81
            }
926✔
82

150✔
83
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
300✔
84
            return m_backoff_state.delay_interval();
300✔
85
    }
1,324✔
86
}
1,324✔
87

88

89
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
90
                                      port_type& port, std::string& path) const
91
{
3,974✔
92
    util::Uri uri(url); // Throws
3,974✔
93
    uri.canonicalize(); // Throws
3,974✔
94
    std::string userinfo, address_2, port_2;
3,974✔
95
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
3,974✔
96
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
3,974✔
97
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
3,974✔
98
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
3,974✔
99
    if (REALM_UNLIKELY(!good))
3,974✔
100
        return false;
1,814✔
101
    ProtocolEnvelope protocol_2;
3,974✔
102
    port_type port_3;
3,974✔
103
    if (realm_scheme) {
3,974✔
104
        if (uri.get_scheme() == "realm:") {
×
105
            protocol_2 = ProtocolEnvelope::realm;
×
106
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
107
        }
×
108
        else {
×
109
            protocol_2 = ProtocolEnvelope::realms;
×
110
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
111
        }
×
112
    }
×
113
    else {
3,974✔
114
        REALM_ASSERT(ws_scheme);
3,974✔
115
        if (uri.get_scheme() == "ws:") {
3,974✔
116
            protocol_2 = ProtocolEnvelope::ws;
3,930✔
117
            port_3 = 80;
3,930✔
118
        }
3,930✔
119
        else {
44✔
120
            protocol_2 = ProtocolEnvelope::wss;
44✔
121
            port_3 = 443;
44✔
122
        }
44✔
123
    }
3,974✔
124
    if (!port_2.empty()) {
3,974✔
125
        std::istringstream in(port_2);    // Throws
3,906✔
126
        in.imbue(std::locale::classic()); // Throws
3,906✔
127
        in >> port_3;
3,906✔
128
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,906✔
129
            return false;
1,780✔
130
    }
3,974✔
131
    std::string path_2 = uri.get_path(); // Throws (copy)
3,974✔
132

1,814✔
133
    protocol = protocol_2;
3,974✔
134
    address = std::move(address_2);
3,974✔
135
    port = port_3;
3,974✔
136
    path = std::move(path_2);
3,974✔
137
    return true;
3,974✔
138
}
3,974✔
139

140
ClientImpl::ClientImpl(ClientConfig config)
141
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::session, std::move(config.logger))}
142
    , logger{*logger_ptr}
143
    , m_reconnect_mode{config.reconnect_mode}
144
    , m_connect_timeout{config.connect_timeout}
145
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
146
    , m_ping_keepalive_period{config.ping_keepalive_period}
147
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
148
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
149
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
150
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
151
    , m_dry_run{config.dry_run}
152
    , m_enable_default_port_hack{config.enable_default_port_hack}
153
    , m_disable_upload_compaction{config.disable_upload_compaction}
154
    , m_fix_up_object_ids{config.fix_up_object_ids}
155
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
156
    , m_socket_provider{std::move(config.socket_provider)}
157
    , m_client_protocol{} // Throws
158
    , m_one_connection_per_session{config.one_connection_per_session}
159
    , m_random{}
160
{
9,626✔
161
    // FIXME: Would be better if seeding was up to the application.
4,744✔
162
    util::seed_prng_nondeterministically(m_random); // Throws
9,626✔
163

4,744✔
164
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,626✔
165
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,626✔
166
                 get_current_protocol_version()); // Throws
9,626✔
167
    logger.info("Platform: %1", util::get_platform_info());
9,626✔
168
    const char* build_mode;
9,626✔
169
#if REALM_DEBUG
9,626✔
170
    build_mode = "Debug";
9,626✔
171
#else
172
    build_mode = "Release";
173
#endif
174
    logger.debug("Build mode: %1", build_mode);
9,626✔
175
    logger.debug("Config param: one_connection_per_session = %1",
9,626✔
176
                 config.one_connection_per_session); // Throws
9,626✔
177
    logger.debug("Config param: connect_timeout = %1 ms",
9,626✔
178
                 config.connect_timeout); // Throws
9,626✔
179
    logger.debug("Config param: connection_linger_time = %1 ms",
9,626✔
180
                 config.connection_linger_time); // Throws
9,626✔
181
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,626✔
182
                 config.ping_keepalive_period); // Throws
9,626✔
183
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,626✔
184
                 config.pong_keepalive_timeout); // Throws
9,626✔
185
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,626✔
186
                 config.fast_reconnect_limit); // Throws
9,626✔
187
    logger.debug("Config param: disable_upload_compaction = %1",
9,626✔
188
                 config.disable_upload_compaction); // Throws
9,626✔
189
    logger.debug("Config param: disable_sync_to_disk = %1",
9,626✔
190
                 config.disable_sync_to_disk); // Throws
9,626✔
191
    logger.debug(
9,626✔
192
        "Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3, jitter: 1/%4",
9,626✔
193
        m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,626✔
194
        m_reconnect_backoff_info.resumption_delay_interval.count(),
9,626✔
195
        m_reconnect_backoff_info.resumption_delay_backoff_multiplier, m_reconnect_backoff_info.delay_jitter_divisor);
9,626✔
196

4,744✔
197
    if (config.reconnect_mode != ReconnectMode::normal) {
9,626✔
198
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
776✔
199
                    "never do this in production!");
776✔
200
    }
776✔
201

4,744✔
202
    if (config.dry_run) {
9,626✔
203
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
204
                    "never do this in production!");
×
205
    }
×
206

4,744✔
207
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,626✔
208

4,744✔
209
    if (m_one_connection_per_session) {
9,626✔
210
        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
4✔
211
                    "never do this in production");
4✔
212
    }
4✔
213

4,744✔
214
    if (config.disable_upload_activation_delay) {
9,626✔
215
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
216
                    "never do this in production");
×
217
    }
×
218

4,744✔
219
    if (config.disable_sync_to_disk) {
9,626✔
220
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
221
                    "never do this in production");
×
222
    }
×
223

4,744✔
224
    m_actualize_and_finalize = create_trigger([this](Status status) {
16,252✔
225
        if (status == ErrorCodes::OperationAborted)
16,252✔
226
            return;
×
227
        else if (!status.is_ok())
16,252✔
228
            throw Exception(status);
×
229
        actualize_and_finalize_session_wrappers(); // Throws
16,252✔
230
    });
16,252✔
231
}
9,626✔
232

233

234
// Posts `handler` to the socket provider while tracking it as outstanding work.
//
// `m_outstanding_posts` is incremented (and `m_drained` cleared) under
// `m_drain_mutex` before posting. The wrapper decrements the counter and
// notifies `m_drain_cv` when it finishes, via a scope-exit guard so this
// happens even if `handler` throws.
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    {
        std::lock_guard lock(m_drain_mutex);
        m_drained = false;
        ++m_outstanding_posts;
    }

    auto tracked_handler = [this, handler = std::move(handler)](Status status) {
        auto release_post = util::make_scope_exit([this]() noexcept {
            std::lock_guard lock(m_drain_mutex);
            REALM_ASSERT(m_outstanding_posts);
            --m_outstanding_posts;
            m_drain_cv.notify_all();
        });
        handler(status);
    };
    m_socket_provider->post(std::move(tracked_handler));
}
178,248✔
252

253

254
void ClientImpl::drain_connections()
255
{
9,626✔
256
    logger.debug("Draining connections during sync client shutdown");
9,626✔
257
    for (auto& server_slot_pair : m_server_slots) {
6,112✔
258
        auto& server_slot = server_slot_pair.second;
2,602✔
259

1,234✔
260
        if (server_slot.connection) {
2,602✔
261
            auto& conn = server_slot.connection;
2,402✔
262
            conn->force_close();
2,402✔
263
        }
2,402✔
264
        else {
200✔
265
            for (auto& conn_pair : server_slot.alt_connections) {
102✔
266
                conn_pair.second->force_close();
6✔
267
            }
6✔
268
        }
200✔
269
    }
2,602✔
270
}
9,626✔
271

272

273
// Creates a timer on the socket provider and tracks its completion handler as
// outstanding work.
//
// `m_outstanding_posts` is incremented (and `m_drained` cleared) under
// `m_drain_mutex` before the timer is created. The wrapper passed to the
// provider decrements the counter and notifies `m_drain_cv` once `handler`
// has run.
//
// The decrement is done by a scope-exit guard, matching `post()`: handlers in
// this file are allowed to throw, and without the guard a throwing handler
// would leave the counter permanently non-zero and skip the notification.
//
// Returns the timer object produced by the socket provider.
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
                                                       SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    {
        std::lock_guard lock(m_drain_mutex);
        ++m_outstanding_posts;
        m_drained = false;
    }
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
        // Runs after `handler`, on both normal return and exception unwind.
        auto decr_guard = util::make_scope_exit([&]() noexcept {
            std::lock_guard lock(m_drain_mutex);
            REALM_ASSERT(m_outstanding_posts);
            --m_outstanding_posts;
            m_drain_cv.notify_all();
        });
        handler(status);
    });
}
17,172✔
291

292

293
// Creates a `Trigger` bound to this client that will invoke `handler`.
// Requires the socket provider to be set.
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    SyncTrigger trigger = std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
    return trigger;
}
12,336✔
298

299
// If a websocket sentinel is still held, flags it as destroyed before
// releasing this connection's reference to it.
Connection::~Connection()
{
    if (!m_websocket_sentinel)
        return;

    m_websocket_sentinel->destroyed = true;
    m_websocket_sentinel.reset();
}
2,712✔
306

307
void Connection::activate()
308
{
2,714✔
309
    REALM_ASSERT(m_on_idle);
2,714✔
310
    m_activated = true;
2,714✔
311
    if (m_num_active_sessions == 0)
2,714✔
312
        m_on_idle->trigger();
×
313
    // We cannot in general connect immediately, because a prior failure to
1,286✔
314
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
1,286✔
315
    initiate_reconnect_wait(); // Throws
2,714✔
316
}
2,714✔
317

318

319
void Connection::activate_session(std::unique_ptr<Session> sess)
320
{
10,250✔
321
    REALM_ASSERT(sess);
10,250✔
322
    REALM_ASSERT(&sess->m_conn == this);
10,250✔
323
    REALM_ASSERT(!m_force_closed);
10,250✔
324
    Session& sess_2 = *sess;
10,250✔
325
    session_ident_type ident = sess->m_ident;
10,250✔
326
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
10,250✔
327
    bool was_inserted = p.second;
10,250✔
328
    REALM_ASSERT(was_inserted);
10,250✔
329
    // Save the session ident to the historical list of session idents
4,950✔
330
    m_session_history.insert(ident);
10,250✔
331
    sess_2.activate(); // Throws
10,250✔
332
    if (m_state == ConnectionState::connected) {
10,250✔
333
        bool fast_reconnect = false;
6,948✔
334
        sess_2.connection_established(fast_reconnect); // Throws
6,948✔
335
    }
6,948✔
336
    ++m_num_active_sessions;
10,250✔
337
}
10,250✔
338

339

340
// Begins deactivation of `sess` and decrements the active session count. If
// the session reaches the Deactivated state immediately, it is also removed
// via `finish_session_deactivation()`. When the count drops to zero on an
// activated, disconnected connection, the idle trigger is fired.
void Connection::initiate_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess);
    REALM_ASSERT(&sess->m_conn == this);
    REALM_ASSERT(m_num_active_sessions);

    // The session is deactivated before the counter is decremented, because
    // the client may be waiting in stop_and_wait() (on a separate thread) for
    // m_num_active_sessions to reach 0.
    sess->initiate_deactivation(); // Throws
    if (sess->m_state == Session::Deactivated)
        finish_session_deactivation(sess);

    const bool last_session_gone = (--m_num_active_sessions == 0);
    if (REALM_UNLIKELY(last_session_gone)) {
        const bool idle_and_disconnected = m_activated && m_state == ConnectionState::disconnected;
        if (idle_and_disconnected)
            m_on_idle->trigger();
    }
}
10,252✔
357

358

359
void Connection::cancel_reconnect_delay()
360
{
2,012✔
361
    REALM_ASSERT(m_activated);
2,012✔
362

1,126✔
363
    if (m_reconnect_delay_in_progress) {
2,012✔
364
        if (m_nonzero_reconnect_delay)
1,790✔
365
            logger.detail("Canceling reconnect delay"); // Throws
892✔
366

1,016✔
367
        // Cancel the in-progress wait operation by destroying the timer
1,016✔
368
        // object. Destruction is needed in this case, because a new wait
1,016✔
369
        // operation might have to be initiated before the previous one
1,016✔
370
        // completes (its completion handler starts to execute), so the new wait
1,016✔
371
        // operation must be done on a new timer object.
1,016✔
372
        m_reconnect_disconnect_timer.reset();
1,790✔
373
        m_reconnect_delay_in_progress = false;
1,790✔
374
        m_reconnect_info.reset();
1,790✔
375
        initiate_reconnect_wait(); // Throws
1,790✔
376
        return;
1,790✔
377
    }
1,790✔
378

110✔
379
    // If we are not disconnected, then we need to make sure the next time we get disconnected
110✔
380
    // that we are allowed to re-connect as quickly as possible.
110✔
381
    //
110✔
382
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
110✔
383
    // backoff/delay state before calculating the next delay, unless a PONG message is received
110✔
384
    // for the urgent PING message we send below.
110✔
385
    //
110✔
386
    // If we get a PONG message for the urgent PING message sent below, then the connection is
110✔
387
    // healthy and we can calculate the next delay normally.
110✔
388
    if (m_state != ConnectionState::disconnected) {
222✔
389
        m_reconnect_info.scheduled_reset = true;
222✔
390
        m_ping_after_scheduled_reset_of_reconnect_info = false;
222✔
391

110✔
392
        schedule_urgent_ping(); // Throws
222✔
393
        return;
222✔
394
    }
222✔
395
    // Nothing to do in this case. The next reconnect attemp will be made as
110✔
396
    // soon as there are any sessions that are both active and unsuspended.
110✔
397
}
222✔
398

399
// Removes a session that has reached the Deactivated state from `m_sessions`
// and from `m_session_history`.
void Connection::finish_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess->m_state == Session::Deactivated);

    // Copy the ident first: `sess` must not be touched after it has been
    // erased from `m_sessions`.
    const auto session_ident = sess->m_ident;
    m_sessions.erase(session_ident);
    m_session_history.erase(session_ident);
}
8,344✔
406

407
void Connection::force_close()
408
{
2,408✔
409
    if (m_force_closed) {
2,408✔
410
        return;
×
411
    }
×
412

1,138✔
413
    m_force_closed = true;
2,408✔
414

1,138✔
415
    if (m_state != ConnectionState::disconnected) {
2,408✔
416
        voluntary_disconnect();
2,366✔
417
    }
2,366✔
418

1,138✔
419
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,408✔
420
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,408✔
421
        m_reconnect_disconnect_timer.reset();
42✔
422
        m_reconnect_delay_in_progress = false;
42✔
423
        m_disconnect_delay_in_progress = false;
42✔
424
    }
42✔
425

1,138✔
426
    // We must copy any session pointers we want to close to a vector because force_closing
1,138✔
427
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
1,138✔
428
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
1,138✔
429
    std::vector<Session*> to_close;
2,408✔
430
    for (auto& session_pair : m_sessions) {
1,188✔
431
        if (session_pair.second->m_state == Session::State::Active) {
102✔
432
            to_close.push_back(session_pair.second.get());
102✔
433
        }
102✔
434
    }
102✔
435

1,138✔
436
    for (auto& sess : to_close) {
1,188✔
437
        sess->force_close();
102✔
438
    }
102✔
439

1,138✔
440
    logger.debug("Force closed idle connection");
2,408✔
441
}
2,408✔
442

443

444
void Connection::websocket_connected_handler(const std::string& protocol)
445
{
3,430✔
446
    if (!protocol.empty()) {
3,430✔
447
        std::string_view expected_prefix =
3,430✔
448
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,084✔
449
        // FIXME: Use std::string_view::begins_with() in C++20.
1,702✔
450
        auto prefix_matches = [&](std::string_view other) {
3,430✔
451
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,430✔
452
        };
3,430✔
453
        if (prefix_matches(expected_prefix)) {
3,430✔
454
            util::MemoryInputStream in;
3,430✔
455
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,430✔
456
            in.imbue(std::locale::classic());
3,430✔
457
            in.unsetf(std::ios_base::skipws);
3,430✔
458
            int value_2 = 0;
3,430✔
459
            in >> value_2;
3,430✔
460
            if (in && in.eof() && value_2 >= 0) {
3,430✔
461
                bool good_version =
3,430✔
462
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,430✔
463
                if (good_version) {
3,430✔
464
                    logger.detail("Negotiated protocol version: %1", value_2);
3,430✔
465
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
1,702✔
466
                    // will provide the appservices connection ID via a log message.
1,702✔
467
                    // TODO: Remove once the server starts sending the connection ID
1,702✔
468
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,430✔
469
                    m_negotiated_protocol_version = value_2;
3,430✔
470
                    handle_connection_established(); // Throws
3,430✔
471
                    return;
3,430✔
472
                }
3,430✔
UNCOV
473
            }
×
474
        }
3,430✔
UNCOV
475
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
UNCOV
476
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
UNCOV
477
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
UNCOV
478
    }
×
UNCOV
479
    else {
×
UNCOV
480
        close_due_to_client_side_error(
×
UNCOV
481
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
UNCOV
482
            ConnectionTerminationReason::bad_headers_in_http_response);
×
UNCOV
483
    }
×
484
}
3,430✔
485

486

487
// Handles a binary message from the websocket.
//
// Returns false without processing if the connection was force closed.
// Otherwise either handles the message or, when the `sync_client__read_head`
// simulated failure is triggered, closes the connection with a non-fatal
// error. In both cases it returns whether `m_websocket` is still set.
bool Connection::websocket_binary_message_received(util::Span<const char> data)
{
    if (m_force_closed) {
        logger.debug("Received binary message after connection was force closed");
        return false;
    }

    const bool simulate_read_failure = SimulatedFailure::check_trigger(SimulatedFailure::sync_client__read_head);
    if (simulate_read_failure) {
        close_due_to_client_side_error(
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
            ConnectionTerminationReason::read_or_write_error);
    }
    else {
        handle_message_received(data);
    }

    return bool(m_websocket);
}
76,058✔
505

506

507
void Connection::websocket_error_handler()
508
{
630✔
509
    m_websocket_error_received = true;
630✔
510
}
630✔
511

512
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
513
{
734✔
514
    if (m_force_closed) {
734✔
515
        logger.debug("Received websocket close message after connection was force closed");
×
516
        return false;
×
517
    }
×
518
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
734✔
519

388✔
520
    switch (error_code) {
734✔
521
        case WebSocketError::websocket_ok:
56✔
522
            break;
56✔
523
        case WebSocketError::websocket_resolve_failed:
4✔
524
            [[fallthrough]];
4✔
525
        case WebSocketError::websocket_connection_failed: {
84✔
526
            SessionErrorInfo error_info(
84✔
527
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
84✔
528
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
42✔
529
            // to make sure the websocket URL is correct
42✔
530
            if (!m_server_endpoint.is_verified) {
84✔
531
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
56✔
532
            }
56✔
533
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
84✔
534
            break;
84✔
535
        }
4✔
536
        case WebSocketError::websocket_read_error:
536✔
537
            [[fallthrough]];
536✔
538
        case WebSocketError::websocket_write_error: {
536✔
539
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
536✔
540
                                         ConnectionTerminationReason::read_or_write_error);
536✔
541
            break;
536✔
542
        }
536✔
543
        case WebSocketError::websocket_going_away:
288✔
544
            [[fallthrough]];
×
545
        case WebSocketError::websocket_protocol_error:
✔
546
            [[fallthrough]];
×
547
        case WebSocketError::websocket_unsupported_data:
✔
548
            [[fallthrough]];
×
549
        case WebSocketError::websocket_invalid_payload_data:
✔
550
            [[fallthrough]];
×
551
        case WebSocketError::websocket_policy_violation:
✔
552
            [[fallthrough]];
×
553
        case WebSocketError::websocket_reserved:
✔
554
            [[fallthrough]];
×
555
        case WebSocketError::websocket_no_status_received:
✔
556
            [[fallthrough]];
×
557
        case WebSocketError::websocket_invalid_extension: {
✔
558
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
559
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
560
            break;
×
561
        }
×
562
        case WebSocketError::websocket_message_too_big: {
4✔
563
            auto message = util::format(
4✔
564
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
565
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
566
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
567
            involuntary_disconnect(std::move(error_info),
4✔
568
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
569
            break;
4✔
570
        }
×
571
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
572
            close_due_to_client_side_error(
10✔
573
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
574
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
575
            break;
10✔
576
        }
×
577
        case WebSocketError::websocket_client_too_old:
✔
578
            [[fallthrough]];
×
579
        case WebSocketError::websocket_client_too_new:
✔
580
            [[fallthrough]];
×
581
        case WebSocketError::websocket_protocol_mismatch: {
✔
582
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
583
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
584
            break;
×
585
        }
×
UNCOV
586
        case WebSocketError::websocket_fatal_error: {
✔
587
            // Error is fatal if the sync_route has already been verified - if the sync_route has not
588
            // been verified, then use a non-fatal error and try to perform a location update.
UNCOV
589
            SessionErrorInfo error_info(
×
UNCOV
590
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)},
×
UNCOV
591
                IsFatal{m_server_endpoint.is_verified});
×
UNCOV
592
            ConnectionTerminationReason reason = ConnectionTerminationReason::http_response_says_fatal_error;
×
593
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
594
            // to make sure the websocket URL is correct
UNCOV
595
            if (!m_server_endpoint.is_verified) {
×
UNCOV
596
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
UNCOV
597
                reason = ConnectionTerminationReason::connect_operation_failed;
×
UNCOV
598
            }
×
UNCOV
599
            involuntary_disconnect(std::move(error_info), reason);
×
UNCOV
600
            break;
×
601
        }
×
602
        case WebSocketError::websocket_forbidden: {
✔
603
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
604
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
605
            involuntary_disconnect(std::move(error_info),
×
606
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
607
            break;
×
608
        }
×
609
        case WebSocketError::websocket_unauthorized: {
32✔
610
            SessionErrorInfo error_info(
32✔
611
                {ErrorCodes::AuthError,
32✔
612
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
32✔
613
                IsFatal{false});
32✔
614
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
32✔
615
            involuntary_disconnect(std::move(error_info),
32✔
616
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
32✔
617
            break;
32✔
618
        }
×
619
        case WebSocketError::websocket_moved_permanently: {
12✔
620
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
621
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
622
            involuntary_disconnect(std::move(error_info),
12✔
623
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
624
            break;
12✔
625
        }
×
626
        case WebSocketError::websocket_abnormal_closure: {
✔
627
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
628
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
629
            involuntary_disconnect(std::move(error_info),
×
630
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
631
            break;
×
632
        }
×
633
        case WebSocketError::websocket_internal_server_error:
✔
634
            [[fallthrough]];
×
635
        case WebSocketError::websocket_retry_error: {
✔
636
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
637
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
638
            break;
×
639
        }
734✔
640
    }
734✔
641

388✔
642
    return bool(m_websocket);
734✔
643
}
734✔
644

645
// Guarantees that handle_reconnect_wait() is never called from within the
646
// execution of initiate_reconnect_wait() (no callback reentrance).
647
void Connection::initiate_reconnect_wait()
648
{
8,108✔
649
    REALM_ASSERT(m_activated);
8,108✔
650
    REALM_ASSERT(!m_reconnect_delay_in_progress);
8,108✔
651
    REALM_ASSERT(!m_disconnect_delay_in_progress);
8,108✔
652

4,094✔
653
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
4,094✔
654
    if (m_force_closed) {
8,108✔
655
        return;
2,366✔
656
    }
2,366✔
657

2,982✔
658
    m_reconnect_delay_in_progress = true;
5,742✔
659
    auto delay = m_reconnect_info.delay_interval();
5,742✔
660
    if (delay == std::chrono::milliseconds::max()) {
5,742✔
661
        logger.detail("Reconnection delayed indefinitely"); // Throws
944✔
662
        // Not actually starting a timer corresponds to an infinite wait
532✔
663
        m_nonzero_reconnect_delay = true;
944✔
664
        return;
944✔
665
    }
944✔
666

2,450✔
667
    if (delay == std::chrono::milliseconds::zero()) {
4,798✔
668
        m_nonzero_reconnect_delay = false;
4,498✔
669
    }
4,498✔
670
    else {
300✔
671
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
300✔
672
        m_nonzero_reconnect_delay = true;
300✔
673
    }
300✔
674

2,450✔
675
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
2,450✔
676
    // we need it to be cancelable in case the connection is terminated before the timer
2,450✔
677
    // callback is run.
2,450✔
678
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
4,800✔
679
        // If the operation is aborted, the connection object may have been
2,450✔
680
        // destroyed.
2,450✔
681
        if (status != ErrorCodes::OperationAborted)
4,800✔
682
            handle_reconnect_wait(status); // Throws
3,606✔
683
    });                                    // Throws
4,800✔
684
}
4,798✔
685

686

687
// Timer callback for the reconnect wait started by initiate_reconnect_wait().
//
// A non-OK status is rethrown as an Exception (an aborted operation is never
// expected here). Otherwise the wait is marked finished and, if at least one
// active unsuspended session exists, a reconnect is initiated.
void Connection::handle_reconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT(m_reconnect_delay_in_progress);
    m_reconnect_delay_in_progress = false;

    // Nobody needs the connection right now: stay disconnected.
    if (!(m_num_active_unsuspended_sessions > 0))
        return;

    initiate_reconnect(); // Throws
}
3,604✔
700

701
// Observer handed to the socket provider on behalf of a Connection.
//
// Every callback first consults the lifecycle sentinel captured at
// construction time. Once the sentinel is flagged as destroyed, the callback
// is dropped without touching `conn`.
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
    explicit WebSocketObserverShim(Connection* conn)
        : conn(conn)
        , sentinel(conn->m_websocket_sentinel)
    {
    }

    // Connection that receives the forwarded callbacks.
    Connection* conn;
    // Sentinel shared with the connection; checked before each forward.
    util::bind_ptr<LifecycleSentinel> sentinel;

    // Forwards the "connected" notification unless the sentinel is destroyed.
    void websocket_connected_handler(const std::string& protocol) override
    {
        if (!sentinel->destroyed)
            conn->websocket_connected_handler(protocol);
    }

    // Forwards the error notification unless the sentinel is destroyed.
    void websocket_error_handler() override
    {
        if (!sentinel->destroyed)
            conn->websocket_error_handler();
    }

    // Forwards a received binary message; returns false when the sentinel is
    // destroyed, otherwise whatever the connection returns.
    bool websocket_binary_message_received(util::Span<const char> data) override
    {
        if (!sentinel->destroyed)
            return conn->websocket_binary_message_received(data);
        return false;
    }

    // Forwards the "closed" notification; returns true when the sentinel is
    // destroyed, otherwise whatever the connection returns.
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
    {
        if (!sentinel->destroyed)
            return conn->websocket_closed_handler(was_clean, error_code, msg);
        return true;
    }
};
747

748
void Connection::initiate_reconnect()
749
{
3,606✔
750
    REALM_ASSERT(m_activated);
3,606✔
751

1,792✔
752
    m_state = ConnectionState::connecting;
3,606✔
753
    report_connection_state_change(ConnectionState::connecting); // Throws
3,606✔
754
    if (m_websocket_sentinel) {
3,606✔
755
        m_websocket_sentinel->destroyed = true;
×
756
    }
×
757
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,606✔
758
    m_websocket.reset();
3,606✔
759

1,792✔
760
    // Watchdog
1,792✔
761
    initiate_connect_wait(); // Throws
3,606✔
762

1,792✔
763
    std::vector<std::string> sec_websocket_protocol;
3,606✔
764
    {
3,606✔
765
        auto protocol_prefix =
3,606✔
766
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,212✔
767
        int min = get_oldest_supported_protocol_version();
3,606✔
768
        int max = get_current_protocol_version();
3,606✔
769
        REALM_ASSERT_3(min, <=, max);
3,606✔
770
        // List protocol version in descending order to ensure that the server
1,792✔
771
        // selects the highest possible version.
1,792✔
772
        for (int version = max; version >= min; --version) {
43,262✔
773
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
39,656✔
774
        }
39,656✔
775
    }
3,606✔
776

1,792✔
777
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,606✔
778
                m_server_endpoint.port, m_http_request_path_prefix);
3,606✔
779

1,792✔
780
    m_websocket_error_received = false;
3,606✔
781
    m_websocket =
3,606✔
782
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,606✔
783
                                            WebSocketEndpoint{
3,606✔
784
                                                m_server_endpoint.address,
3,606✔
785
                                                m_server_endpoint.port,
3,606✔
786
                                                get_http_request_path(),
3,606✔
787
                                                std::move(sec_websocket_protocol),
3,606✔
788
                                                is_ssl(m_server_endpoint.envelope),
3,606✔
789
                                                /// DEPRECATED - The following will be removed in a future release
1,792✔
790
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,606✔
791
                                                m_verify_servers_ssl_certificate,
3,606✔
792
                                                m_ssl_trust_certificate_path,
3,606✔
793
                                                m_ssl_verify_callback,
3,606✔
794
                                                m_proxy_config,
3,606✔
795
                                            });
3,606✔
796
}
3,606✔
797

798

799
void Connection::initiate_connect_wait()
800
{
3,604✔
801
    // Deploy a watchdog to enforce an upper bound on the time it can take to
1,792✔
802
    // fully establish the connection (including SSL and WebSocket
1,792✔
803
    // handshakes). Without such a watchdog, connect operations could take very
1,792✔
804
    // long, or even indefinite time.
1,792✔
805
    milliseconds_type time = m_client.m_connect_timeout;
3,604✔
806

1,792✔
807
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,606✔
808
        // If the operation is aborted, the connection object may have been
1,792✔
809
        // destroyed.
1,792✔
810
        if (status != ErrorCodes::OperationAborted)
3,606✔
811
            handle_connect_wait(status); // Throws
×
812
    });                                  // Throws
3,606✔
813
}
3,604✔
814

815

816
// Timer callback for the connect watchdog: the connection was not fully
// established in time.
//
// A non-OK status is rethrown as an Exception. Otherwise the connection is
// torn down with a non-fatal SyncConnectTimeout error.
void Connection::handle_connect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
    logger.info("Connect timeout"); // Throws

    Status timeout_status{ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"};
    SessionErrorInfo timeout_info(std::move(timeout_status), IsFatal{false});

    // If the connection fails/times out and the server has not been contacted
    // yet, request a location refresh to make sure the websocket URL is
    // correct.
    const bool endpoint_unverified = !m_server_endpoint.is_verified;
    if (endpoint_unverified)
        timeout_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;

    involuntary_disconnect(std::move(timeout_info), ConnectionTerminationReason::sync_connect_timeout); // Throws
}
×
834

835

836
void Connection::handle_connection_established()
837
{
3,430✔
838
    // Cancel connect timeout watchdog
1,702✔
839
    m_connect_timer.reset();
3,430✔
840

1,702✔
841
    m_state = ConnectionState::connected;
3,430✔
842
    m_server_endpoint.is_verified = true; // sync route is valid since connection is successful
3,430✔
843

1,702✔
844
    milliseconds_type now = monotonic_clock_now();
3,430✔
845
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,430✔
846
    initiate_ping_delay(now);     // Throws
3,430✔
847

1,702✔
848
    bool fast_reconnect = false;
3,430✔
849
    if (m_disconnect_has_occurred) {
3,430✔
850
        milliseconds_type time = now - m_disconnect_time;
964✔
851
        if (time <= m_client.m_fast_reconnect_limit)
964✔
852
            fast_reconnect = true;
964✔
853
    }
964✔
854

1,702✔
855
    for (auto& p : m_sessions) {
4,502✔
856
        Session& sess = *p.second;
4,502✔
857
        sess.connection_established(fast_reconnect); // Throws
4,502✔
858
    }
4,502✔
859

1,702✔
860
    report_connection_state_change(ConnectionState::connected); // Throws
3,430✔
861
}
3,430✔
862

863

864
void Connection::schedule_urgent_ping()
865
{
222✔
866
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
222✔
867
    if (m_ping_delay_in_progress) {
222✔
868
        m_heartbeat_timer.reset();
206✔
869
        m_ping_delay_in_progress = false;
206✔
870
        m_minimize_next_ping_delay = true;
206✔
871
        milliseconds_type now = monotonic_clock_now();
206✔
872
        initiate_ping_delay(now); // Throws
206✔
873
        return;
206✔
874
    }
206✔
875
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
16✔
876
    if (!m_send_ping)
16✔
877
        m_minimize_next_ping_delay = true;
16✔
878
}
16✔
879

880

881
// Starts the delay that precedes the next PING message.
//
// `now` is the current monotonic clock reading; it is used to deduct the time
// already spent waiting for the previous PONG from the delay.
void Connection::initiate_ping_delay(milliseconds_type now)
{
    REALM_ASSERT(!m_ping_delay_in_progress);
    REALM_ASSERT(!m_waiting_for_pong);
    REALM_ASSERT(!m_send_ping);

    milliseconds_type delay = 0;
    if (m_minimize_next_ping_delay) {
        // An urgent ping was requested: keep the zero delay and consume the
        // request.
        m_minimize_next_ping_delay = false;
    }
    else {
        delay = m_client.m_ping_keepalive_period;

        // Deduct a random amount of up to 10%, or up to 100% if this is the
        // first PING message to be sent since the connection was established.
        // This reduces the risk of many connections sending PING messages to
        // the server simultaneously.
        const milliseconds_type max_deduction = (m_ping_sent ? delay / 10 : delay);
        std::uniform_int_distribution<milliseconds_type> jitter(0, max_deduction);
        delay -= jitter(m_client.get_random());

        // Deduct the time spent waiting for PONG, never going below zero.
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
        const milliseconds_type spent_time = now - m_pong_wait_started_at;
        delay = (spent_time < delay) ? (delay - spent_time) : 0;
    }

    m_ping_delay_in_progress = true;

    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
        if (status == ErrorCodes::OperationAborted)
            return;
        if (!status.is_ok())
            throw Exception(status);

        handle_ping_delay();                                    // Throws
    });                                                         // Throws
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
}
3,880✔
926

927

928
void Connection::handle_ping_delay()
929
{
266✔
930
    REALM_ASSERT(m_ping_delay_in_progress);
266✔
931
    m_ping_delay_in_progress = false;
266✔
932
    m_send_ping = true;
266✔
933

126✔
934
    initiate_pong_timeout(); // Throws
266✔
935

126✔
936
    if (m_state == ConnectionState::connected && !m_sending)
266✔
937
        send_next_message(); // Throws
226✔
938
}
266✔
939

940

941
void Connection::initiate_pong_timeout()
942
{
266✔
943
    REALM_ASSERT(!m_ping_delay_in_progress);
266✔
944
    REALM_ASSERT(!m_waiting_for_pong);
266✔
945
    REALM_ASSERT(m_send_ping);
266✔
946

126✔
947
    m_waiting_for_pong = true;
266✔
948
    m_pong_wait_started_at = monotonic_clock_now();
266✔
949

126✔
950
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
266✔
951
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
266✔
952
        if (status == ErrorCodes::OperationAborted)
266✔
953
            return;
254✔
954
        else if (!status.is_ok())
12✔
955
            throw Exception(status);
×
956

6✔
957
        handle_pong_timeout(); // Throws
12✔
958
    });                        // Throws
12✔
959
}
266✔
960

961

962
void Connection::handle_pong_timeout()
963
{
12✔
964
    REALM_ASSERT(m_waiting_for_pong);
12✔
965
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
966
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
967
                                 ConnectionTerminationReason::pong_timeout);
12✔
968
}
12✔
969

970

971
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
972
{
97,918✔
973
    // Stop sending messages if an websocket error was received.
49,140✔
974
    if (m_websocket_error_received)
97,918✔
975
        return;
×
976

49,140✔
977
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
97,918✔
978
        if (sentinel->destroyed) {
97,812✔
979
            return;
1,480✔
980
        }
1,480✔
981
        if (!status.is_ok()) {
96,332✔
982
            if (status != ErrorCodes::Error::OperationAborted) {
×
983
                // Write errors will be handled by the websocket_write_error_handler() callback
984
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
985
            }
×
986
            return;
×
987
        }
×
988
        handle_write_message(); // Throws
96,332✔
989
    });                         // Throws
96,332✔
990
    m_sending_session = sess;
97,918✔
991
    m_sending = true;
97,918✔
992
}
97,918✔
993

994

995
void Connection::handle_write_message()
996
{
96,332✔
997
    m_sending_session->message_sent(); // Throws
96,332✔
998
    if (m_sending_session->m_state == Session::Deactivated) {
96,332✔
999
        finish_session_deactivation(m_sending_session);
126✔
1000
    }
126✔
1001
    m_sending_session = nullptr;
96,332✔
1002
    m_sending = false;
96,332✔
1003
    send_next_message(); // Throws
96,332✔
1004
}
96,332✔
1005

1006

1007
void Connection::send_next_message()
1008
{
155,992✔
1009
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
155,992✔
1010
    REALM_ASSERT(!m_sending_session);
155,992✔
1011
    REALM_ASSERT(!m_sending);
155,992✔
1012
    if (m_send_ping) {
155,992✔
1013
        send_ping(); // Throws
254✔
1014
        return;
254✔
1015
    }
254✔
1016
    while (!m_sessions_enlisted_to_send.empty()) {
217,380✔
1017
        // The state of being connected is not supposed to be able to change
79,886✔
1018
        // across this loop thanks to the "no callback reentrance" guarantee
79,886✔
1019
        // provided by Websocket::async_write_text(), and friends.
79,886✔
1020
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
159,820✔
1021

79,886✔
1022
        Session& sess = *m_sessions_enlisted_to_send.front();
159,820✔
1023
        m_sessions_enlisted_to_send.pop_front();
159,820✔
1024
        sess.send_message(); // Throws
159,820✔
1025

79,886✔
1026
        if (sess.m_state == Session::Deactivated) {
159,820✔
1027
            finish_session_deactivation(&sess);
2,652✔
1028
        }
2,652✔
1029

79,886✔
1030
        // An enlisted session may choose to not send a message. In that case,
79,886✔
1031
        // we should pass the opportunity to the next enlisted session.
79,886✔
1032
        if (m_sending)
159,820✔
1033
            break;
98,178✔
1034
    }
159,820✔
1035
}
155,738✔
1036

1037

1038
void Connection::send_ping()
1039
{
254✔
1040
    REALM_ASSERT(!m_ping_delay_in_progress);
254✔
1041
    REALM_ASSERT(m_waiting_for_pong);
254✔
1042
    REALM_ASSERT(m_send_ping);
254✔
1043

120✔
1044
    m_send_ping = false;
254✔
1045
    if (m_reconnect_info.scheduled_reset)
254✔
1046
        m_ping_after_scheduled_reset_of_reconnect_info = true;
206✔
1047

120✔
1048
    m_last_ping_sent_at = monotonic_clock_now();
254✔
1049
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
254✔
1050
                 m_previous_ping_rtt); // Throws
254✔
1051

120✔
1052
    ClientProtocol& protocol = get_client_protocol();
254✔
1053
    OutputBuffer& out = get_output_buffer();
254✔
1054
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
254✔
1055
    initiate_write_ping(out);                                          // Throws
254✔
1056
    m_ping_sent = true;
254✔
1057
}
254✔
1058

1059

1060
void Connection::initiate_write_ping(const OutputBuffer& out)
1061
{
254✔
1062
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
254✔
1063
        if (sentinel->destroyed) {
254✔
1064
            return;
×
1065
        }
×
1066
        if (!status.is_ok()) {
254✔
1067
            if (status != ErrorCodes::Error::OperationAborted) {
×
1068
                // Write errors will be handled by the websocket_write_error_handler() callback
1069
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1070
            }
×
1071
            return;
×
1072
        }
×
1073
        handle_write_ping(); // Throws
254✔
1074
    });                      // Throws
254✔
1075
    m_sending = true;
254✔
1076
}
254✔
1077

1078

1079
void Connection::handle_write_ping()
1080
{
254✔
1081
    REALM_ASSERT(m_sending);
254✔
1082
    REALM_ASSERT(!m_sending_session);
254✔
1083
    m_sending = false;
254✔
1084
    send_next_message(); // Throws
254✔
1085
}
254✔
1086

1087

1088
// Hands a received message over to the client protocol parser, which parses
// it and calls the proper handler on this Connection object.
void Connection::handle_message_received(util::Span<const char> data)
{
    const std::string_view payload(data.data(), data.size());
    get_client_protocol().parse_message_received<Connection>(*this, payload);
}
76,058✔
1094

1095

1096
void Connection::initiate_disconnect_wait()
1097
{
4,566✔
1098
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,566✔
1099

2,156✔
1100
    if (m_disconnect_delay_in_progress) {
4,566✔
1101
        m_reconnect_disconnect_timer.reset();
2,116✔
1102
        m_disconnect_delay_in_progress = false;
2,116✔
1103
    }
2,116✔
1104

2,156✔
1105
    milliseconds_type time = m_client.m_connection_linger_time;
4,566✔
1106

2,156✔
1107
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,566✔
1108
        // If the operation is aborted, the connection object may have been
2,156✔
1109
        // destroyed.
2,156✔
1110
        if (status != ErrorCodes::OperationAborted)
4,566✔
1111
            handle_disconnect_wait(status); // Throws
12✔
1112
    });                                     // Throws
4,566✔
1113
    m_disconnect_delay_in_progress = true;
4,566✔
1114
}
4,566✔
1115

1116

1117
// Timer callback for the linger wait started by initiate_disconnect_wait().
//
// A non-OK status is rethrown as an Exception. Otherwise the connection is
// voluntarily closed, provided no active unsuspended session remains.
void Connection::handle_disconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    m_disconnect_delay_in_progress = false;

    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);

    // The connection is in use again; keep it open.
    if (m_num_active_unsuspended_sessions != 0)
        return;

    if (m_client.m_connection_linger_time > 0) {
        logger.detail("Linger time expired"); // Throws
    }
    voluntary_disconnect();      // Throws
    logger.info("Disconnected"); // Throws
}
12✔
1134

1135

1136
// Closes the connection because of a sync protocol violation. The error is
// reported as fatal with the ProtocolViolation action.
void Connection::close_due_to_protocol_error(Status status)
{
    SessionErrorInfo violation_info(std::move(status), IsFatal{true});
    violation_info.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;

    const auto reason = ConnectionTerminationReason::sync_protocol_violation;
    involuntary_disconnect(std::move(violation_info), reason); // Throws
}
16✔
1143

1144

1145
// Closes the connection because of an error detected on the client side.
// `is_fatal` and `reason` are passed through to the disconnect unchanged.
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to error: %1", status); // Throws

    SessionErrorInfo client_error{std::move(status), is_fatal};
    involuntary_disconnect(std::move(client_error), reason); // Throws
}
426✔
1151

1152

1153
// Closes the connection because of a transient error. The error is reported
// as non-fatal with the Transient action.
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to transient error: %1", status); // Throws

    SessionErrorInfo transient_info{std::move(status), IsFatal{false}};
    transient_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
    involuntary_disconnect(std::move(transient_info), reason); // Throws
}
548✔
1161

1162

1163
// Close connection due to error discovered on the server-side, and then
1164
// reported to the client by way of a connection-level ERROR message.
1165
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1166
{
70✔
1167
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
70✔
1168
                int(error_code)); // Throws
70✔
1169

34✔
1170
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
44✔
1171
                                      : ConnectionTerminationReason::server_said_try_again_later;
60✔
1172
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
70✔
1173
                           reason); // Throws
70✔
1174
}
70✔
1175

1176

1177
void Connection::disconnect(const SessionErrorInfo& info)
1178
{
3,606✔
1179
    // Cancel connect timeout watchdog
1,792✔
1180
    m_connect_timer.reset();
3,606✔
1181

1,792✔
1182
    if (m_state == ConnectionState::connected) {
3,606✔
1183
        m_disconnect_time = monotonic_clock_now();
3,428✔
1184
        m_disconnect_has_occurred = true;
3,428✔
1185

1,702✔
1186
        // Sessions that are in the Deactivating state at this time can be
1,702✔
1187
        // immediately discarded, in part because they are no longer enlisted to
1,702✔
1188
        // send. Such sessions will be taken to the Deactivated state by
1,702✔
1189
        // Session::connection_lost(), and then they will be removed from
1,702✔
1190
        // `m_sessions`.
1,702✔
1191
        auto i = m_sessions.begin(), end = m_sessions.end();
3,428✔
1192
        while (i != end) {
7,366✔
1193
            // Prevent invalidation of the main iterator when erasing elements
2,040✔
1194
            auto j = i++;
3,938✔
1195
            Session& sess = *j->second;
3,938✔
1196
            sess.connection_lost(); // Throws
3,938✔
1197
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
3,938✔
1198
                m_sessions.erase(j);
1,908✔
1199
        }
3,938✔
1200
    }
3,428✔
1201

1,792✔
1202
    change_state_to_disconnected();
3,606✔
1203

1,792✔
1204
    m_ping_delay_in_progress = false;
3,606✔
1205
    m_waiting_for_pong = false;
3,606✔
1206
    m_send_ping = false;
3,606✔
1207
    m_minimize_next_ping_delay = false;
3,606✔
1208
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,606✔
1209
    m_ping_sent = false;
3,606✔
1210
    m_heartbeat_timer.reset();
3,606✔
1211
    m_previous_ping_rtt = 0;
3,606✔
1212

1,792✔
1213
    m_websocket_sentinel->destroyed = true;
3,606✔
1214
    m_websocket_sentinel.reset();
3,606✔
1215
    m_websocket.reset();
3,606✔
1216
    m_input_body_buffer.reset();
3,606✔
1217
    m_sending_session = nullptr;
3,606✔
1218
    m_sessions_enlisted_to_send.clear();
3,606✔
1219
    m_sending = false;
3,606✔
1220

1,792✔
1221
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,606✔
1222
    initiate_reconnect_wait();                                           // Throws
3,606✔
1223
}
3,606✔
1224

1225
bool Connection::is_flx_sync_connection() const noexcept
1226
{
106,072✔
1227
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
106,072✔
1228
}
106,072✔
1229

1230
// Handles a PONG message from the server.
//
// The PONG must arrive while a PING is outstanding and must echo the
// timestamp of the last PING sent; otherwise the connection is closed with a
// protocol error. On success the round trip time is recorded, the next ping
// delay is started, and the optional round trip time handler is invoked.
void Connection::receive_pong(milliseconds_type timestamp)
{
    logger.debug("Received: PONG(timestamp=%1)", timestamp);

    // Legal only while waiting for a PONG and with no PING still waiting to
    // be sent.
    if (REALM_UNLIKELY(!m_waiting_for_pong || m_send_ping)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
        return;
    }

    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
                          m_last_ping_sent_at, timestamp)}); // Throws
        return;
    }

    const milliseconds_type now = monotonic_clock_now();
    const milliseconds_type round_trip_time = now - timestamp;
    logger.debug("Round trip time was %1 milliseconds", round_trip_time);
    m_previous_ping_rtt = round_trip_time;

    // If this PONG message is a response to a PING message that was sent
    // after the last invocation of cancel_reconnect_delay(), then the
    // connection is still good, and we do not have to skip the next reconnect
    // delay.
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
        m_ping_after_scheduled_reset_of_reconnect_info = false;
        m_reconnect_info.scheduled_reset = false;
    }

    // Stop the PONG timeout and schedule the next PING.
    m_heartbeat_timer.reset();
    m_waiting_for_pong = false;

    initiate_ping_delay(now); // Throws

    if (m_client.m_roundtrip_time_handler) {
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
    }
}
244✔
1271

1272
// Looks up the session that a received message is addressed to.
//
// Returns the session, or nullptr when `session_ident` is zero or no such
// session is currently registered. In the latter case an error is logged;
// if the identifier is also absent from the session history, the connection
// is closed with a protocol error. `message` is the message type name used
// in the log and error texts.
//
// NOTE(review): this function is declared noexcept, yet the logging,
// formatting and close_due_to_protocol_error() calls are annotated as
// throwing elsewhere in this file; an escaping exception would terminate the
// process. Confirm this is intended.
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
{
    if (session_ident == 0)
        return nullptr;

    auto* sess = get_session(session_ident);
    if (REALM_LIKELY(sess))
        return sess;

    // Check the history to see if the message received was for a previous
    // session.
    const bool seen_before = !(m_session_history.find(session_ident) == m_session_history.end());
    if (seen_before) {
        logger.error("Received %1 message for closed session, session_ident = %2", message,
                     session_ident); // Throws
        return nullptr;
    }

    logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
    close_due_to_protocol_error(
        {ErrorCodes::SyncProtocolInvariantFailed,
         util::format("Received message %1 for session iden %2 when that session never existed", message,
                      session_ident)});
    return nullptr;
}
2,147,483,647✔
1296

1297
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1298
{
766✔
1299
    Session* sess = nullptr;
766✔
1300
    if (session_ident != 0) {
766✔
1301
        sess = find_and_validate_session(session_ident, "ERROR");
692✔
1302
        if (REALM_UNLIKELY(!sess)) {
692✔
1303
            return;
×
1304
        }
×
1305
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
692✔
1306
            close_due_to_protocol_error(std::move(status)); // Throws
×
1307
            return;
×
1308
        }
×
1309

344✔
1310
        if (sess->m_state == Session::Deactivated) {
692✔
1311
            finish_session_deactivation(sess);
2✔
1312
        }
2✔
1313
        return;
692✔
1314
    }
692✔
1315

36✔
1316
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
74✔
1317
                info.message, info.raw_error_code, info.is_fatal, session_ident,
74✔
1318
                info.server_requests_action); // Throws
74✔
1319

36✔
1320
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
74✔
1321
    if (REALM_LIKELY(known_error_code)) {
74✔
1322
        ProtocolError error_code = ProtocolError(info.raw_error_code);
70✔
1323
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
70✔
1324
            close_due_to_server_side_error(error_code, info); // Throws
70✔
1325
            return;
70✔
1326
        }
70✔
1327
        close_due_to_protocol_error(
×
1328
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1329
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1330
                          info.raw_error_code)});
×
1331
    }
×
1332
    else {
4✔
1333
        close_due_to_protocol_error(
4✔
1334
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1335
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1336
    }
4✔
1337
}
74✔
1338

1339

1340
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1341
                                             session_ident_type session_ident)
1342
{
20✔
1343
    if (session_ident == 0) {
20✔
1344
        return close_due_to_protocol_error(
×
1345
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1346
    }
×
1347

10✔
1348
    if (!is_flx_sync_connection()) {
20✔
1349
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1350
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1351
    }
×
1352

10✔
1353
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
20✔
1354
    if (REALM_UNLIKELY(!sess)) {
20✔
1355
        return;
×
1356
    }
×
1357

10✔
1358
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
20✔
1359
        close_due_to_protocol_error(std::move(status));
×
1360
    }
×
1361
}
20✔
1362

1363

1364
// Forwards an IDENT message to the addressed session. The connection is closed
// with a protocol error if the session rejects the message.
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
{
    Session* const target = find_and_validate_session(session_ident, "IDENT");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    if (auto outcome = target->receive_ident_message(client_file_ident); !outcome.is_ok()) {
        close_due_to_protocol_error(std::move(outcome)); // Throws
    }
}
3,658✔
1374

1375
// Forwards a DOWNLOAD message to the addressed session. The connection is
// closed with a protocol error if the session rejects the message.
void Connection::receive_download_message(session_ident_type session_ident, const DownloadMessage& message)
{
    Session* const target = find_and_validate_session(session_ident, "DOWNLOAD");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    if (auto outcome = target->receive_download_message(message); !outcome.is_ok())
        close_due_to_protocol_error(std::move(outcome));
}
44,622✔
1386

1387
// Forwards a MARK message to the addressed session. The connection is closed
// with a protocol error if the session rejects the message.
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    Session* const target = find_and_validate_session(session_ident, "MARK");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    if (auto outcome = target->receive_mark_message(request_ident); !outcome.is_ok()) {
        close_due_to_protocol_error(std::move(outcome)); // Throws
    }
}
16,436✔
1397

1398

1399
// Forwards an UNBOUND message to the addressed session. If the session rejects
// the message the connection is closed with a protocol error; otherwise, a
// session that ended up in the Deactivated state is finished off.
void Connection::receive_unbound_message(session_ident_type session_ident)
{
    Session* const target = find_and_validate_session(session_ident, "UNBOUND");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    if (auto outcome = target->receive_unbound_message(); !outcome.is_ok()) {
        close_due_to_protocol_error(std::move(outcome)); // Throws
        return;
    }

    if (target->m_state == Session::Deactivated)
        finish_session_deactivation(target);
}
4,380✔
1415

1416

1417
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1418
                                               std::string_view body)
1419
{
52✔
1420
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
52✔
1421
    if (REALM_UNLIKELY(!sess)) {
52✔
1422
        return;
×
1423
    }
×
1424

26✔
1425
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
52✔
1426
        close_due_to_protocol_error(std::move(status));
×
1427
    }
×
1428
}
52✔
1429

1430

1431
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1432
                                            std::string_view message)
1433
{
5,878✔
1434
    std::string prefix;
5,878✔
1435
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
5,878✔
1436
        prefix = util::format("Server[%1]", m_appservices_coid);
5,878✔
1437
    }
5,878✔
UNCOV
1438
    else {
×
UNCOV
1439
        prefix = "Server";
×
UNCOV
1440
    }
×
1441

2,972✔
1442
    if (session_ident != 0) {
5,878✔
1443
        if (auto sess = get_session(session_ident)) {
3,956✔
1444
            sess->logger.log(LogCategory::session, level, "%1 log: %2", prefix, message);
3,956✔
1445
            return;
3,956✔
1446
        }
3,956✔
1447

UNCOV
1448
        logger.log(util::LogCategory::session, level, "%1 log for unknown session %2: %3", prefix, session_ident,
×
UNCOV
1449
                   message);
×
UNCOV
1450
        return;
×
UNCOV
1451
    }
×
1452

960✔
1453
    logger.log(level, "%1 log: %2", prefix, message);
1,922✔
1454
}
1,922✔
1455

1456

1457
void Connection::receive_appservices_request_id(std::string_view coid)
1458
{
5,354✔
1459
    // Only set once per connection
2,664✔
1460
    if (!coid.empty() && m_appservices_coid.empty()) {
5,354✔
1461
        m_appservices_coid = coid;
2,448✔
1462
        logger.log(util::LogCategory::session, util::LogCategory::Level::info,
2,448✔
1463
                   "Connected to app services with request id: \"%1\"", m_appservices_coid);
2,448✔
1464
    }
2,448✔
1465
}
5,354✔
1466

1467

1468
// Reacts to a protocol error described by `status` by closing the connection.
// This is a thin forwarder to close_due_to_protocol_error().
void Connection::handle_protocol_error(Status status)
{
    Status failure = std::move(status);
    close_due_to_protocol_error(std::move(failure));
}
×
1472

1473

1474
// Sessions are guaranteed to be granted the opportunity to send a message in
1475
// the order that they enlist. Note that this is important to ensure
1476
// nonoverlapping communication with the server for consecutive sessions
1477
// associated with the same Realm file.
1478
//
1479
// CAUTION: The specified session may get destroyed before this function
1480
// returns, but only if its Session::send_message() puts it into the Deactivated
1481
// state.
1482
void Connection::enlist_to_send(Session* sess)
1483
{
161,512✔
1484
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
161,512✔
1485
    m_sessions_enlisted_to_send.push_back(sess); // Throws
161,512✔
1486
    if (!m_sending)
161,512✔
1487
        send_next_message(); // Throws
59,176✔
1488
}
161,512✔
1489

1490

1491
std::string Connection::get_active_appservices_connection_id()
1492
{
72✔
1493
    return m_appservices_coid;
72✔
1494
}
72✔
1495

1496
void Session::cancel_resumption_delay()
1497
{
3,828✔
1498
    REALM_ASSERT_EX(m_state == Active, m_state);
3,828✔
1499

2,154✔
1500
    if (!m_suspended)
3,828✔
1501
        return;
3,768✔
1502

30✔
1503
    m_suspended = false;
60✔
1504

30✔
1505
    logger.debug("Resumed"); // Throws
60✔
1506

30✔
1507
    if (unbind_process_complete())
60✔
1508
        initiate_rebind(); // Throws
36✔
1509

30✔
1510
    m_conn.one_more_active_unsuspended_session(); // Throws
60✔
1511
    if (m_try_again_activation_timer) {
60✔
1512
        m_try_again_activation_timer.reset();
8✔
1513
    }
8✔
1514

30✔
1515
    on_resumed(); // Throws
60✔
1516
}
60✔
1517

1518

1519
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1520
                                                 std::vector<ProtocolErrorInfo>* out)
1521
{
20,822✔
1522
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
20,822✔
1523
        return;
20,780✔
1524
    }
20,780✔
1525

22✔
1526
#ifdef REALM_DEBUG
42✔
1527
    REALM_ASSERT_DEBUG(
42✔
1528
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
42✔
1529
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
42✔
1530
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
42✔
1531
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
42✔
1532
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
42✔
1533
                       }));
42✔
1534
#endif
42✔
1535

22✔
1536
    while (!m_pending_compensating_write_errors.empty() &&
86✔
1537
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
66✔
1538
               changesets.back().version) {
44✔
1539
        auto& cur_error = m_pending_compensating_write_errors.front();
44✔
1540
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
44✔
1541
        out->push_back(std::move(cur_error));
44✔
1542
        m_pending_compensating_write_errors.pop_front();
44✔
1543
    }
44✔
1544
}
42✔
1545

1546

1547
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1548
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1549
                                   DownloadBatchState download_batch_state)
1550
{
42,008✔
1551
    auto& history = get_history();
42,008✔
1552
    if (received_changesets.empty()) {
42,008✔
1553
        if (download_batch_state == DownloadBatchState::MoreToCome) {
21,162✔
1554
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1555
                                       "received empty download message that was not the last in batch",
×
1556
                                       ProtocolError::bad_progress);
×
1557
        }
×
1558
        history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws
21,162✔
1559
        return;
21,162✔
1560
    }
21,162✔
1561

10,996✔
1562
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
20,846✔
1563
    auto transact = get_db()->start_read();
20,846✔
1564
    history.integrate_server_changesets(
20,846✔
1565
        progress, &downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
20,846✔
1566
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
20,834✔
1567
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
20,822✔
1568
        }); // Throws
20,822✔
1569
    if (received_changesets.size() == 1) {
20,846✔
1570
        logger.debug("1 remote changeset integrated, producing client version %1",
14,918✔
1571
                     version_info.sync_version.version); // Throws
14,918✔
1572
    }
14,918✔
1573
    else {
5,928✔
1574
        logger.debug("%2 remote changesets integrated, producing client version %1",
5,928✔
1575
                     version_info.sync_version.version, received_changesets.size()); // Throws
5,928✔
1576
    }
5,928✔
1577

10,996✔
1578
    for (const auto& pending_error : pending_compensating_write_errors) {
11,018✔
1579
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
44✔
1580
                    pending_error.compensating_write_rejected_client_version,
44✔
1581
                    *pending_error.compensating_write_server_version, pending_error.message);
44✔
1582
        try {
44✔
1583
            on_connection_state_changed(
44✔
1584
                m_conn.get_state(),
44✔
1585
                SessionErrorInfo{pending_error,
44✔
1586
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
44✔
1587
                                                          pending_error.message)});
44✔
1588
        }
44✔
1589
        catch (...) {
22✔
1590
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1591
        }
×
1592
    }
44✔
1593
}
20,846✔
1594

1595

1596
void Session::on_integration_failure(const IntegrationException& error)
1597
{
40✔
1598
    REALM_ASSERT_EX(m_state == Active, m_state);
40✔
1599
    REALM_ASSERT(!m_client_error && !m_error_to_send);
40✔
1600
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
40✔
1601

20✔
1602
    m_client_error = util::make_optional<IntegrationException>(error);
40✔
1603
    m_error_to_send = true;
40✔
1604
    SessionErrorInfo error_info{error.to_status(), IsFatal{false}};
40✔
1605
    error_info.server_requests_action = ProtocolErrorInfo::Action::Warning;
40✔
1606
    // Surface the error to the user otherwise is lost.
20✔
1607
    on_connection_state_changed(m_conn.get_state(), std::move(error_info));
40✔
1608

20✔
1609
    // Since the deactivation process has not been initiated, the UNBIND
20✔
1610
    // message cannot have been sent unless an ERROR message was received.
20✔
1611
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
40✔
1612
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
40✔
1613
        ensure_enlisted_to_send(); // Throws
36✔
1614
    }
36✔
1615
}
40✔
1616

1617
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress,
1618
                                       bool changesets_integrated)
1619
{
43,844✔
1620
    REALM_ASSERT_EX(m_state == Active, m_state);
43,844✔
1621
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
43,844✔
1622
    bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version);
43,844✔
1623

23,082✔
1624
    m_download_progress = progress.download;
43,844✔
1625
    m_progress = progress;
43,844✔
1626

23,082✔
1627
    if (upload_progressed) {
43,844✔
1628
        if (progress.upload.client_version > m_last_version_selected_for_upload) {
32,518✔
1629
            if (progress.upload.client_version > m_upload_progress.client_version)
14,018✔
1630
                m_upload_progress = progress.upload;
904✔
1631
            m_last_version_selected_for_upload = progress.upload.client_version;
14,018✔
1632
        }
14,018✔
1633

16,824✔
1634
        notify_upload_progress();
32,518✔
1635
        check_for_upload_completion();
32,518✔
1636
    }
32,518✔
1637

23,082✔
1638
    bool resume_upload = do_recognize_sync_version(client_version); // Allows upload process to resume
43,844✔
1639

23,082✔
1640
    // notify also when final DOWNLOAD received with no changesets
23,082✔
1641
    bool download_progressed = changesets_integrated || (!upload_progressed && resume_upload);
43,844✔
1642
    if (download_progressed)
43,844✔
1643
        notify_download_progress();
24,636✔
1644

23,082✔
1645
    check_for_download_completion(); // Throws
43,844✔
1646

23,082✔
1647
    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
23,082✔
1648
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
43,844✔
1649
        auto& flx_subscription_store = *get_flx_subscription_store();
3,188✔
1650
        get_migration_store()->create_subscriptions(flx_subscription_store);
3,188✔
1651
    }
3,188✔
1652

23,082✔
1653
    // Since the deactivation process has not been initiated, the UNBIND
23,082✔
1654
    // message cannot have been sent unless an ERROR message was received.
23,082✔
1655
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
43,844✔
1656
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
43,844✔
1657
        ensure_enlisted_to_send(); // Throws
43,842✔
1658
    }
43,842✔
1659
}
43,844✔
1660

1661

1662
// Destructor. It performs no explicit work.
// NOTE(review): the life cycle assertion below is disabled in the source; the
// reason is not visible from this part of the file.
Session::~Session()
{
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
}
10,252✔
1666

1667

1668
std::string Session::make_logger_prefix(session_ident_type ident)
1669
{
10,250✔
1670
    std::ostringstream out;
10,250✔
1671
    out.imbue(std::locale::classic());
10,250✔
1672
    out << "Session[" << ident << "]: "; // Throws
10,250✔
1673
    return out.str();                    // Throws
10,250✔
1674
}
10,250✔
1675

1676

1677
void Session::activate()
1678
{
10,244✔
1679
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
10,244✔
1680

4,950✔
1681
    logger.debug("Activating"); // Throws
10,244✔
1682

4,950✔
1683
    bool has_pending_client_reset = false;
10,244✔
1684
    if (REALM_LIKELY(!get_client().is_dry_run())) {
10,252✔
1685
        bool file_exists = util::File::exists(get_realm_path());
10,252✔
1686
        m_performing_client_reset = get_client_reset_config().has_value();
10,252✔
1687

4,950✔
1688
        logger.info("client_reset_config = %1, Realm exists = %2 ", m_performing_client_reset, file_exists);
10,252✔
1689
        if (!m_performing_client_reset) {
10,252✔
1690
            get_history().get_status(m_last_version_available, m_client_file_ident, m_progress,
9,884✔
1691
                                     &has_pending_client_reset); // Throws
9,884✔
1692
        }
9,884✔
1693
    }
10,252✔
1694
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
10,244✔
1695
                 m_client_file_ident.salt); // Throws
10,244✔
1696
    m_upload_progress = m_progress.upload;
10,244✔
1697
    m_last_version_selected_for_upload = m_upload_progress.client_version;
10,244✔
1698
    m_download_progress = m_progress.download;
10,244✔
1699
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
10,244✔
1700
    init_progress_handler();
10,244✔
1701

4,950✔
1702
    logger.debug("last_version_available  = %1", m_last_version_available);                    // Throws
10,244✔
1703
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
10,244✔
1704
    logger.debug("progress_download_client_version = %1",
10,244✔
1705
                 m_progress.download.last_integrated_client_version);                                      // Throws
10,244✔
1706
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
10,244✔
1707
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
10,244✔
1708

4,950✔
1709
    reset_protocol_state();
10,244✔
1710
    m_state = Active;
10,244✔
1711

4,950✔
1712
    call_debug_hook(SyncClientHookEvent::SessionActivating, m_progress, m_last_sent_flx_query_version,
10,244✔
1713
                    DownloadBatchState::SteadyState, 0);
10,244✔
1714

4,950✔
1715
    REALM_ASSERT(!m_suspended);
10,244✔
1716
    m_conn.one_more_active_unsuspended_session(); // Throws
10,244✔
1717

4,950✔
1718
    try {
10,244✔
1719
        process_pending_flx_bootstrap();
10,244✔
1720
    }
10,244✔
1721
    catch (const IntegrationException& error) {
4,950✔
1722
        on_integration_failure(error);
×
1723
    }
×
1724
    catch (...) {
4,952✔
1725
        on_integration_failure(IntegrationException(exception_to_status()));
4✔
1726
    }
4✔
1727

4,950✔
1728
    if (has_pending_client_reset) {
10,252✔
1729
        handle_pending_client_reset_acknowledgement();
20✔
1730
    }
20✔
1731
}
10,252✔
1732

1733

1734
// The caller (Connection) must discard the session if the session has become
1735
// deactivated upon return.
1736
void Session::initiate_deactivation()
1737
{
10,252✔
1738
    REALM_ASSERT_EX(m_state == Active, m_state);
10,252✔
1739

4,950✔
1740
    logger.debug("Initiating deactivation"); // Throws
10,252✔
1741

4,950✔
1742
    m_state = Deactivating;
10,252✔
1743

4,950✔
1744
    if (!m_suspended)
10,252✔
1745
        m_conn.one_less_active_unsuspended_session(); // Throws
9,652✔
1746

4,950✔
1747
    if (m_enlisted_to_send) {
10,252✔
1748
        REALM_ASSERT(!unbind_process_complete());
4,982✔
1749
        return;
4,982✔
1750
    }
4,982✔
1751

2,482✔
1752
    // Deactivate immediately if the BIND message has not yet been sent and the
2,482✔
1753
    // session is not enlisted to send, or if the unbinding process has already
2,482✔
1754
    // completed.
2,482✔
1755
    if (!m_bind_message_sent || unbind_process_complete()) {
5,270✔
1756
        complete_deactivation(); // Throws
1,184✔
1757
        // Life cycle state is now Deactivated
630✔
1758
        return;
1,184✔
1759
    }
1,184✔
1760

1,852✔
1761
    // Ready to send the UNBIND message, if it has not already been sent
1,852✔
1762
    if (!m_unbind_message_sent) {
4,086✔
1763
        enlist_to_send(); // Throws
3,872✔
1764
        return;
3,872✔
1765
    }
3,872✔
1766
}
4,086✔
1767

1768

1769
void Session::complete_deactivation()
1770
{
10,252✔
1771
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
10,252✔
1772
    m_state = Deactivated;
10,252✔
1773

4,950✔
1774
    logger.debug("Deactivation completed"); // Throws
10,252✔
1775
}
10,252✔
1776

1777

1778
// Called by the associated Connection object when this session is granted an
1779
// opportunity to send a message.
1780
//
1781
// The caller (Connection) must discard the session if the session has become
1782
// deactivated upon return.
1783
void Session::send_message()
1784
{
159,824✔
1785
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
159,824✔
1786
    REALM_ASSERT(m_enlisted_to_send);
159,824✔
1787
    m_enlisted_to_send = false;
159,824✔
1788
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
159,824✔
1789
        // Deactivation has been initiated. If the UNBIND message has not been
4,310✔
1790
        // sent yet, there is no point in sending it. Instead, we can let the
4,310✔
1791
        // deactivation process complete.
4,310✔
1792
        if (!m_bind_message_sent) {
9,038✔
1793
            return complete_deactivation(); // Throws
2,652✔
1794
            // Life cycle state is now Deactivated
1,330✔
1795
        }
2,652✔
1796

2,980✔
1797
        // Session life cycle state is Deactivating or the unbinding process has
2,980✔
1798
        // been initiated by a session specific ERROR message
2,980✔
1799
        if (!m_unbind_message_sent)
6,386✔
1800
            send_unbind_message(); // Throws
6,386✔
1801
        return;
6,386✔
1802
    }
6,386✔
1803

75,576✔
1804
    // Session life cycle state is Active and the unbinding process has
75,576✔
1805
    // not been initiated
75,576✔
1806
    REALM_ASSERT(!m_unbind_message_sent);
150,786✔
1807

75,576✔
1808
    if (!m_bind_message_sent)
150,786✔
1809
        return send_bind_message(); // Throws
8,764✔
1810

71,272✔
1811
    if (!m_ident_message_sent) {
142,022✔
1812
        if (have_client_file_ident())
7,204✔
1813
            send_ident_message(); // Throws
7,204✔
1814
        return;
7,204✔
1815
    }
7,204✔
1816

67,640✔
1817
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
134,818✔
1818
                                                      [](const PendingTestCommand& command) {
67,710✔
1819
                                                          return command.pending;
124✔
1820
                                                      });
124✔
1821
    if (has_pending_test_command) {
134,818✔
1822
        return send_test_command_message();
52✔
1823
    }
52✔
1824

67,614✔
1825
    if (m_error_to_send)
134,766✔
1826
        return send_json_error_message(); // Throws
34✔
1827

67,596✔
1828
    // Stop sending upload, mark and query messages when the client detects an error.
67,596✔
1829
    if (m_client_error) {
134,732✔
1830
        return;
16✔
1831
    }
16✔
1832

67,588✔
1833
    if (m_target_download_mark > m_last_download_mark_sent)
134,716✔
1834
        return send_mark_message(); // Throws
17,222✔
1835

59,058✔
1836
    auto is_upload_allowed = [&]() -> bool {
117,504✔
1837
        if (!m_is_flx_sync_session) {
117,504✔
1838
            return true;
102,504✔
1839
        }
102,504✔
1840

7,650✔
1841
        auto migration_store = get_migration_store();
15,000✔
1842
        if (!migration_store) {
15,000✔
1843
            return true;
×
1844
        }
×
1845

7,650✔
1846
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
15,000✔
1847
        if (!sentinel_query_version) {
15,000✔
1848
            return true;
14,972✔
1849
        }
14,972✔
1850

14✔
1851
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
14✔
1852
        return m_last_sent_flx_query_version != *sentinel_query_version;
28✔
1853
    };
28✔
1854

59,058✔
1855
    if (!is_upload_allowed()) {
117,494✔
1856
        return;
16✔
1857
    }
16✔
1858

59,050✔
1859
    auto check_pending_flx_version = [&]() -> bool {
117,488✔
1860
        if (!m_is_flx_sync_session) {
117,488✔
1861
            return false;
102,504✔
1862
        }
102,504✔
1863

7,642✔
1864
        if (!m_allow_upload) {
14,984✔
1865
            return false;
3,228✔
1866
        }
3,228✔
1867

5,946✔
1868
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
11,756✔
1869

5,946✔
1870
        if (!m_pending_flx_sub_set) {
11,756✔
1871
            return false;
9,800✔
1872
        }
9,800✔
1873

976✔
1874
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
1,956✔
1875
    };
1,956✔
1876

59,050✔
1877
    if (check_pending_flx_version()) {
117,478✔
1878
        return send_query_change_message(); // throws
1,118✔
1879
    }
1,118✔
1880

58,492✔
1881
    if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
116,360✔
1882
        return send_upload_message(); // Throws
57,400✔
1883
    }
57,400✔
1884
}
116,360✔
1885

1886

1887
void Session::send_bind_message()
1888
{
8,764✔
1889
    REALM_ASSERT_EX(m_state == Active, m_state);
8,764✔
1890

4,304✔
1891
    session_ident_type session_ident = m_ident;
8,764✔
1892
    bool need_client_file_ident = !have_client_file_ident();
8,764✔
1893
    const bool is_subserver = false;
8,764✔
1894

4,304✔
1895

4,304✔
1896
    ClientProtocol& protocol = m_conn.get_client_protocol();
8,764✔
1897
    int protocol_version = m_conn.get_negotiated_protocol_version();
8,764✔
1898
    OutputBuffer& out = m_conn.get_output_buffer();
8,764✔
1899
    // Discard the token since it's ignored by the server.
4,304✔
1900
    std::string empty_access_token;
8,764✔
1901
    if (m_is_flx_sync_session) {
8,764✔
1902
        nlohmann::json bind_json_data;
1,442✔
1903
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,442✔
1904
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1905
        }
60✔
1906
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,442✔
1907
        auto schema_version = get_schema_version();
1,442✔
1908
        // Send 0 if schema is not versioned.
724✔
1909
        bind_json_data["schemaVersion"] = schema_version != uint64_t(-1) ? schema_version : 0;
1,440✔
1910
        if (logger.would_log(util::Logger::Level::debug)) {
1,442✔
1911
            std::string json_data_dump;
1,442✔
1912
            if (!bind_json_data.empty()) {
1,442✔
1913
                json_data_dump = bind_json_data.dump();
1,442✔
1914
            }
1,442✔
1915
            logger.debug(
1,442✔
1916
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,442✔
1917
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,442✔
1918
        }
1,442✔
1919
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,442✔
1920
                                       need_client_file_ident, is_subserver); // Throws
1,442✔
1921
    }
1,442✔
1922
    else {
7,322✔
1923
        std::string server_path = get_virt_path();
7,322✔
1924
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
7,322✔
1925
                     session_ident, need_client_file_ident, is_subserver, server_path);
7,322✔
1926
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
7,322✔
1927
                                       need_client_file_ident, is_subserver); // Throws
7,322✔
1928
    }
7,322✔
1929
    m_conn.initiate_write_message(out, this); // Throws
8,764✔
1930

4,304✔
1931
    m_bind_message_sent = true;
8,764✔
1932
    call_debug_hook(SyncClientHookEvent::BindMessageSent, m_progress, m_last_sent_flx_query_version,
8,764✔
1933
                    DownloadBatchState::SteadyState, 0);
8,764✔
1934

4,304✔
1935
    // Ready to send the IDENT message if the file identifier pair is already
4,304✔
1936
    // available.
4,304✔
1937
    if (!need_client_file_ident)
8,764✔
1938
        enlist_to_send(); // Throws
3,776✔
1939
}
8,764✔
1940

1941

1942
void Session::send_ident_message()
1943
{
7,204✔
1944
    REALM_ASSERT_EX(m_state == Active, m_state);
7,204✔
1945
    REALM_ASSERT(m_bind_message_sent);
7,204✔
1946
    REALM_ASSERT(!m_unbind_message_sent);
7,204✔
1947
    REALM_ASSERT(have_client_file_ident());
7,204✔
1948

3,632✔
1949

3,632✔
1950
    ClientProtocol& protocol = m_conn.get_client_protocol();
7,204✔
1951
    OutputBuffer& out = m_conn.get_output_buffer();
7,204✔
1952
    session_ident_type session_ident = m_ident;
7,204✔
1953

3,632✔
1954
    if (m_is_flx_sync_session) {
7,204✔
1955
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,364✔
1956
        const auto active_query_body = active_query_set.to_ext_json();
1,364✔
1957
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,364✔
1958
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,364✔
1959
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,364✔
1960
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,364✔
1961
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,364✔
1962
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,364✔
1963
                     active_query_body); // Throws
1,364✔
1964
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,364✔
1965
                                        active_query_set.version(), active_query_body); // Throws
1,364✔
1966
        m_last_sent_flx_query_version = active_query_set.version();
1,364✔
1967
    }
1,364✔
1968
    else {
5,840✔
1969
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
5,840✔
1970
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
5,840✔
1971
                     "latest_server_version_salt=%6)",
5,840✔
1972
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
5,840✔
1973
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
5,840✔
1974
                     m_progress.latest_server_version.salt);                                  // Throws
5,840✔
1975
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
5,840✔
1976
    }
5,840✔
1977
    m_conn.initiate_write_message(out, this); // Throws
7,204✔
1978

3,632✔
1979
    m_ident_message_sent = true;
7,204✔
1980

3,632✔
1981
    // Other messages may be waiting to be sent
3,632✔
1982
    enlist_to_send(); // Throws
7,204✔
1983
}
7,204✔
1984

1985
void Session::send_query_change_message()
1986
{
1,118✔
1987
    REALM_ASSERT_EX(m_state == Active, m_state);
1,118✔
1988
    REALM_ASSERT(m_ident_message_sent);
1,118✔
1989
    REALM_ASSERT(!m_unbind_message_sent);
1,118✔
1990
    REALM_ASSERT(m_pending_flx_sub_set);
1,118✔
1991
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
1,118✔
1992

558✔
1993
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
1,118✔
1994
        return;
×
1995
    }
×
1996

558✔
1997
    auto sub_store = get_flx_subscription_store();
1,118✔
1998
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
1,118✔
1999
    auto latest_queries = latest_sub_set.to_ext_json();
1,118✔
2000
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
1,118✔
2001
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
1,118✔
2002

558✔
2003
    OutputBuffer& out = m_conn.get_output_buffer();
1,118✔
2004
    session_ident_type session_ident = get_ident();
1,118✔
2005
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,118✔
2006
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
1,118✔
2007
    m_conn.initiate_write_message(out, this);
1,118✔
2008

558✔
2009
    m_last_sent_flx_query_version = latest_sub_set.version();
1,118✔
2010

558✔
2011
    request_download_completion_notification();
1,118✔
2012
}
1,118✔
2013

2014
void Session::send_upload_message()
2015
{
57,398✔
2016
    REALM_ASSERT_EX(m_state == Active, m_state);
57,398✔
2017
    REALM_ASSERT(m_ident_message_sent);
57,398✔
2018
    REALM_ASSERT(!m_unbind_message_sent);
57,398✔
2019

29,224✔
2020
    if (REALM_UNLIKELY(get_client().is_dry_run()))
57,398✔
2021
        return;
29,224✔
2022

29,224✔
2023
    version_type target_upload_version = m_last_version_available;
57,398✔
2024
    if (m_pending_flx_sub_set) {
57,398✔
2025
        REALM_ASSERT(m_is_flx_sync_session);
838✔
2026
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
838✔
2027
    }
838✔
2028

29,224✔
2029
    std::vector<UploadChangeset> uploadable_changesets;
57,398✔
2030
    version_type locked_server_version = 0;
57,398✔
2031
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
57,398✔
2032
                                             locked_server_version); // Throws
57,398✔
2033

29,224✔
2034
    if (uploadable_changesets.empty()) {
57,398✔
2035
        // Nothing more to upload right now
14,636✔
2036
        check_for_upload_completion(); // Throws
29,736✔
2037
        // If we need to limit upload up to some version other than the last client version available and there are no
14,636✔
2038
        // changes to upload, then there is no need to send an empty message.
14,636✔
2039
        if (m_pending_flx_sub_set) {
29,736✔
2040
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
264✔
2041
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
264✔
2042
            // Other messages may be waiting to be sent
132✔
2043
            return enlist_to_send(); // Throws
264✔
2044
        }
264✔
2045
    }
27,662✔
2046
    else {
27,662✔
2047
        m_last_version_selected_for_upload = uploadable_changesets.back().progress.client_version;
27,662✔
2048
    }
27,662✔
2049

29,224✔
2050
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
57,266✔
2051
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
572✔
2052
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
572✔
2053
    }
572✔
2054

29,092✔
2055
    version_type progress_client_version = m_upload_progress.client_version;
57,134✔
2056
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
57,134✔
2057

29,092✔
2058
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
57,134✔
2059
                 "locked_server_version=%3, num_changesets=%4)",
57,134✔
2060
                 progress_client_version, progress_server_version, locked_server_version,
57,134✔
2061
                 uploadable_changesets.size()); // Throws
57,134✔
2062

29,092✔
2063
    ClientProtocol& protocol = m_conn.get_client_protocol();
57,134✔
2064
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
57,134✔
2065

29,092✔
2066
    for (const UploadChangeset& uc : uploadable_changesets) {
51,078✔
2067
        logger.debug(util::LogCategory::changeset,
42,670✔
2068
                     "Fetching changeset for upload (client_version=%1, server_version=%2, "
42,670✔
2069
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
42,670✔
2070
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
42,670✔
2071
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
42,670✔
2072
        if (logger.would_log(util::Logger::Level::trace)) {
42,670✔
2073
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2074
            if (changeset_data.size() < 1024) {
×
2075
                logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
2076
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2077
            }
×
2078
            else {
×
2079
                logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
2080
                             protocol.compressed_hex_dump(changeset_data));
×
2081
            }
×
2082

2083
#if REALM_DEBUG
×
2084
            ChunkedBinaryInputStream in{changeset_data};
×
2085
            Changeset log;
×
2086
            try {
×
2087
                parse_changeset(in, log);
×
2088
                std::stringstream ss;
×
2089
                log.print(ss);
×
2090
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
2091
            }
×
2092
            catch (const BadChangesetError& err) {
×
2093
                logger.error(util::LogCategory::changeset, "Unable to parse changeset: %1", err.what());
×
2094
            }
×
2095
#endif
×
2096
        }
×
2097

20,684✔
2098
#if 0 // Upload log compaction is currently not implemented
2099
        if (!get_client().m_disable_upload_compaction) {
2100
            ChangesetEncoder::Buffer encode_buffer;
2101

2102
            {
2103
                // Upload compaction only takes place within single changesets to
2104
                // avoid another client seeing inconsistent snapshots.
2105
                ChunkedBinaryInputStream stream{uc.changeset};
2106
                Changeset changeset;
2107
                parse_changeset(stream, changeset); // Throws
2108
                // FIXME: What is the point of setting these? How can compaction care about them?
2109
                changeset.version = uc.progress.client_version;
2110
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2111
                changeset.origin_timestamp = uc.origin_timestamp;
2112
                changeset.origin_file_ident = uc.origin_file_ident;
2113

2114
                compact_changesets(&changeset, 1);
2115
                encode_changeset(changeset, encode_buffer);
2116

2117
                logger.debug(util::LogCategory::changeset, "Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2118
                             encode_buffer.size()); // Throws
2119
            }
2120

2121
            upload_message_builder.add_changeset(
2122
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
2123
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
2124
        }
2125
        else
2126
#endif
2127
        {
42,670✔
2128
            upload_message_builder.add_changeset(uc.progress.client_version,
42,670✔
2129
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
42,670✔
2130
                                                 uc.origin_file_ident,
42,670✔
2131
                                                 uc.changeset); // Throws
42,670✔
2132
        }
42,670✔
2133
    }
42,670✔
2134

29,092✔
2135
    int protocol_version = m_conn.get_negotiated_protocol_version();
57,134✔
2136
    OutputBuffer& out = m_conn.get_output_buffer();
57,134✔
2137
    session_ident_type session_ident = get_ident();
57,134✔
2138
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
57,134✔
2139
                                               progress_server_version,
57,134✔
2140
                                               locked_server_version); // Throws
57,134✔
2141
    m_conn.initiate_write_message(out, this);                          // Throws
57,134✔
2142

29,092✔
2143
    // Other messages may be waiting to be sent
29,092✔
2144
    enlist_to_send(); // Throws
57,134✔
2145
}
57,134✔
2146

2147

2148
void Session::send_mark_message()
2149
{
17,222✔
2150
    REALM_ASSERT_EX(m_state == Active, m_state);
17,222✔
2151
    REALM_ASSERT(m_ident_message_sent);
17,222✔
2152
    REALM_ASSERT(!m_unbind_message_sent);
17,222✔
2153
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
17,222✔
2154

8,530✔
2155
    request_ident_type request_ident = m_target_download_mark;
17,222✔
2156
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
17,222✔
2157

8,530✔
2158
    ClientProtocol& protocol = m_conn.get_client_protocol();
17,222✔
2159
    OutputBuffer& out = m_conn.get_output_buffer();
17,222✔
2160
    session_ident_type session_ident = get_ident();
17,222✔
2161
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
17,222✔
2162
    m_conn.initiate_write_message(out, this);                      // Throws
17,222✔
2163

8,530✔
2164
    m_last_download_mark_sent = request_ident;
17,222✔
2165

8,530✔
2166
    // Other messages may be waiting to be sent
8,530✔
2167
    enlist_to_send(); // Throws
17,222✔
2168
}
17,222✔
2169

2170

2171
void Session::send_unbind_message()
2172
{
6,386✔
2173
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,386✔
2174
    REALM_ASSERT(m_bind_message_sent);
6,386✔
2175
    REALM_ASSERT(!m_unbind_message_sent);
6,386✔
2176

2,980✔
2177
    logger.debug("Sending: UNBIND"); // Throws
6,386✔
2178

2,980✔
2179
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,386✔
2180
    OutputBuffer& out = m_conn.get_output_buffer();
6,386✔
2181
    session_ident_type session_ident = get_ident();
6,386✔
2182
    protocol.make_unbind_message(out, session_ident); // Throws
6,386✔
2183
    m_conn.initiate_write_message(out, this);         // Throws
6,386✔
2184

2,980✔
2185
    m_unbind_message_sent = true;
6,386✔
2186
}
6,386✔
2187

2188

2189
void Session::send_json_error_message()
2190
{
34✔
2191
    REALM_ASSERT_EX(m_state == Active, m_state);
34✔
2192
    REALM_ASSERT(m_ident_message_sent);
34✔
2193
    REALM_ASSERT(!m_unbind_message_sent);
34✔
2194
    REALM_ASSERT(m_error_to_send);
34✔
2195
    REALM_ASSERT(m_client_error);
34✔
2196

18✔
2197
    ClientProtocol& protocol = m_conn.get_client_protocol();
34✔
2198
    OutputBuffer& out = m_conn.get_output_buffer();
34✔
2199
    session_ident_type session_ident = get_ident();
34✔
2200
    auto protocol_error = m_client_error->error_for_server;
34✔
2201

18✔
2202
    auto message = util::format("%1", m_client_error->to_status());
34✔
2203
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
34✔
2204
                session_ident); // Throws
34✔
2205

18✔
2206
    nlohmann::json error_body_json;
34✔
2207
    error_body_json["message"] = std::move(message);
34✔
2208
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
34✔
2209
                                     error_body_json.dump()); // Throws
34✔
2210
    m_conn.initiate_write_message(out, this);                 // Throws
34✔
2211

18✔
2212
    m_error_to_send = false;
34✔
2213
    enlist_to_send(); // Throws
34✔
2214
}
34✔
2215

2216

2217
void Session::send_test_command_message()
2218
{
52✔
2219
    REALM_ASSERT_EX(m_state == Active, m_state);
52✔
2220

26✔
2221
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
52✔
2222
                           [](const PendingTestCommand& command) {
52✔
2223
                               return command.pending;
52✔
2224
                           });
52✔
2225
    REALM_ASSERT(it != m_pending_test_commands.end());
52✔
2226

26✔
2227
    ClientProtocol& protocol = m_conn.get_client_protocol();
52✔
2228
    OutputBuffer& out = m_conn.get_output_buffer();
52✔
2229
    auto session_ident = get_ident();
52✔
2230

26✔
2231
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
52✔
2232
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
52✔
2233

26✔
2234
    m_conn.initiate_write_message(out, this); // Throws;
52✔
2235
    it->pending = false;
52✔
2236

26✔
2237
    enlist_to_send();
52✔
2238
}
52✔
2239

2240
bool Session::client_reset_if_needed()
2241
{
3,534✔
2242
    // Regardless of what happens, once we return from this function we will
1,686✔
2243
    // no longer be in the middle of a client reset
1,686✔
2244
    m_performing_client_reset = false;
3,534✔
2245

1,686✔
2246
    // Even if we end up not actually performing a client reset, consume the
1,686✔
2247
    // config to ensure that the resources it holds are released
1,686✔
2248
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
3,534✔
2249
    if (!client_reset_config) {
3,534✔
2250
        return false;
3,166✔
2251
    }
3,166✔
2252

184✔
2253
    auto on_flx_version_complete = [this](int64_t version) {
368✔
2254
        this->on_flx_sync_version_complete(version);
292✔
2255
    };
292✔
2256
    bool did_reset = client_reset::perform_client_reset(
368✔
2257
        logger, *get_db(), *client_reset_config->fresh_copy, client_reset_config->mode,
368✔
2258
        std::move(client_reset_config->notify_before_client_reset),
368✔
2259
        std::move(client_reset_config->notify_after_client_reset), m_client_file_ident, get_flx_subscription_store(),
368✔
2260
        on_flx_version_complete, client_reset_config->recovery_is_allowed);
368✔
2261
    if (!did_reset) {
368✔
2262
        return false;
×
2263
    }
×
2264

184✔
2265
    // The fresh Realm has been used to reset the state
184✔
2266
    logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
368✔
2267

184✔
2268
    SaltedFileIdent client_file_ident;
368✔
2269
    bool has_pending_client_reset = false;
368✔
2270
    get_history().get_status(m_last_version_available, client_file_ident, m_progress,
368✔
2271
                             &has_pending_client_reset); // Throws
368✔
2272
    REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
368✔
2273
    REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
368✔
2274
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
368✔
2275
                    m_progress.download.last_integrated_client_version);
368✔
2276
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
368✔
2277
    logger.trace("last_version_available  = %1", m_last_version_available); // Throws
368✔
2278

184✔
2279
    m_upload_progress = m_progress.upload;
368✔
2280
    m_download_progress = m_progress.download;
368✔
2281
    init_progress_handler();
368✔
2282
    // In recovery mode, there may be new changesets to upload and nothing left to download.
184✔
2283
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
184✔
2284
    // For both, we want to allow uploads again without needing external changes to download first.
184✔
2285
    m_allow_upload = true;
368✔
2286
    REALM_ASSERT_EX(m_last_version_selected_for_upload == 0, m_last_version_selected_for_upload);
368✔
2287

184✔
2288
    if (has_pending_client_reset) {
368✔
2289
        handle_pending_client_reset_acknowledgement();
288✔
2290
    }
288✔
2291

184✔
2292
    update_subscription_version_info();
368✔
2293

184✔
2294
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
184✔
2295
    if (auto migration_store = get_migration_store()) {
368✔
2296
        migration_store->complete_migration_or_rollback();
260✔
2297
    }
260✔
2298

184✔
2299
    return true;
368✔
2300
}
368✔
2301

2302
// Handles an IDENT message from the server, which carries the client file identifier
// (and salt) assigned to this session's Realm.
//
// Returns Status::OK() when the message was accepted or ignored (session not Active, or a
// client reset failed and the session was suspended). Returns
// ErrorCodes::SyncProtocolInvariantFailed when the message arrives at an illegal time or
// carries an invalid identifier or salt.
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
{
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
                 client_file_ident.salt); // Throws

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active)
        return Status::OK(); // Success

    // IDENT is only legal after BIND was sent, while no file identifier is known yet, and
    // before any ERROR or UNBOUND message has been received.
    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
                               !m_unbound_message_received);
    if (REALM_UNLIKELY(!legal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
    }
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
    }
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
    }

    m_client_file_ident = client_file_ident;

    if (REALM_UNLIKELY(get_client().is_dry_run())) {
        // Ready to send the IDENT message
        ensure_enlisted_to_send(); // Throws
        return Status::OK();       // Success
    }

    // if a client reset happens, it will take care of setting the file ident
    // and if not, we do it here
    bool did_client_reset = false;
    try {
        did_client_reset = client_reset_if_needed();
    }
    catch (const std::exception& e) {
        auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
        // The message embeds arbitrary exception text, so pass it as an argument rather
        // than as the format string; a '%' in it must not be parsed as a placeholder.
        logger.error("%1", err_msg);
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
        suspend(err_info);
        return Status::OK();
    }
    if (!did_client_reset) {
        // No reset took place: store the identifier in the history and start the
        // progress counters from zero.
        get_history().set_client_file_ident(client_file_ident,
                                            m_fix_up_object_ids); // Throws
        m_progress.download.last_integrated_client_version = 0;
        m_progress.upload.client_version = 0;
        m_last_version_selected_for_upload = 0;
    }

    // Ready to send the IDENT message
    ensure_enlisted_to_send(); // Throws
    return Status::OK();       // Success
}
3,454✔
2358

2359
Status Session::receive_download_message(const DownloadMessage& message)
2360
{
44,622✔
2361
    // Ignore the message if the deactivation process has been initiated,
23,514✔
2362
    // because in that case, the associated Realm and SessionWrapper must
23,514✔
2363
    // not be accessed any longer.
23,514✔
2364
    if (m_state != Active)
44,622✔
2365
        return Status::OK();
482✔
2366

23,232✔
2367
    bool is_flx = m_conn.is_flx_sync_connection();
44,140✔
2368
    int64_t query_version = is_flx ? *message.query_version : 0;
42,412✔
2369

23,232✔
2370
    // If this is a PBS connection, then every download message is its own complete batch.
23,232✔
2371
    bool last_in_batch = is_flx ? *message.last_in_batch : true;
42,412✔
2372
    auto batch_state = last_in_batch ? sync::DownloadBatchState::LastInBatch : sync::DownloadBatchState::MoreToCome;
44,018✔
2373
    if (is_steady_state_download_message(batch_state, query_version))
44,140✔
2374
        batch_state = DownloadBatchState::SteadyState;
42,014✔
2375

23,232✔
2376
    auto&& progress = message.progress;
44,140✔
2377
    if (is_flx) {
44,140✔
2378
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
3,452✔
2379
                     "latest_server_version=%3, latest_server_version_salt=%4, "
3,452✔
2380
                     "upload_client_version=%5, upload_server_version=%6, progress_estimate=%7, "
3,452✔
2381
                     "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
3,452✔
2382
                     progress.download.server_version, progress.download.last_integrated_client_version,
3,452✔
2383
                     progress.latest_server_version.version, progress.latest_server_version.salt,
3,452✔
2384
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
3,452✔
2385
                     message.progress_estimate, last_in_batch, query_version, message.changesets.size()); // Throws
3,452✔
2386
    }
3,452✔
2387
    else {
40,688✔
2388
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
40,688✔
2389
                     "latest_server_version=%3, latest_server_version_salt=%4, "
40,688✔
2390
                     "upload_client_version=%5, upload_server_version=%6, "
40,688✔
2391
                     "downloadable_bytes=%7, num_changesets=%8, ...)",
40,688✔
2392
                     progress.download.server_version, progress.download.last_integrated_client_version,
40,688✔
2393
                     progress.latest_server_version.version, progress.latest_server_version.salt,
40,688✔
2394
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
40,688✔
2395
                     message.downloadable_bytes, message.changesets.size()); // Throws
40,688✔
2396
    }
40,688✔
2397

23,232✔
2398
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
23,232✔
2399
    // changeset over and over again.
23,232✔
2400
    if (m_client_error) {
44,140✔
2401
        logger.debug("Ignoring download message because the client detected an integration error");
×
2402
        return Status::OK();
×
2403
    }
×
2404

23,232✔
2405
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
44,142✔
2406
    if (REALM_UNLIKELY(!legal_at_this_time)) {
44,140✔
2407
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
×
2408
    }
×
2409
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
44,140✔
2410
        logger.error("Bad sync progress received (%1)", status);
×
2411
        return status;
×
2412
    }
×
2413

23,232✔
2414
    version_type server_version = m_progress.download.server_version;
44,140✔
2415
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
44,140✔
2416
    for (const RemoteChangeset& changeset : message.changesets) {
44,540✔
2417
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
21,834✔
2418
        // version must be increasing, but can stay the same during bootstraps.
21,834✔
2419
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
22,962✔
2420
                                                         : (changeset.remote_version > server_version);
42,014✔
2421
        // Each server version cannot be greater than the one in the header of the download message.
21,834✔
2422
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
43,142✔
2423
        if (!good_server_version) {
43,142✔
2424
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2425
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2426
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2427
        }
×
2428
        server_version = changeset.remote_version;
43,142✔
2429
        // Check that per-changeset last integrated client version is "weakly"
21,834✔
2430
        // increasing.
21,834✔
2431
        bool good_client_version =
43,142✔
2432
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
43,142✔
2433
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
43,142✔
2434
        if (!good_client_version) {
43,142✔
2435
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2436
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2437
                                 "(%1, %2, %3)",
×
2438
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2439
                                 progress.download.last_integrated_client_version)};
×
2440
        }
×
2441
        last_integrated_client_version = changeset.last_integrated_local_version;
43,142✔
2442
        // Server shouldn't send our own changes, and zero is not a valid client
21,834✔
2443
        // file identifier.
21,834✔
2444
        bool good_file_ident =
43,142✔
2445
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
43,142✔
2446
        if (!good_file_ident) {
43,142✔
2447
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2448
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2449
                                 changeset.origin_file_ident)};
×
2450
        }
×
2451
    }
43,142✔
2452

23,232✔
2453
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
44,140✔
2454
                                       batch_state, message.changesets.size());
44,140✔
2455
    if (hook_action == SyncClientHookAction::EarlyReturn) {
44,140✔
2456
        return Status::OK();
16✔
2457
    }
16✔
2458
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
44,124✔
2459

23,224✔
2460
    if (is_flx)
44,124✔
2461
        update_download_estimate(message.progress_estimate);
3,440✔
2462

23,224✔
2463
    if (process_flx_bootstrap_message(progress, batch_state, query_version, message.changesets)) {
44,124✔
2464
        clear_resumption_delay_state();
2,118✔
2465
        return Status::OK();
2,118✔
2466
    }
2,118✔
2467

22,164✔
2468
    uint64_t downloadable_bytes = is_flx ? 0 : message.downloadable_bytes;
42,006✔
2469
    initiate_integrate_changesets(downloadable_bytes, batch_state, progress, message.changesets); // Throws
42,006✔
2470

22,164✔
2471
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
42,006✔
2472
                                  batch_state, message.changesets.size());
42,006✔
2473
    if (hook_action == SyncClientHookAction::EarlyReturn) {
42,006✔
2474
        return Status::OK();
×
2475
    }
×
2476
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
42,006✔
2477

22,164✔
2478
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
22,164✔
2479
    // after a retryable session error.
22,164✔
2480
    clear_resumption_delay_state();
42,006✔
2481
    return Status::OK();
42,006✔
2482
}
42,006✔
2483

2484
// Handles a MARK message from the server.
//
// Returns Status::OK() when the mark was recorded or the message was ignored because the
// session is not Active; returns ErrorCodes::SyncProtocolInvariantFailed when the message
// is not legal at this time or carries an unexpected request identifier.
Status Session::receive_mark_message(request_ident_type request_ident)
{
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

    // Once deactivation has been initiated the associated Realm and SessionWrapper must
    // no longer be accessed, so the message is dropped.
    if (m_state != Active) {
        return Status::OK(); // Success
    }

    // MARK is illegal before IDENT was sent and after an ERROR or UNBOUND message.
    const bool illegal_now = (!m_ident_message_sent || m_error_message_received || m_unbound_message_received);
    if (REALM_UNLIKELY(illegal_now)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
    }

    // The identifier must be newer than the last mark received and no newer than the last
    // mark sent.
    const bool bad_request_ident =
        (request_ident > m_last_download_mark_sent || request_ident <= m_last_download_mark_received);
    if (REALM_UNLIKELY(bad_request_ident)) {
        auto reason = util::format(
            "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
            m_last_download_mark_sent, m_last_download_mark_received);
        return {ErrorCodes::SyncProtocolInvariantFailed, reason};
    }

    m_server_version_at_last_download_mark = m_progress.download.server_version;
    m_last_download_mark_received = request_ident;
    check_for_download_completion(); // Throws

    return Status::OK(); // Success
}
16,346✔
2514

2515

2516
// The caller (Connection) must discard the session if the session has become
2517
// deactivated upon return.
2518
Status Session::receive_unbound_message()
2519
{
4,380✔
2520
    logger.debug("Received: UNBOUND");
4,380✔
2521

2,020✔
2522
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
4,380✔
2523
    if (REALM_UNLIKELY(!legal_at_this_time)) {
4,380✔
UNCOV
2524
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
UNCOV
2525
    }
×
2526

2,020✔
2527
    // The fact that the UNBIND message has been sent, but an ERROR message has
2,020✔
2528
    // not been received, implies that the deactivation process must have been
2,020✔
2529
    // initiated, so this session must be in the Deactivating state or the session
2,020✔
2530
    // has been suspended because of a client side error.
2,020✔
2531
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
4,380!
2532

2,020✔
2533
    m_unbound_message_received = true;
4,380✔
2534

2,020✔
2535
    // Detect completion of the unbinding process
2,020✔
2536
    if (m_unbind_message_send_complete && m_state == Deactivating) {
4,380✔
2537
        // The deactivation process completes when the unbinding process
2,020✔
2538
        // completes.
2,020✔
2539
        complete_deactivation(); // Throws
4,380✔
2540
        // Life cycle state is now Deactivated
2,020✔
2541
    }
4,380✔
2542

2,020✔
2543
    return Status::OK(); // Success
4,380✔
2544
}
4,380✔
2545

2546

2547
// Handles a QUERY_ERROR message from the server by forwarding the failing query version
// and message to on_flx_sync_error(). Always returns Status::OK().
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
{
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);

    // Once deactivation has been initiated the associated Realm and SessionWrapper must
    // no longer be accessed, so the error is not forwarded.
    if (m_state != Active) {
        return Status::OK();
    }

    on_flx_sync_error(query_version, message); // throws
    return Status::OK();
}
20✔
2558

2559
// The caller (Connection) must discard the session if the session has become
2560
// deactivated upon return.
2561
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2562
{
704✔
2563
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
704✔
2564
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
704✔
2565

350✔
2566
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
704✔
2567
    if (REALM_UNLIKELY(!legal_at_this_time)) {
704✔
2568
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2569
    }
×
2570

350✔
2571
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
704✔
2572
    auto status = protocol_error_to_status(protocol_error, info.message);
704✔
2573
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
704✔
2574
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2575
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2576
                             info.raw_error_code)};
×
2577
    }
×
2578

350✔
2579
    // Can't process debug hook actions once the Session is undergoing deactivation, since
350✔
2580
    // the SessionWrapper may not be available
350✔
2581
    if (m_state == Active) {
704✔
2582
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
702✔
2583
        if (debug_action == SyncClientHookAction::EarlyReturn) {
702✔
2584
            return Status::OK();
8✔
2585
        }
8✔
2586
    }
696✔
2587

346✔
2588
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
346✔
2589
    // containing the compensating write has appeared in a download message.
346✔
2590
    if (status == ErrorCodes::SyncCompensatingWrite) {
696✔
2591
        // If the client is not active, the compensating writes will not be processed now, but will be
22✔
2592
        // sent again the next time the client connects
22✔
2593
        if (m_state == Active) {
44✔
2594
            REALM_ASSERT(info.compensating_write_server_version.has_value());
44✔
2595
            m_pending_compensating_write_errors.push_back(info);
44✔
2596
        }
44✔
2597
        return Status::OK();
44✔
2598
    }
44✔
2599

324✔
2600
    if (protocol_error == ProtocolError::schema_version_changed) {
652✔
2601
        // Enable upload immediately if the session is still active.
34✔
2602
        if (m_state == Active) {
70✔
2603
            auto wt = get_db()->start_write();
70✔
2604
            _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
70✔
2605
            wt->commit();
70✔
2606
            // Notify SyncSession a schema migration is required.
34✔
2607
            on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
70✔
2608
        }
70✔
2609
        // Keep the session active to upload any unsynced changes.
34✔
2610
        return Status::OK();
70✔
2611
    }
70✔
2612

290✔
2613
    m_error_message_received = true;
582✔
2614
    suspend(SessionErrorInfo{info, std::move(status)});
582✔
2615
    return Status::OK();
582✔
2616
}
582✔
2617

2618
void Session::suspend(const SessionErrorInfo& info)
2619
{
662✔
2620
    REALM_ASSERT(!m_suspended);
662✔
2621
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
662!
2622
    logger.debug("Suspended"); // Throws
662✔
2623

330✔
2624
    m_suspended = true;
662✔
2625

330✔
2626
    // Detect completion of the unbinding process
330✔
2627
    if (m_unbind_message_send_complete && m_error_message_received) {
662!
2628
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2✔
2629
        // we received an ERROR message implies that the deactivation process must
2✔
2630
        // have been initiated, so this session must be in the Deactivating state.
2✔
2631
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
2!
2632

2✔
2633
        // The deactivation process completes when the unbinding process
2✔
2634
        // completes.
2✔
2635
        complete_deactivation(); // Throws
2✔
2636
        // Life cycle state is now Deactivated
2✔
2637
    }
2✔
2638

330✔
2639
    // Notify the application of the suspension of the session if the session is
330✔
2640
    // still in the Active state
330✔
2641
    if (m_state == Active) {
662✔
2642
        call_debug_hook(SyncClientHookEvent::SessionSuspended, info);
660✔
2643
        m_conn.one_less_active_unsuspended_session(); // Throws
660✔
2644
        on_suspended(info);                           // Throws
660✔
2645
    }
660✔
2646

330✔
2647
    if (!info.is_fatal) {
662✔
2648
        begin_resumption_delay(info);
56✔
2649
    }
56✔
2650

330✔
2651
    // Ready to send the UNBIND message, if it has not been sent already
330✔
2652
    if (!m_unbind_message_sent)
662✔
2653
        ensure_enlisted_to_send(); // Throws
660✔
2654
}
662✔
2655

2656
// Handles the response to a previously issued test command.
//
// Looks up the pending test command whose id equals `ident`, fulfills its
// promise with a copy of `body`, and removes it from the pending list.
//
// Returns Status::OK() on success, or SyncProtocolInvariantFailed when no
// pending test command has the given ident.
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
{
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);

    const auto has_requested_ident = [&](const PendingTestCommand& pending) {
        return pending.id == ident;
    };
    auto match =
        std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(), has_requested_ident);

    if (match == m_pending_test_commands.end()) {
        return {ErrorCodes::SyncProtocolInvariantFailed,
                util::format("Received test command response for a non-existent ident %1", ident)};
    }

    // Fulfill the promise before erasing: erasing destroys the entry that owns it.
    match->promise.emplace_value(std::string{body});
    m_pending_test_commands.erase(match);

    return Status::OK();
}
52✔
2673

2674
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2675
{
56✔
2676
    REALM_ASSERT(!m_try_again_activation_timer);
56✔
2677

28✔
2678
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
56✔
2679
                                  error_info.resumption_delay_interval);
56✔
2680
    auto try_again_interval = m_try_again_delay_info.delay_interval();
56✔
2681
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
56✔
2682
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
12✔
2683
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
12✔
2684
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
12✔
2685
        try_again_interval = std::chrono::milliseconds{1000};
26✔
2686
    }
26✔
2687
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
56✔
2688
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
56✔
2689
        if (status == ErrorCodes::OperationAborted)
56✔
2690
            return;
12✔
2691
        else if (!status.is_ok())
44✔
2692
            throw Exception(status);
×
2693

22✔
2694
        m_try_again_activation_timer.reset();
44✔
2695
        cancel_resumption_delay();
44✔
2696
    });
44✔
2697
}
56✔
2698

2699
void Session::clear_resumption_delay_state()
2700
{
44,122✔
2701
    if (m_try_again_activation_timer) {
44,122✔
2702
        logger.debug("Clearing resumption delay state after successful download");
×
2703
        m_try_again_delay_info.reset();
×
2704
    }
×
2705
}
44,122✔
2706

2707
// Validates the sync progress carried by a received message against the
// progress currently stored for this session (m_progress) and against
// m_last_version_available.
//
// Every check is evaluated; when more than one fails, the message produced by
// the last failing check is the one that is reported.
//
// Returns Status::OK() when all checks pass, otherwise a
// SyncProtocolInvariantFailed status describing the violation.
//
// NOTE(review): the function is declared noexcept but builds std::string
// messages via util::format; an exception thrown while doing so would
// terminate the process - confirm that this is acceptable.
Status Session::check_received_sync_progress(const SyncProgress& progress) noexcept
{
    const SyncProgress& current = m_progress;
    const SyncProgress& received = progress;
    std::string violation;

    // The latest server version must never move backwards.
    if (received.latest_server_version.version < current.latest_server_version.version) {
        violation = util::format("Latest server version in download messages must be weakly increasing throughout a "
                                 "session (current: %1, received: %2)",
                                 current.latest_server_version.version, received.latest_server_version.version);
    }

    // The upload cursor must never move backwards ...
    if (received.upload.client_version < current.upload.client_version) {
        violation = util::format("Last integrated client version in download messages must be weakly increasing "
                                 "throughout a session (current: %1, received: %2)",
                                 current.upload.client_version, received.upload.client_version);
    }

    // ... and must not be ahead of the last client version available.
    if (received.upload.client_version > m_last_version_available) {
        violation = util::format("Last integrated client version on server cannot be greater than the latest client "
                                 "version in existence (current: %1, received: %2)",
                                 m_last_version_available, received.upload.client_version);
    }

    // The download cursor must never move backwards ...
    if (received.download.server_version < current.download.server_version) {
        violation =
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
                         current.download.server_version, received.download.server_version);
    }

    // ... and must not be ahead of the latest server version in the same message.
    if (received.download.server_version > received.latest_server_version.version) {
        violation = util::format(
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
            received.download.server_version, received.latest_server_version.version);
    }

    // The client version integrated at the download cursor must never move backwards ...
    if (received.download.last_integrated_client_version < current.download.last_integrated_client_version) {
        violation = util::format(
            "Last integrated client version on the server at the position in the server's history of the download "
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
            current.download.last_integrated_client_version, received.download.last_integrated_client_version);
    }

    // ... and must not be ahead of the upload cursor in the same message.
    if (received.download.last_integrated_client_version > received.upload.client_version) {
        violation = util::format("Last integrated client version on the server in the position at the server's history "
                                 "of the download cursor cannot be greater than the latest client version integrated "
                                 "on the server (download: %1, upload: %2)",
                                 received.download.last_integrated_client_version, received.upload.client_version);
    }

    // The download cursor must not be behind the server version integrated by the upload cursor.
    if (received.download.server_version < received.upload.last_integrated_server_version) {
        violation = util::format(
            "The server version of the download cursor cannot be less than the server version integrated in the "
            "latest client version acknowledged by the server (download: %1, upload: %2)",
            received.download.server_version, received.upload.last_integrated_server_version);
    }

    if (!violation.empty()) {
        return {ErrorCodes::SyncProtocolInvariantFailed, std::move(violation)};
    }
    return Status::OK();
}
6✔
2761

2762

2763
void Session::check_for_upload_completion()
2764
{
77,852✔
2765
    REALM_ASSERT_EX(m_state == Active, m_state);
77,852✔
2766
    if (!m_upload_completion_notification_requested) {
77,852✔
2767
        return;
46,122✔
2768
    }
46,122✔
2769

15,584✔
2770
    // during an ongoing client reset operation, we never upload anything
15,584✔
2771
    if (m_performing_client_reset)
31,730✔
2772
        return;
256✔
2773

15,456✔
2774
    // Upload process must have reached end of history
15,456✔
2775
    REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available);
31,474✔
2776
    bool scan_complete = (m_upload_progress.client_version == m_last_version_available);
31,474✔
2777
    if (!scan_complete)
31,474✔
2778
        return;
5,284✔
2779

12,780✔
2780
    // All uploaded changesets must have been acknowledged by the server
12,780✔
2781
    REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload);
26,190✔
2782
    bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload);
26,190✔
2783
    if (!all_uploads_accepted)
26,190✔
2784
        return;
11,256✔
2785

7,382✔
2786
    m_upload_completion_notification_requested = false;
14,934✔
2787
    on_upload_completion(); // Throws
14,934✔
2788
}
14,934✔
2789

2790

2791
void Session::check_for_download_completion()
2792
{
60,196✔
2793
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
60,196✔
2794
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
60,196✔
2795
    if (m_last_download_mark_received == m_last_triggering_download_mark)
60,196✔
2796
        return;
43,606✔
2797
    if (m_last_download_mark_received < m_target_download_mark)
16,590✔
2798
        return;
418✔
2799
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
16,172✔
2800
        return;
×
2801
    m_last_triggering_download_mark = m_target_download_mark;
16,172✔
2802
    if (REALM_UNLIKELY(!m_allow_upload)) {
16,172✔
2803
        // Activate the upload process now, and enable immediate reactivation
2,210✔
2804
        // after a subsequent fast reconnect.
2,210✔
2805
        m_allow_upload = true;
4,602✔
2806
        ensure_enlisted_to_send(); // Throws
4,602✔
2807
    }
4,602✔
2808
    on_download_completion(); // Throws
16,172✔
2809
}
16,172✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc