• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2492

15 Jul 2024 06:38PM UTC coverage: 90.993% (+0.01%) from 90.98%
2492

push

Evergreen

web-flow
RCORE-2192 RCORE-2193 Fix FLX download progress reporting (#7870)

* Fix FLX download progress reporting

We need to store the download progress for each batch of a bootstrap and not
just at the end for it to be useful in any way.

The server will sometimes send us a DOWNLOAD message whose progress estimate is
something other than one, followed by a message whose estimate is exactly one
but whose byte-level information is unchanged (because the final message is
empty). When this happens we need to report the download completion to the
user, so add the estimate to the fields checked for changes.

A subscription change which doesn't actually change what set of objects is in
view can result in an empty DOWNLOAD message with no changes other than the
query version, and we should report that too.

* Fix a comment

* Pass the DownloadMessage to process_flx_bootstrap_message()

* Report steady-state download progress

102388 of 180586 branches covered (56.7%)

247 of 257 new or added lines in 10 files covered. (96.11%)

44 existing lines in 13 files now uncovered.

215408 of 236730 relevant lines covered (90.99%)

5309938.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.79
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/compact_changesets.hpp>
10
#include <realm/sync/noinst/client_reset_operation.hpp>
11
#include <realm/sync/noinst/sync_schema_migration.hpp>
12
#include <realm/sync/protocol.hpp>
13
#include <realm/util/assert.hpp>
14
#include <realm/util/basic_system_errors.hpp>
15
#include <realm/util/memory_stream.hpp>
16
#include <realm/util/platform_info.hpp>
17
#include <realm/util/random.hpp>
18
#include <realm/util/safe_int_ops.hpp>
19
#include <realm/util/scope_exit.hpp>
20
#include <realm/util/to_string.hpp>
21
#include <realm/util/uri.hpp>
22
#include <realm/version.hpp>
23

24
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
25

26
#include <system_error>
27
#include <sstream>
28

29
// NOTE: The protocol specification is in `/doc/protocol.md`
30

31
using namespace realm;
32
using namespace _impl;
33
using namespace realm::util;
34
using namespace realm::sync;
35
using namespace realm::sync::websocket;
36

37
// clang-format off
38
using Connection      = ClientImpl::Connection;
39
using Session         = ClientImpl::Session;
40
using UploadChangeset = ClientHistory::UploadChangeset;
41

42
// These are a work-around for a bug in MSVC. It cannot find in-class types
43
// mentioned in signature of out-of-line member function definitions.
44
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
45
using OutputBuffer                = ClientImpl::OutputBuffer;
46
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
47
// clang-format on
48

49
void ClientImpl::ReconnectInfo::reset() noexcept
50
{
1,828✔
51
    m_backoff_state.reset();
1,828✔
52
    scheduled_reset = false;
1,828✔
53
}
1,828✔
54

55

56
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
57
                                       std::optional<ResumptionDelayInfo> new_delay_info)
58
{
3,730✔
59
    m_backoff_state.update(new_reason, new_delay_info);
3,730✔
60
}
3,730✔
61

62

63
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
64
{
5,908✔
65
    if (scheduled_reset) {
5,908✔
66
        reset();
8✔
67
    }
8✔
68

69
    if (!m_backoff_state.triggering_error) {
5,908✔
70
        return std::chrono::milliseconds::zero();
4,530✔
71
    }
4,530✔
72

73
    switch (*m_backoff_state.triggering_error) {
1,378✔
74
        case ConnectionTerminationReason::closed_voluntarily:
80✔
75
            return std::chrono::milliseconds::zero();
80✔
76
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
77
            return std::chrono::milliseconds::max();
18✔
78
        default:
1,278✔
79
            if (m_reconnect_mode == ReconnectMode::testing) {
1,278✔
80
                return std::chrono::milliseconds::max();
936✔
81
            }
936✔
82

83
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
342✔
84
            return m_backoff_state.delay_interval();
342✔
85
    }
1,378✔
86
}
1,378✔
87

88

89
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
90
                                      port_type& port, std::string& path) const
91
{
4,070✔
92
    util::Uri uri(url); // Throws
4,070✔
93
    uri.canonicalize(); // Throws
4,070✔
94
    std::string userinfo, address_2, port_2;
4,070✔
95
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
4,070✔
96
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
4,070✔
97
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
4,070✔
98
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
4,070✔
99
    if (REALM_UNLIKELY(!good))
4,070✔
100
        return false;
×
101
    ProtocolEnvelope protocol_2;
4,070✔
102
    port_type port_3;
4,070✔
103
    if (realm_scheme) {
4,070✔
104
        if (uri.get_scheme() == "realm:") {
×
105
            protocol_2 = ProtocolEnvelope::realm;
×
106
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
107
        }
×
108
        else {
×
109
            protocol_2 = ProtocolEnvelope::realms;
×
110
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
111
        }
×
112
    }
×
113
    else {
4,070✔
114
        REALM_ASSERT(ws_scheme);
4,070✔
115
        if (uri.get_scheme() == "ws:") {
4,070✔
116
            protocol_2 = ProtocolEnvelope::ws;
3,990✔
117
            port_3 = 80;
3,990✔
118
        }
3,990✔
119
        else {
80✔
120
            protocol_2 = ProtocolEnvelope::wss;
80✔
121
            port_3 = 443;
80✔
122
        }
80✔
123
    }
4,070✔
124
    if (!port_2.empty()) {
4,070✔
125
        std::istringstream in(port_2);    // Throws
3,966✔
126
        in.imbue(std::locale::classic()); // Throws
3,966✔
127
        in >> port_3;
3,966✔
128
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,966✔
129
            return false;
×
130
    }
3,966✔
131
    std::string path_2 = uri.get_path(); // Throws (copy)
4,070✔
132

133
    protocol = protocol_2;
4,070✔
134
    address = std::move(address_2);
4,070✔
135
    port = port_3;
4,070✔
136
    path = std::move(path_2);
4,070✔
137
    return true;
4,070✔
138
}
4,070✔
139

140
ClientImpl::ClientImpl(ClientConfig config)
141
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::session, std::move(config.logger))}
4,900✔
142
    , logger{*logger_ptr}
4,900✔
143
    , m_reconnect_mode{config.reconnect_mode}
4,900✔
144
    , m_connect_timeout{config.connect_timeout}
4,900✔
145
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
4,900✔
146
    , m_ping_keepalive_period{config.ping_keepalive_period}
4,900✔
147
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
4,900✔
148
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
4,900✔
149
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
4,900✔
150
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
4,900✔
151
    , m_dry_run{config.dry_run}
4,900✔
152
    , m_enable_default_port_hack{config.enable_default_port_hack}
4,900✔
153
    , m_disable_upload_compaction{config.disable_upload_compaction}
4,900✔
154
    , m_fix_up_object_ids{config.fix_up_object_ids}
4,900✔
155
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
4,900✔
156
    , m_socket_provider{std::move(config.socket_provider)}
4,900✔
157
    , m_client_protocol{} // Throws
4,900✔
158
    , m_one_connection_per_session{config.one_connection_per_session}
4,900✔
159
    , m_random{}
4,900✔
160
{
9,938✔
161
    // FIXME: Would be better if seeding was up to the application.
162
    util::seed_prng_nondeterministically(m_random); // Throws
9,938✔
163

164
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,938✔
165
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,938✔
166
                 get_current_protocol_version()); // Throws
9,938✔
167
    logger.info("Platform: %1", util::get_platform_info());
9,938✔
168
    const char* build_mode;
9,938✔
169
#if REALM_DEBUG
9,938✔
170
    build_mode = "Debug";
9,938✔
171
#else
172
    build_mode = "Release";
173
#endif
174
    logger.debug("Build mode: %1", build_mode);
9,938✔
175
    logger.debug("Config param: one_connection_per_session = %1",
9,938✔
176
                 config.one_connection_per_session); // Throws
9,938✔
177
    logger.debug("Config param: connect_timeout = %1 ms",
9,938✔
178
                 config.connect_timeout); // Throws
9,938✔
179
    logger.debug("Config param: connection_linger_time = %1 ms",
9,938✔
180
                 config.connection_linger_time); // Throws
9,938✔
181
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,938✔
182
                 config.ping_keepalive_period); // Throws
9,938✔
183
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,938✔
184
                 config.pong_keepalive_timeout); // Throws
9,938✔
185
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,938✔
186
                 config.fast_reconnect_limit); // Throws
9,938✔
187
    logger.debug("Config param: disable_upload_compaction = %1",
9,938✔
188
                 config.disable_upload_compaction); // Throws
9,938✔
189
    logger.debug("Config param: disable_sync_to_disk = %1",
9,938✔
190
                 config.disable_sync_to_disk); // Throws
9,938✔
191
    logger.debug(
9,938✔
192
        "Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3, jitter: 1/%4",
9,938✔
193
        m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,938✔
194
        m_reconnect_backoff_info.resumption_delay_interval.count(),
9,938✔
195
        m_reconnect_backoff_info.resumption_delay_backoff_multiplier, m_reconnect_backoff_info.delay_jitter_divisor);
9,938✔
196

197
    if (config.reconnect_mode != ReconnectMode::normal) {
9,938✔
198
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
776✔
199
                    "never do this in production!");
776✔
200
    }
776✔
201

202
    if (config.dry_run) {
9,938✔
203
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
204
                    "never do this in production!");
×
205
    }
×
206

207
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,938✔
208

209
    if (m_one_connection_per_session) {
9,938✔
210
        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
4✔
211
                    "never do this in production");
4✔
212
    }
4✔
213

214
    if (config.disable_upload_activation_delay) {
9,938✔
215
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
216
                    "never do this in production");
×
217
    }
×
218

219
    if (config.disable_sync_to_disk) {
9,938✔
220
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
221
                    "never do this in production");
×
222
    }
×
223

224
    m_actualize_and_finalize = create_trigger([this](Status status) {
14,332✔
225
        if (status == ErrorCodes::OperationAborted)
14,332✔
226
            return;
×
227
        else if (!status.is_ok())
14,332✔
228
            throw Exception(status);
×
229
        actualize_and_finalize_session_wrappers(); // Throws
14,332✔
230
    });
14,332✔
231
}
9,938✔
232

233
void ClientImpl::incr_outstanding_posts()
234
{
198,904✔
235
    util::CheckedLockGuard lock(m_drain_mutex);
198,904✔
236
    ++m_outstanding_posts;
198,904✔
237
    m_drained = false;
198,904✔
238
}
198,904✔
239

240
void ClientImpl::decr_outstanding_posts()
241
{
198,896✔
242
    util::CheckedLockGuard lock(m_drain_mutex);
198,896✔
243
    REALM_ASSERT(m_outstanding_posts);
198,896✔
244
    if (--m_outstanding_posts <= 0) {
198,896✔
245
        // Notify must happen with lock held or another thread could destroy
246
        // ClientImpl between when we release the lock and when we call notify
247
        m_drain_cv.notify_all();
18,196✔
248
    }
18,196✔
249
}
198,896✔
250

251
// Posts `handler` to the socket provider. The outstanding-post counter is
// incremented here and decremented again when the posted callback finishes,
// whether it returns normally or throws. The status is passed through to
// `handler` unchanged.
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    incr_outstanding_posts();
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
        auto release_post = util::make_scope_exit([&]() noexcept {
            decr_outstanding_posts();
        });
        handler(status);
    });
}
262

263
// Posts a status-less `handler` to the socket provider. The outstanding-post
// counter is incremented here and decremented when the posted callback
// finishes. `handler` only runs for an OK status: an aborted operation is
// dropped silently and any other failure is thrown as an Exception.
void ClientImpl::post(util::UniqueFunction<void()>&& handler)
{
    REALM_ASSERT(m_socket_provider);
    incr_outstanding_posts();
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
        auto release_post = util::make_scope_exit([&]() noexcept {
            decr_outstanding_posts();
        });
        const bool aborted = (status == ErrorCodes::OperationAborted);
        if (!aborted) {
            if (!status.is_ok())
                throw Exception(status);
            handler();
        }
    });
}
278

279

280
void ClientImpl::drain_connections()
281
{
9,936✔
282
    logger.debug("Draining connections during sync client shutdown");
9,936✔
283
    for (auto& server_slot_pair : m_server_slots) {
9,936✔
284
        auto& server_slot = server_slot_pair.second;
2,692✔
285

286
        if (server_slot.connection) {
2,692✔
287
            auto& conn = server_slot.connection;
2,468✔
288
            conn->force_close();
2,468✔
289
        }
2,468✔
290
        else {
224✔
291
            for (auto& conn_pair : server_slot.alt_connections) {
224✔
292
                conn_pair.second->force_close();
6✔
293
            }
6✔
294
        }
224✔
295
    }
2,692✔
296
}
9,936✔
297

298

299
// Creates a timer on the socket provider that invokes `handler` with the
// timer's status. The outstanding-post counter is incremented here and
// decremented when the timer callback finishes.
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
                                                       SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    incr_outstanding_posts();

    auto on_timer = [handler = std::move(handler), this](Status status) {
        auto release_post = util::make_scope_exit([&]() noexcept {
            decr_outstanding_posts();
        });
        handler(status);
    };
    return m_socket_provider->create_timer(delay, std::move(on_timer));
}
311

312

313
// Creates a trigger bound to this client that runs `handler`. Requires a
// socket provider to be present.
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    using TriggerType = Trigger<ClientImpl>;
    return std::make_unique<TriggerType>(this, std::move(handler));
}
318

319
// If a websocket sentinel is still attached, flags it as destroyed and then
// releases it.
Connection::~Connection()
{
    if (!m_websocket_sentinel)
        return;

    m_websocket_sentinel->destroyed = true;
    m_websocket_sentinel.reset();
}
326

327
void Connection::activate()
328
{
2,798✔
329
    REALM_ASSERT(m_on_idle);
2,798✔
330
    m_activated = true;
2,798✔
331
    if (m_num_active_sessions == 0)
2,798✔
332
        m_on_idle->trigger();
×
333
    // We cannot in general connect immediately, because a prior failure to
334
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
335
    initiate_reconnect_wait(); // Throws
2,798✔
336
}
2,798✔
337

338

339
void Connection::activate_session(std::unique_ptr<Session> sess)
340
{
10,090✔
341
    REALM_ASSERT(sess);
10,090✔
342
    REALM_ASSERT(&sess->m_conn == this);
10,090✔
343
    REALM_ASSERT(!m_force_closed);
10,090✔
344
    Session& sess_2 = *sess;
10,090✔
345
    session_ident_type ident = sess->m_ident;
10,090✔
346
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
10,090✔
347
    bool was_inserted = p.second;
10,090✔
348
    REALM_ASSERT(was_inserted);
10,090✔
349
    // Save the session ident to the historical list of session idents
350
    m_session_history.insert(ident);
10,090✔
351
    sess_2.activate(); // Throws
10,090✔
352
    if (m_state == ConnectionState::connected) {
10,090✔
353
        bool fast_reconnect = false;
7,024✔
354
        sess_2.connection_established(fast_reconnect); // Throws
7,024✔
355
    }
7,024✔
356
    ++m_num_active_sessions;
10,090✔
357
}
10,090✔
358

359

360
// Starts deactivation of `sess`. If the session has reached the Deactivated
// state by the time initiate_deactivation() returns, it is removed from this
// connection immediately. When the last active session goes away on an
// activated, disconnected connection, the idle trigger is fired.
void Connection::initiate_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess);
    REALM_ASSERT(&sess->m_conn == this);
    REALM_ASSERT(m_num_active_sessions);

    // Since the client may be waiting for m_num_active_sessions to reach 0
    // in stop_and_wait() (on a separate thread), deactivate Session before
    // decrementing the num active sessions value.
    sess->initiate_deactivation(); // Throws
    const bool already_deactivated = (sess->m_state == Session::Deactivated);
    if (already_deactivated)
        finish_session_deactivation(sess);

    --m_num_active_sessions;
    if (REALM_UNLIKELY(m_num_active_sessions == 0) && m_activated && m_state == ConnectionState::disconnected)
        m_on_idle->trigger();
}
377

378

379
void Connection::cancel_reconnect_delay()
380
{
2,048✔
381
    REALM_ASSERT(m_activated);
2,048✔
382

383
    if (m_reconnect_delay_in_progress) {
2,048✔
384
        if (m_nonzero_reconnect_delay)
1,816✔
385
            logger.detail("Canceling reconnect delay"); // Throws
910✔
386

387
        // Cancel the in-progress wait operation by destroying the timer
388
        // object. Destruction is needed in this case, because a new wait
389
        // operation might have to be initiated before the previous one
390
        // completes (its completion handler starts to execute), so the new wait
391
        // operation must be done on a new timer object.
392
        m_reconnect_disconnect_timer.reset();
1,816✔
393
        m_reconnect_delay_in_progress = false;
1,816✔
394
        m_reconnect_info.reset();
1,816✔
395
        initiate_reconnect_wait(); // Throws
1,816✔
396
        return;
1,816✔
397
    }
1,816✔
398

399
    // If we are not disconnected, then we need to make sure the next time we get disconnected
400
    // that we are allowed to re-connect as quickly as possible.
401
    //
402
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
403
    // backoff/delay state before calculating the next delay, unless a PONG message is received
404
    // for the urgent PING message we send below.
405
    //
406
    // If we get a PONG message for the urgent PING message sent below, then the connection is
407
    // healthy and we can calculate the next delay normally.
408
    if (m_state != ConnectionState::disconnected) {
232✔
409
        m_reconnect_info.scheduled_reset = true;
232✔
410
        m_ping_after_scheduled_reset_of_reconnect_info = false;
232✔
411

412
        schedule_urgent_ping(); // Throws
232✔
413
        return;
232✔
414
    }
232✔
415
    // Nothing to do in this case. The next reconnect attemp will be made as
416
    // soon as there are any sessions that are both active and unsuspended.
417
}
232✔
418

419
// Removes a session that has reached the Deactivated state from both the
// session map and the session-ident history.
void Connection::finish_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess->m_state == Session::Deactivated);

    // Copy the ident first so that it is not read through `sess` after the
    // entry has been erased from m_sessions.
    const auto session_ident = sess->m_ident;
    m_sessions.erase(session_ident);
    m_session_history.erase(session_ident);
}
426

427
void Connection::force_close()
428
{
2,474✔
429
    if (m_force_closed) {
2,474✔
430
        return;
×
431
    }
×
432

433
    m_force_closed = true;
2,474✔
434

435
    if (m_state != ConnectionState::disconnected) {
2,474✔
436
        voluntary_disconnect();
2,436✔
437
    }
2,436✔
438

439
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,474✔
440
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,474✔
441
        m_reconnect_disconnect_timer.reset();
38✔
442
        m_reconnect_delay_in_progress = false;
38✔
443
        m_disconnect_delay_in_progress = false;
38✔
444
    }
38✔
445

446
    // We must copy any session pointers we want to close to a vector because force_closing
447
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
448
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
449
    std::vector<Session*> to_close;
2,474✔
450
    for (auto& session_pair : m_sessions) {
2,474✔
451
        if (session_pair.second->m_state == Session::State::Active) {
102✔
452
            to_close.push_back(session_pair.second.get());
102✔
453
        }
102✔
454
    }
102✔
455

456
    for (auto& sess : to_close) {
2,474✔
457
        sess->force_close();
102✔
458
    }
102✔
459

460
    logger.debug("Force closed idle connection");
2,474✔
461
}
2,474✔
462

463

464
void Connection::websocket_connected_handler(const std::string& protocol)
465
{
3,524✔
466
    if (!protocol.empty()) {
3,524✔
467
        std::string_view expected_prefix =
3,524✔
468
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,524✔
469
        // FIXME: Use std::string_view::begins_with() in C++20.
470
        auto prefix_matches = [&](std::string_view other) {
3,524✔
471
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,524✔
472
        };
3,524✔
473
        if (prefix_matches(expected_prefix)) {
3,524✔
474
            util::MemoryInputStream in;
3,524✔
475
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,524✔
476
            in.imbue(std::locale::classic());
3,524✔
477
            in.unsetf(std::ios_base::skipws);
3,524✔
478
            int value_2 = 0;
3,524✔
479
            in >> value_2;
3,524✔
480
            if (in && in.eof() && value_2 >= 0) {
3,524✔
481
                bool good_version =
3,524✔
482
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,524✔
483
                if (good_version) {
3,524✔
484
                    logger.detail("Negotiated protocol version: %1", value_2);
3,524✔
485
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
486
                    // will provide the appservices connection ID via a log message.
487
                    // TODO: Remove once the server starts sending the connection ID
488
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,524✔
489
                    m_negotiated_protocol_version = value_2;
3,524✔
490
                    handle_connection_established(); // Throws
3,524✔
491
                    return;
3,524✔
492
                }
3,524✔
493
            }
3,524✔
494
        }
3,524✔
495
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
496
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
497
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
498
    }
×
499
    else {
×
500
        close_due_to_client_side_error(
×
501
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
502
            ConnectionTerminationReason::bad_headers_in_http_response);
×
503
    }
×
504
}
3,524✔
505

506

507
// Handles one binary websocket message. Returns false if the connection has
// been force closed; otherwise returns whether m_websocket is still set after
// the message (or the simulated read failure) has been processed.
bool Connection::websocket_binary_message_received(util::Span<const char> data)
{
    if (m_force_closed) {
        logger.debug("Received binary message after connection was force closed");
        return false;
    }

    using sf = SimulatedFailure;
    const bool simulate_read_failure = sf::check_trigger(sf::sync_client__read_head);
    if (simulate_read_failure) {
        close_due_to_client_side_error(
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
            ConnectionTerminationReason::read_or_write_error);
    }
    else {
        handle_message_received(data);
    }
    return static_cast<bool>(m_websocket);
}
525

526

527
void Connection::websocket_error_handler()
528
{
668✔
529
    m_websocket_error_received = true;
668✔
530
}
668✔
531

532
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
533
{
776✔
534
    if (m_force_closed) {
776✔
535
        logger.debug("Received websocket close message after connection was force closed");
×
536
        return false;
×
537
    }
×
538
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
776✔
539

540
    switch (error_code) {
776✔
541
        case WebSocketError::websocket_ok:
60✔
542
            break;
60✔
543
        case WebSocketError::websocket_resolve_failed:
4✔
544
            [[fallthrough]];
4✔
545
        case WebSocketError::websocket_connection_failed: {
112✔
546
            SessionErrorInfo error_info(
112✔
547
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
112✔
548
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
549
            // to make sure the websocket URL is correct
550
            if (!m_server_endpoint.is_verified) {
112✔
551
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
84✔
552
            }
84✔
553
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
112✔
554
            break;
112✔
555
        }
4✔
556
        case WebSocketError::websocket_read_error:
534✔
557
            [[fallthrough]];
534✔
558
        case WebSocketError::websocket_write_error: {
534✔
559
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
534✔
560
                                         ConnectionTerminationReason::read_or_write_error);
534✔
561
            break;
534✔
562
        }
534✔
563
        case WebSocketError::websocket_going_away:
✔
564
            [[fallthrough]];
×
565
        case WebSocketError::websocket_protocol_error:
✔
566
            [[fallthrough]];
×
567
        case WebSocketError::websocket_unsupported_data:
✔
568
            [[fallthrough]];
×
569
        case WebSocketError::websocket_invalid_payload_data:
✔
570
            [[fallthrough]];
×
571
        case WebSocketError::websocket_policy_violation:
✔
572
            [[fallthrough]];
×
573
        case WebSocketError::websocket_reserved:
✔
574
            [[fallthrough]];
×
575
        case WebSocketError::websocket_no_status_received:
✔
576
            [[fallthrough]];
×
577
        case WebSocketError::websocket_invalid_extension: {
✔
578
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
579
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
580
            break;
×
581
        }
×
582
        case WebSocketError::websocket_message_too_big: {
4✔
583
            auto message = util::format(
4✔
584
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
585
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
586
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
587
            involuntary_disconnect(std::move(error_info),
4✔
588
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
589
            break;
4✔
590
        }
×
591
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
592
            close_due_to_client_side_error(
10✔
593
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
594
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
595
            break;
10✔
596
        }
×
597
        case WebSocketError::websocket_client_too_old:
✔
598
            [[fallthrough]];
×
599
        case WebSocketError::websocket_client_too_new:
✔
600
            [[fallthrough]];
×
601
        case WebSocketError::websocket_protocol_mismatch: {
✔
602
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
603
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
604
            break;
×
605
        }
×
606
        case WebSocketError::websocket_fatal_error: {
✔
607
            // Error is fatal if the sync_route has already been verified - if the sync_route has not
608
            // been verified, then use a non-fatal error and try to perform a location update.
609
            SessionErrorInfo error_info(
×
610
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)},
×
611
                IsFatal{m_server_endpoint.is_verified});
×
612
            ConnectionTerminationReason reason = ConnectionTerminationReason::http_response_says_fatal_error;
×
613
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
614
            // to make sure the websocket URL is correct
615
            if (!m_server_endpoint.is_verified) {
×
616
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
617
                reason = ConnectionTerminationReason::connect_operation_failed;
×
618
            }
×
619
            involuntary_disconnect(std::move(error_info), reason);
×
620
            break;
×
621
        }
×
622
        case WebSocketError::websocket_forbidden: {
✔
623
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
624
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
625
            involuntary_disconnect(std::move(error_info),
×
626
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
627
            break;
×
628
        }
×
629
        case WebSocketError::websocket_unauthorized: {
44✔
630
            SessionErrorInfo error_info(
44✔
631
                {ErrorCodes::AuthError,
44✔
632
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
44✔
633
                IsFatal{false});
44✔
634
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
44✔
635
            involuntary_disconnect(std::move(error_info),
44✔
636
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
44✔
637
            break;
44✔
638
        }
×
639
        case WebSocketError::websocket_moved_permanently: {
12✔
640
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
641
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
642
            involuntary_disconnect(std::move(error_info),
12✔
643
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
644
            break;
12✔
645
        }
×
646
        case WebSocketError::websocket_abnormal_closure: {
✔
647
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
648
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
649
            involuntary_disconnect(std::move(error_info),
×
650
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
651
            break;
×
652
        }
×
653
        case WebSocketError::websocket_internal_server_error:
✔
654
            [[fallthrough]];
×
655
        case WebSocketError::websocket_retry_error: {
✔
656
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
657
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
658
            break;
×
659
        }
×
660
    }
776✔
661

662
    return bool(m_websocket);
776✔
663
}
776✔
664

665
// Guarantees that handle_reconnect_wait() is never called from within the
666
// execution of initiate_reconnect_wait() (no callback reentrance).
667
void Connection::initiate_reconnect_wait()
668
{
8,342✔
669
    REALM_ASSERT(m_activated);
8,342✔
670
    REALM_ASSERT(!m_reconnect_delay_in_progress);
8,342✔
671
    REALM_ASSERT(!m_disconnect_delay_in_progress);
8,342✔
672

673
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
674
    if (m_force_closed) {
8,342✔
675
        return;
2,434✔
676
    }
2,434✔
677

678
    m_reconnect_delay_in_progress = true;
5,908✔
679
    auto delay = m_reconnect_info.delay_interval();
5,908✔
680
    if (delay == std::chrono::milliseconds::max()) {
5,908✔
681
        logger.detail("Reconnection delayed indefinitely"); // Throws
954✔
682
        // Not actually starting a timer corresponds to an infinite wait
683
        m_nonzero_reconnect_delay = true;
954✔
684
        return;
954✔
685
    }
954✔
686

687
    if (delay == std::chrono::milliseconds::zero()) {
4,954✔
688
        m_nonzero_reconnect_delay = false;
4,608✔
689
    }
4,608✔
690
    else {
346✔
691
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
346✔
692
        m_nonzero_reconnect_delay = true;
346✔
693
    }
346✔
694

695
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
696
    // we need it to be cancelable in case the connection is terminated before the timer
697
    // callback is run.
698
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
4,954✔
699
        // If the operation is aborted, the connection object may have been
700
        // destroyed.
701
        if (status != ErrorCodes::OperationAborted)
4,954✔
702
            handle_reconnect_wait(status); // Throws
3,728✔
703
    });                                    // Throws
4,954✔
704
}
4,954✔
705

706

707
// Runs when the reconnect-delay timer completes. A non-OK status is rethrown
// as an Exception. Otherwise the delay is marked as over and, if at least one
// active unsuspended session exists, a new connection attempt is started.
void Connection::handle_reconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        // The timer callback set up by initiate_reconnect_wait() does not
        // forward aborted waits to this function.
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT(m_reconnect_delay_in_progress);
    m_reconnect_delay_in_progress = false;

    const bool sessions_waiting = m_num_active_unsuspended_sessions > 0;
    if (sessions_waiting) {
        initiate_reconnect(); // Throws
    }
}
3,730✔
720

721
// Websocket observer that forwards every callback to its Connection, but only
// while the lifecycle sentinel captured at construction has not been marked as
// destroyed. Once the sentinel is marked, callbacks no longer touch `conn`.
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
    // Remembers the connection together with the sentinel that the connection
    // holds at the time the shim is created.
    explicit WebSocketObserverShim(Connection* owner)
        : conn(owner)
        , sentinel(owner->m_websocket_sentinel)
    {
    }

    Connection* conn;
    util::bind_ptr<LifecycleSentinel> sentinel;

    // Forwarded only while the sentinel is still live.
    void websocket_connected_handler(const std::string& protocol) override
    {
        if (!sentinel->destroyed) {
            conn->websocket_connected_handler(protocol);
        }
    }

    // Forwarded only while the sentinel is still live.
    void websocket_error_handler() override
    {
        if (!sentinel->destroyed) {
            conn->websocket_error_handler();
        }
    }

    // Yields false without forwarding once the sentinel has been marked as
    // destroyed; otherwise yields whatever the connection's handler returns.
    bool websocket_binary_message_received(util::Span<const char> data) override
    {
        return !sentinel->destroyed && conn->websocket_binary_message_received(data);
    }

    // Yields true without forwarding once the sentinel has been marked as
    // destroyed; otherwise yields whatever the connection's handler returns.
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
    {
        return sentinel->destroyed || conn->websocket_closed_handler(was_clean, error_code, msg);
    }
};
767

768
void Connection::initiate_reconnect()
769
{
3,730✔
770
    REALM_ASSERT(m_activated);
3,730✔
771

772
    m_state = ConnectionState::connecting;
3,730✔
773
    report_connection_state_change(ConnectionState::connecting); // Throws
3,730✔
774
    if (m_websocket_sentinel) {
3,730✔
775
        m_websocket_sentinel->destroyed = true;
×
776
    }
×
777
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,730✔
778
    m_websocket.reset();
3,730✔
779

780
    // Watchdog
781
    initiate_connect_wait(); // Throws
3,730✔
782

783
    std::vector<std::string> sec_websocket_protocol;
3,730✔
784
    {
3,730✔
785
        auto protocol_prefix =
3,730✔
786
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,730✔
787
        int min = get_oldest_supported_protocol_version();
3,730✔
788
        int max = get_current_protocol_version();
3,730✔
789
        REALM_ASSERT_3(min, <=, max);
3,730✔
790
        // List protocol version in descending order to ensure that the server
791
        // selects the highest possible version.
792
        for (int version = max; version >= min; --version) {
48,484✔
793
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
44,754✔
794
        }
44,754✔
795
    }
3,730✔
796

797
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,730✔
798
                m_server_endpoint.port, m_http_request_path_prefix);
3,730✔
799

800
    m_websocket_error_received = false;
3,730✔
801
    m_websocket =
3,730✔
802
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,730✔
803
                                            WebSocketEndpoint{
3,730✔
804
                                                m_server_endpoint.address,
3,730✔
805
                                                m_server_endpoint.port,
3,730✔
806
                                                get_http_request_path(),
3,730✔
807
                                                std::move(sec_websocket_protocol),
3,730✔
808
                                                is_ssl(m_server_endpoint.envelope),
3,730✔
809
                                                /// DEPRECATED - The following will be removed in a future release
810
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,730✔
811
                                                m_verify_servers_ssl_certificate,
3,730✔
812
                                                m_ssl_trust_certificate_path,
3,730✔
813
                                                m_ssl_verify_callback,
3,730✔
814
                                                m_proxy_config,
3,730✔
815
                                            });
3,730✔
816
}
3,730✔
817

818

819
void Connection::initiate_connect_wait()
820
{
3,730✔
821
    // Deploy a watchdog to enforce an upper bound on the time it can take to
822
    // fully establish the connection (including SSL and WebSocket
823
    // handshakes). Without such a watchdog, connect operations could take very
824
    // long, or even indefinite time.
825
    milliseconds_type time = m_client.m_connect_timeout;
3,730✔
826

827
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,730✔
828
        // If the operation is aborted, the connection object may have been
829
        // destroyed.
830
        if (status != ErrorCodes::OperationAborted)
3,728✔
831
            handle_connect_wait(status); // Throws
×
832
    });                                  // Throws
3,728✔
833
}
3,730✔
834

835

836
// Runs when the connect watchdog fires. A non-OK status is rethrown as an
// Exception. Otherwise the connection is torn down with a non-fatal
// SyncConnectTimeout error.
void Connection::handle_connect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
    logger.info("Connect timeout"); // Throws

    SessionErrorInfo timeout_info({ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
                                  IsFatal{false});
    // If the connection fails/times out and the server has not been contacted
    // yet, refresh the location to make sure the websocket URL is correct.
    const bool endpoint_unverified = !m_server_endpoint.is_verified;
    if (endpoint_unverified)
        timeout_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;

    involuntary_disconnect(std::move(timeout_info), ConnectionTerminationReason::sync_connect_timeout); // Throws
}
×
854

855

856
void Connection::handle_connection_established()
857
{
3,524✔
858
    // Cancel connect timeout watchdog
859
    m_connect_timer.reset();
3,524✔
860

861
    m_state = ConnectionState::connected;
3,524✔
862
    m_server_endpoint.is_verified = true; // sync route is valid since connection is successful
3,524✔
863

864
    milliseconds_type now = monotonic_clock_now();
3,524✔
865
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,524✔
866
    initiate_ping_delay(now);     // Throws
3,524✔
867

868
    bool fast_reconnect = false;
3,524✔
869
    if (m_disconnect_has_occurred) {
3,524✔
870
        milliseconds_type time = now - m_disconnect_time;
982✔
871
        if (time <= m_client.m_fast_reconnect_limit)
982✔
872
            fast_reconnect = true;
982✔
873
    }
982✔
874

875
    for (auto& p : m_sessions) {
4,594✔
876
        Session& sess = *p.second;
4,594✔
877
        sess.connection_established(fast_reconnect); // Throws
4,594✔
878
    }
4,594✔
879

880
    report_connection_state_change(ConnectionState::connected); // Throws
3,524✔
881
}
3,524✔
882

883

884
void Connection::schedule_urgent_ping()
885
{
232✔
886
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
232✔
887
    if (m_ping_delay_in_progress) {
232✔
888
        m_heartbeat_timer.reset();
124✔
889
        m_ping_delay_in_progress = false;
124✔
890
        m_minimize_next_ping_delay = true;
124✔
891
        milliseconds_type now = monotonic_clock_now();
124✔
892
        initiate_ping_delay(now); // Throws
124✔
893
        return;
124✔
894
    }
124✔
895
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
108✔
896
    if (!m_send_ping)
108✔
897
        m_minimize_next_ping_delay = true;
108✔
898
}
108✔
899

900

901
// Arms the heartbeat timer for the delay that precedes the next PING.
//
// `now` is the current monotonic clock reading; it is used to subtract the
// time already spent waiting for the previous PONG from the delay.
void Connection::initiate_ping_delay(milliseconds_type now)
{
    REALM_ASSERT(!m_ping_delay_in_progress);
    REALM_ASSERT(!m_waiting_for_pong);
    REALM_ASSERT(!m_send_ping);

    milliseconds_type delay = 0;
    if (m_minimize_next_ping_delay) {
        // A minimized delay was requested: use zero and consume the request.
        m_minimize_next_ping_delay = false;
    }
    else {
        delay = m_client.m_ping_keepalive_period;
        // Make a randomized deduction of up to 10%, or up to 100% if this is
        // the first PING message to be sent since the connection was
        // established. The purpose of this randomized deduction is to reduce
        // the risk of many connections sending PING messages simultaneously to
        // the server.
        const milliseconds_type max_deduction = (m_ping_sent ? delay / 10 : delay);
        std::uniform_int_distribution<milliseconds_type> distr(0, max_deduction);
        const milliseconds_type randomized_deduction = distr(m_client.get_random());
        delay -= randomized_deduction;

        // Deduct the time spent waiting for PONG, clamping at zero.
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
        const milliseconds_type spent_time = now - m_pong_wait_started_at;
        delay = (spent_time < delay) ? (delay - spent_time) : 0;
    }

    m_ping_delay_in_progress = true;

    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
        // Aborted waits are dropped silently; any other failure is rethrown.
        if (status == ErrorCodes::OperationAborted)
            return;
        if (!status.is_ok())
            throw Exception(status);

        handle_ping_delay();                                    // Throws
    });                                                         // Throws
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
}
3,806✔
946

947

948
void Connection::handle_ping_delay()
949
{
182✔
950
    REALM_ASSERT(m_ping_delay_in_progress);
182✔
951
    m_ping_delay_in_progress = false;
182✔
952
    m_send_ping = true;
182✔
953

954
    initiate_pong_timeout(); // Throws
182✔
955

956
    if (m_state == ConnectionState::connected && !m_sending)
182✔
957
        send_next_message(); // Throws
126✔
958
}
182✔
959

960

961
void Connection::initiate_pong_timeout()
962
{
182✔
963
    REALM_ASSERT(!m_ping_delay_in_progress);
182✔
964
    REALM_ASSERT(!m_waiting_for_pong);
182✔
965
    REALM_ASSERT(m_send_ping);
182✔
966

967
    m_waiting_for_pong = true;
182✔
968
    m_pong_wait_started_at = monotonic_clock_now();
182✔
969

970
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
182✔
971
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
182✔
972
        if (status == ErrorCodes::OperationAborted)
182✔
973
            return;
170✔
974
        else if (!status.is_ok())
12✔
975
            throw Exception(status);
×
976

977
        handle_pong_timeout(); // Throws
12✔
978
    });                        // Throws
12✔
979
}
182✔
980

981

982
void Connection::handle_pong_timeout()
983
{
12✔
984
    REALM_ASSERT(m_waiting_for_pong);
12✔
985
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
986
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
987
                                 ConnectionTerminationReason::pong_timeout);
12✔
988
}
12✔
989

990

991
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
992
{
99,338✔
993
    // Stop sending messages if an websocket error was received.
994
    if (m_websocket_error_received)
99,338✔
995
        return;
×
996

997
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
99,338✔
998
        if (sentinel->destroyed) {
99,268✔
999
            return;
1,428✔
1000
        }
1,428✔
1001
        if (!status.is_ok()) {
97,840✔
1002
            if (status != ErrorCodes::Error::OperationAborted) {
×
1003
                // Write errors will be handled by the websocket_write_error_handler() callback
1004
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
1005
            }
×
1006
            return;
×
1007
        }
×
1008
        handle_write_message(); // Throws
97,840✔
1009
    });                         // Throws
97,840✔
1010
    m_sending_session = sess;
99,338✔
1011
    m_sending = true;
99,338✔
1012
}
99,338✔
1013

1014

1015
void Connection::handle_write_message()
1016
{
97,842✔
1017
    m_sending_session->message_sent(); // Throws
97,842✔
1018
    if (m_sending_session->m_state == Session::Deactivated) {
97,842✔
1019
        finish_session_deactivation(m_sending_session);
130✔
1020
    }
130✔
1021
    m_sending_session = nullptr;
97,842✔
1022
    m_sending = false;
97,842✔
1023
    send_next_message(); // Throws
97,842✔
1024
}
97,842✔
1025

1026

1027
void Connection::send_next_message()
1028
{
164,778✔
1029
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
164,778✔
1030
    REALM_ASSERT(!m_sending_session);
164,778✔
1031
    REALM_ASSERT(!m_sending);
164,778✔
1032
    if (m_send_ping) {
164,778✔
1033
        send_ping(); // Throws
170✔
1034
        return;
170✔
1035
    }
170✔
1036
    while (!m_sessions_enlisted_to_send.empty()) {
232,870✔
1037
        // The state of being connected is not supposed to be able to change
1038
        // across this loop thanks to the "no callback reentrance" guarantee
1039
        // provided by Websocket::async_write_text(), and friends.
1040
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
167,874✔
1041

1042
        Session& sess = *m_sessions_enlisted_to_send.front();
167,874✔
1043
        m_sessions_enlisted_to_send.pop_front();
167,874✔
1044
        sess.send_message(); // Throws
167,874✔
1045

1046
        if (sess.m_state == Session::Deactivated) {
167,874✔
1047
            finish_session_deactivation(&sess);
2,910✔
1048
        }
2,910✔
1049

1050
        // An enlisted session may choose to not send a message. In that case,
1051
        // we should pass the opportunity to the next enlisted session.
1052
        if (m_sending)
167,874✔
1053
            break;
99,612✔
1054
    }
167,874✔
1055
}
164,608✔
1056

1057

1058
void Connection::send_ping()
1059
{
170✔
1060
    REALM_ASSERT(!m_ping_delay_in_progress);
170✔
1061
    REALM_ASSERT(m_waiting_for_pong);
170✔
1062
    REALM_ASSERT(m_send_ping);
170✔
1063

1064
    m_send_ping = false;
170✔
1065
    if (m_reconnect_info.scheduled_reset)
170✔
1066
        m_ping_after_scheduled_reset_of_reconnect_info = true;
124✔
1067

1068
    m_last_ping_sent_at = monotonic_clock_now();
170✔
1069
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
170✔
1070
                 m_previous_ping_rtt); // Throws
170✔
1071

1072
    ClientProtocol& protocol = get_client_protocol();
170✔
1073
    OutputBuffer& out = get_output_buffer();
170✔
1074
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
170✔
1075
    initiate_write_ping(out);                                          // Throws
170✔
1076
    m_ping_sent = true;
170✔
1077
}
170✔
1078

1079

1080
void Connection::initiate_write_ping(const OutputBuffer& out)
1081
{
170✔
1082
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
170✔
1083
        if (sentinel->destroyed) {
170✔
1084
            return;
2✔
1085
        }
2✔
1086
        if (!status.is_ok()) {
168✔
1087
            if (status != ErrorCodes::Error::OperationAborted) {
×
1088
                // Write errors will be handled by the websocket_write_error_handler() callback
1089
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1090
            }
×
1091
            return;
×
1092
        }
×
1093
        handle_write_ping(); // Throws
168✔
1094
    });                      // Throws
168✔
1095
    m_sending = true;
170✔
1096
}
170✔
1097

1098

1099
void Connection::handle_write_ping()
1100
{
168✔
1101
    REALM_ASSERT(m_sending);
168✔
1102
    REALM_ASSERT(!m_sending_session);
168✔
1103
    m_sending = false;
168✔
1104
    send_next_message(); // Throws
168✔
1105
}
168✔
1106

1107

1108
// Hands the raw bytes of an incoming message to the client protocol parser.
void Connection::handle_message_received(util::Span<const char> data)
{
    // parse_message_received() parses the message and calls the proper handler
    // on the Connection object (this).
    const std::string_view payload(data.data(), data.size());
    get_client_protocol().parse_message_received<Connection>(*this, payload);
}
77,942✔
1114

1115

1116
void Connection::initiate_disconnect_wait()
1117
{
4,376✔
1118
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,376✔
1119

1120
    if (m_disconnect_delay_in_progress) {
4,376✔
1121
        m_reconnect_disconnect_timer.reset();
1,852✔
1122
        m_disconnect_delay_in_progress = false;
1,852✔
1123
    }
1,852✔
1124

1125
    milliseconds_type time = m_client.m_connection_linger_time;
4,376✔
1126

1127
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,376✔
1128
        // If the operation is aborted, the connection object may have been
1129
        // destroyed.
1130
        if (status != ErrorCodes::OperationAborted)
4,374✔
1131
            handle_disconnect_wait(status); // Throws
12✔
1132
    });                                     // Throws
4,374✔
1133
    m_disconnect_delay_in_progress = true;
4,376✔
1134
}
4,376✔
1135

1136

1137
// Runs when the linger timer completes. A non-OK status is rethrown as an
// Exception. Otherwise the connection is voluntarily disconnected, but only if
// no active unsuspended session is left.
void Connection::handle_disconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    m_disconnect_delay_in_progress = false;

    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);

    // Sessions are still using the connection; keep it open.
    if (m_num_active_unsuspended_sessions != 0)
        return;

    if (m_client.m_connection_linger_time > 0) {
        logger.detail("Linger time expired"); // Throws
    }
    voluntary_disconnect();      // Throws
    logger.info("Disconnected"); // Throws
}
12✔
1154

1155

1156
// Closes the connection because of a protocol violation. `status` is wrapped
// in a fatal error whose requested action is ProtocolViolation.
void Connection::close_due_to_protocol_error(Status status)
{
    SessionErrorInfo violation(std::move(status), IsFatal{true});
    violation.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;

    involuntary_disconnect(std::move(violation),
                           ConnectionTerminationReason::sync_protocol_violation); // Throws
}
16✔
1163

1164

1165
// Logs `status` and closes the connection because of an error detected on the
// client side. `is_fatal` and `reason` are passed through unchanged.
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to error: %1", status); // Throws

    SessionErrorInfo client_error{std::move(status), is_fatal};
    involuntary_disconnect(std::move(client_error), reason); // Throws
}
446✔
1171

1172

1173
// Logs `status` and closes the connection because of a transient problem. The
// resulting error is non-fatal and its requested action is Transient.
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to transient error: %1", status); // Throws

    SessionErrorInfo transient{std::move(status), IsFatal{false}};
    transient.server_requests_action = ProtocolErrorInfo::Action::Transient;

    involuntary_disconnect(std::move(transient), reason); // Throws
}
546✔
1181

1182

1183
// Close connection due to error discovered on the server-side, and then
1184
// reported to the client by way of a connection-level ERROR message.
1185
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1186
{
66✔
1187
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
66✔
1188
                int(error_code)); // Throws
66✔
1189

1190
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
66✔
1191
                                      : ConnectionTerminationReason::server_said_try_again_later;
66✔
1192
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
66✔
1193
                           reason); // Throws
66✔
1194
}
66✔
1195

1196

1197
void Connection::disconnect(const SessionErrorInfo& info)
1198
{
3,730✔
1199
    // Cancel connect timeout watchdog
1200
    m_connect_timer.reset();
3,730✔
1201

1202
    if (m_state == ConnectionState::connected) {
3,730✔
1203
        m_disconnect_time = monotonic_clock_now();
3,522✔
1204
        m_disconnect_has_occurred = true;
3,522✔
1205

1206
        // Sessions that are in the Deactivating state at this time can be
1207
        // immediately discarded, in part because they are no longer enlisted to
1208
        // send. Such sessions will be taken to the Deactivated state by
1209
        // Session::connection_lost(), and then they will be removed from
1210
        // `m_sessions`.
1211
        auto i = m_sessions.begin(), end = m_sessions.end();
3,522✔
1212
        while (i != end) {
8,084✔
1213
            // Prevent invalidation of the main iterator when erasing elements
1214
            auto j = i++;
4,562✔
1215
            Session& sess = *j->second;
4,562✔
1216
            sess.connection_lost(); // Throws
4,562✔
1217
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
4,562✔
1218
                m_sessions.erase(j);
2,504✔
1219
        }
4,562✔
1220
    }
3,522✔
1221

1222
    change_state_to_disconnected();
3,730✔
1223

1224
    m_ping_delay_in_progress = false;
3,730✔
1225
    m_waiting_for_pong = false;
3,730✔
1226
    m_send_ping = false;
3,730✔
1227
    m_minimize_next_ping_delay = false;
3,730✔
1228
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,730✔
1229
    m_ping_sent = false;
3,730✔
1230
    m_heartbeat_timer.reset();
3,730✔
1231
    m_previous_ping_rtt = 0;
3,730✔
1232

1233
    m_websocket_sentinel->destroyed = true;
3,730✔
1234
    m_websocket_sentinel.reset();
3,730✔
1235
    m_websocket.reset();
3,730✔
1236
    m_input_body_buffer.reset();
3,730✔
1237
    m_sending_session = nullptr;
3,730✔
1238
    m_sessions_enlisted_to_send.clear();
3,730✔
1239
    m_sending = false;
3,730✔
1240

1241
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,730✔
1242
    initiate_reconnect_wait();                                           // Throws
3,730✔
1243
}
3,730✔
1244

1245
bool Connection::is_flx_sync_connection() const noexcept
1246
{
112,082✔
1247
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
112,082✔
1248
}
112,082✔
1249

1250
// Handles an incoming PONG message. The PONG must arrive while one is awaited
// and must echo the timestamp of the last PING; otherwise the connection is
// closed as a protocol error. On success the round trip time is recorded, the
// PONG timeout is cancelled, and the next ping delay is started.
void Connection::receive_pong(milliseconds_type timestamp)
{
    logger.debug("Received: PONG(timestamp=%1)", timestamp);

    // A PONG is only legal while waiting for one and after the PING has
    // actually been handed off for sending.
    if (REALM_UNLIKELY(!m_waiting_for_pong || m_send_ping)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
        return;
    }

    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
                          m_last_ping_sent_at, timestamp)}); // Throws
        return;
    }

    const milliseconds_type now = monotonic_clock_now();
    const milliseconds_type rtt = now - timestamp;
    logger.debug("Round trip time was %1 milliseconds", rtt);
    m_previous_ping_rtt = rtt;

    // If this PONG message is a response to a PING message that was sent after
    // the last invocation of cancel_reconnect_delay(), then the connection is
    // still good, and we do not have to skip the next reconnect delay.
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
        m_ping_after_scheduled_reset_of_reconnect_info = false;
        m_reconnect_info.scheduled_reset = false;
    }

    m_heartbeat_timer.reset();
    m_waiting_for_pong = false;

    initiate_ping_delay(now); // Throws

    // Report the measurement to the optional round-trip-time handler.
    if (m_client.m_roundtrip_time_handler) {
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
    }
}
158✔
1291

1292
// Looks up the session that an incoming message is addressed to.
//
// `message` is the message name used in log and error text. Returns the
// session, or nullptr when `session_ident` is zero or no such session is
// currently registered. In the latter case an error is logged, and if the
// identifier is not found in the session history either, the connection is
// closed as a protocol error.
//
// NOTE(review): this function is declared noexcept, yet the logging call below
// is marked as throwing - confirm that this is intended.
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
{
    if (session_ident == 0)
        return nullptr;

    if (auto* found = get_session(session_ident); REALM_LIKELY(found))
        return found;

    // Check the history to see if the message received was for a previous session
    const bool was_known = (m_session_history.find(session_ident) != m_session_history.end());
    if (was_known) {
        logger.error("Received %1 message for closed session, session_ident = %2", message,
                     session_ident); // Throws
        return nullptr;
    }

    logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
    close_due_to_protocol_error(
        {ErrorCodes::SyncProtocolInvariantFailed,
         util::format("Received message %1 for session iden %2 when that session never existed", message,
                      session_ident)});
    return nullptr;
}
71,928✔
1316

1317
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1318
{
768✔
1319
    Session* sess = nullptr;
768✔
1320
    if (session_ident != 0) {
768✔
1321
        sess = find_and_validate_session(session_ident, "ERROR");
698✔
1322
        if (REALM_UNLIKELY(!sess)) {
698✔
1323
            return;
×
1324
        }
×
1325
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
698✔
1326
            close_due_to_protocol_error(std::move(status)); // Throws
×
1327
            return;
×
1328
        }
×
1329

1330
        if (sess->m_state == Session::Deactivated) {
698✔
1331
            finish_session_deactivation(sess);
×
1332
        }
×
1333
        return;
698✔
1334
    }
698✔
1335

1336
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
70✔
1337
                info.message, info.raw_error_code, info.is_fatal, session_ident,
70✔
1338
                info.server_requests_action); // Throws
70✔
1339

1340
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
70✔
1341
    if (REALM_LIKELY(known_error_code)) {
70✔
1342
        ProtocolError error_code = ProtocolError(info.raw_error_code);
66✔
1343
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
66✔
1344
            close_due_to_server_side_error(error_code, info); // Throws
66✔
1345
            return;
66✔
1346
        }
66✔
1347
        close_due_to_protocol_error(
×
1348
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1349
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1350
                          info.raw_error_code)});
×
1351
    }
×
1352
    else {
4✔
1353
        close_due_to_protocol_error(
4✔
1354
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1355
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1356
    }
4✔
1357
}
70✔
1358

1359

1360
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1361
                                             session_ident_type session_ident)
1362
{
20✔
1363
    if (session_ident == 0) {
20✔
1364
        return close_due_to_protocol_error(
×
1365
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1366
    }
×
1367

1368
    if (!is_flx_sync_connection()) {
20✔
1369
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1370
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1371
    }
×
1372

1373
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
20✔
1374
    if (REALM_UNLIKELY(!sess)) {
20✔
1375
        return;
×
1376
    }
×
1377

1378
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
20✔
1379
        close_due_to_protocol_error(std::move(status));
×
1380
    }
×
1381
}
20✔
1382

1383

1384
// Forwards an IDENT message to the session returned by
// find_and_validate_session(). Nothing happens when no session is returned;
// a non-OK status from the session closes the connection.
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
{
    Session* const target = find_and_validate_session(session_ident, "IDENT");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto outcome = target->receive_ident_message(client_file_ident);
    if (!outcome.is_ok()) {
        close_due_to_protocol_error(std::move(outcome)); // Throws
    }
}
3,420✔
1394

1395
// Forwards a DOWNLOAD message to the session returned by
// find_and_validate_session(). Nothing happens when no session is returned;
// a non-OK status from the session closes the connection.
void Connection::receive_download_message(session_ident_type session_ident, const DownloadMessage& message)
{
    Session* const target = find_and_validate_session(session_ident, "DOWNLOAD");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto outcome = target->receive_download_message(message);
    if (outcome.is_ok())
        return;

    close_due_to_protocol_error(std::move(outcome));
}
47,668✔
1406

1407
// Forwards a MARK message to the session returned by
// find_and_validate_session(). Nothing happens when no session is returned;
// a non-OK status from the session closes the connection.
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    Session* const target = find_and_validate_session(session_ident, "MARK");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto outcome = target->receive_mark_message(request_ident);
    if (!outcome.is_ok()) {
        close_due_to_protocol_error(std::move(outcome)); // Throws
    }
}
16,416✔
1417

1418

1419
// Forwards an UNBOUND message to the session returned by
// find_and_validate_session(). A non-OK status from the session closes the
// connection. If the session accepted the message and is now in the
// Deactivated state, its deactivation is finished here.
void Connection::receive_unbound_message(session_ident_type session_ident)
{
    Session* const target = find_and_validate_session(session_ident, "UNBOUND");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    auto outcome = target->receive_unbound_message();
    if (!outcome.is_ok()) {
        close_due_to_protocol_error(std::move(outcome)); // Throws
        return;
    }

    const bool fully_deactivated = (target->m_state == Session::Deactivated);
    if (fully_deactivated)
        finish_session_deactivation(target);
}
3,654✔
1435

1436

1437
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1438
                                               std::string_view body)
1439
{
52✔
1440
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
52✔
1441
    if (REALM_UNLIKELY(!sess)) {
52✔
1442
        return;
×
1443
    }
×
1444

1445
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
52✔
1446
        close_due_to_protocol_error(std::move(status));
×
1447
    }
×
1448
}
52✔
1449

1450

1451
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1452
                                            std::string_view message)
1453
{
5,786✔
1454
    std::string prefix;
5,786✔
1455
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
5,786✔
1456
        prefix = util::format("Server[%1]", m_appservices_coid);
5,786✔
1457
    }
5,786✔
1458
    else {
×
1459
        prefix = "Server";
×
1460
    }
×
1461

1462
    if (session_ident != 0) {
5,786✔
1463
        if (auto sess = get_session(session_ident)) {
3,838✔
1464
            sess->logger.log(LogCategory::session, level, "%1 log: %2", prefix, message);
3,816✔
1465
            return;
3,816✔
1466
        }
3,816✔
1467

1468
        logger.log(util::LogCategory::session, level, "%1 log for unknown session %2: %3", prefix, session_ident,
22✔
1469
                   message);
22✔
1470
        return;
22✔
1471
    }
3,838✔
1472

1473
    logger.log(level, "%1 log: %2", prefix, message);
1,948✔
1474
}
1,948✔
1475

1476

1477
void Connection::receive_appservices_request_id(std::string_view coid)
1478
{
5,470✔
1479
    // Only set once per connection
1480
    if (!coid.empty() && m_appservices_coid.empty()) {
5,470✔
1481
        m_appservices_coid = coid;
2,526✔
1482
        logger.log(util::LogCategory::session, util::LogCategory::Level::info,
2,526✔
1483
                   "Connected to app services with request id: \"%1\"", m_appservices_coid);
2,526✔
1484
    }
2,526✔
1485
}
5,470✔
1486

1487

1488
// Closes the connection with the given protocol error status. This is a thin
// forwarder to close_due_to_protocol_error().
void Connection::handle_protocol_error(Status status)
{
    close_due_to_protocol_error(std::move(status));
}
×
1492

1493

1494
// Sessions are guaranteed to be granted the opportunity to send a message in
1495
// the order that they enlist. Note that this is important to ensure
1496
// nonoverlapping communication with the server for consecutive sessions
1497
// associated with the same Realm file.
1498
//
1499
// CAUTION: The specified session may get destroyed before this function
1500
// returns, but only if its Session::send_message() puts it into the Deactivated
1501
// state.
1502
void Connection::enlist_to_send(Session* sess)
1503
{
169,448✔
1504
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
169,448✔
1505
    m_sessions_enlisted_to_send.push_back(sess); // Throws
169,448✔
1506
    if (!m_sending)
169,448✔
1507
        send_next_message(); // Throws
66,646✔
1508
}
169,448✔
1509

1510

1511
std::string Connection::get_active_appservices_connection_id()
1512
{
72✔
1513
    return m_appservices_coid;
72✔
1514
}
72✔
1515

1516
void Session::cancel_resumption_delay()
1517
{
3,890✔
1518
    REALM_ASSERT_EX(m_state == Active, m_state);
3,890✔
1519

1520
    if (!m_suspended)
3,890✔
1521
        return;
3,832✔
1522

1523
    m_suspended = false;
58✔
1524

1525
    logger.debug("Resumed"); // Throws
58✔
1526

1527
    if (unbind_process_complete())
58✔
1528
        initiate_rebind(); // Throws
32✔
1529

1530
    m_conn.one_more_active_unsuspended_session(); // Throws
58✔
1531
    if (m_try_again_activation_timer) {
58✔
1532
        m_try_again_activation_timer.reset();
8✔
1533
    }
8✔
1534

1535
    on_resumed(); // Throws
58✔
1536
}
58✔
1537

1538

1539
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1540
                                                 std::vector<ProtocolErrorInfo>* out)
1541
{
23,088✔
1542
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
23,088✔
1543
        return;
23,044✔
1544
    }
23,044✔
1545

1546
#ifdef REALM_DEBUG
44✔
1547
    REALM_ASSERT_DEBUG(
44✔
1548
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
44✔
1549
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
44✔
1550
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
44✔
1551
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
44✔
1552
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
44✔
1553
                       }));
44✔
1554
#endif
44✔
1555

1556
    while (!m_pending_compensating_write_errors.empty() &&
88✔
1557
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
88✔
1558
               changesets.back().version) {
44✔
1559
        auto& cur_error = m_pending_compensating_write_errors.front();
44✔
1560
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
44✔
1561
        out->push_back(std::move(cur_error));
44✔
1562
        m_pending_compensating_write_errors.pop_front();
44✔
1563
    }
44✔
1564
}
44✔
1565

1566

1567
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1568
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1569
                                   DownloadBatchState download_batch_state)
1570
{
44,704✔
1571
    auto& history = get_history();
44,704✔
1572
    if (received_changesets.empty()) {
44,704✔
1573
        if (download_batch_state == DownloadBatchState::MoreToCome) {
21,594✔
1574
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1575
                                       "received empty download message that was not the last in batch",
×
1576
                                       ProtocolError::bad_progress);
×
1577
        }
×
1578
        history.set_sync_progress(progress, downloadable_bytes, version_info); // Throws
21,594✔
1579
        return;
21,594✔
1580
    }
21,594✔
1581

1582
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
23,110✔
1583
    auto transact = get_db()->start_read();
23,110✔
1584
    history.integrate_server_changesets(
23,110✔
1585
        progress, downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
23,110✔
1586
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
23,110✔
1587
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
23,088✔
1588
        }); // Throws
23,088✔
1589
    if (received_changesets.size() == 1) {
23,110✔
1590
        logger.debug("1 remote changeset integrated, producing client version %1",
15,326✔
1591
                     version_info.sync_version.version); // Throws
15,326✔
1592
    }
15,326✔
1593
    else {
7,784✔
1594
        logger.debug("%2 remote changesets integrated, producing client version %1",
7,784✔
1595
                     version_info.sync_version.version, received_changesets.size()); // Throws
7,784✔
1596
    }
7,784✔
1597

1598
    for (const auto& pending_error : pending_compensating_write_errors) {
23,110✔
1599
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
44✔
1600
                    pending_error.compensating_write_rejected_client_version,
44✔
1601
                    *pending_error.compensating_write_server_version, pending_error.message);
44✔
1602
        try {
44✔
1603
            on_connection_state_changed(
44✔
1604
                m_conn.get_state(),
44✔
1605
                SessionErrorInfo{pending_error,
44✔
1606
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
44✔
1607
                                                          pending_error.message)});
44✔
1608
        }
44✔
1609
        catch (...) {
44✔
1610
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1611
        }
×
1612
    }
44✔
1613
}
23,110✔
1614

1615

1616
void Session::on_integration_failure(const IntegrationException& error)
1617
{
40✔
1618
    REALM_ASSERT_EX(m_state == Active, m_state);
40✔
1619
    REALM_ASSERT(!m_client_error && !m_error_to_send);
40✔
1620
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
40✔
1621

1622
    m_client_error = util::make_optional<IntegrationException>(error);
40✔
1623
    m_error_to_send = true;
40✔
1624
    SessionErrorInfo error_info{error.to_status(), IsFatal{false}};
40✔
1625
    error_info.server_requests_action = ProtocolErrorInfo::Action::Warning;
40✔
1626
    // Surface the error to the user otherwise is lost.
1627
    on_connection_state_changed(m_conn.get_state(), std::move(error_info));
40✔
1628

1629
    // Since the deactivation process has not been initiated, the UNBIND
1630
    // message cannot have been sent unless an ERROR message was received.
1631
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
40✔
1632
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
40✔
1633
        ensure_enlisted_to_send(); // Throws
36✔
1634
    }
36✔
1635
}
40✔
1636

1637
// Updates the session's progress bookkeeping after downloaded changesets have
// been integrated.
//
// `client_version` is passed on to do_recognize_sync_version(); `progress` is
// the sync progress reported with the integrated changesets. The download
// progress is always replaced, while the upload progress only moves forward.
// Afterwards download completion is checked, subscriptions are created through
// the migration store when one exists on a FLX session, and the session
// enlists itself to send when it is able to.
//
// Must only be called in the Active state, and the download server version in
// `progress` must not be lower than the one currently recorded.
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
{
    REALM_ASSERT_EX(m_state == Active, m_state);
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);

    m_download_progress = progress.download;
    m_progress = progress;

    if (progress.upload.client_version > m_upload_progress.client_version)
        m_upload_progress = progress.upload;

    do_recognize_sync_version(client_version); // Allows upload process to resume

    check_for_download_completion(); // Throws

    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
        auto& flx_subscription_store = *get_flx_subscription_store();
        // Use the store that was just null-checked rather than fetching it a second time.
        migration_store->create_subscriptions(flx_subscription_store);
    }

    // Since the deactivation process has not been initiated, the UNBIND
    // message cannot have been sent unless an ERROR message was received.
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
        ensure_enlisted_to_send(); // Throws
    }
}
46,680✔
1665

1666

1667
// Destructor. No explicit teardown is performed here.
Session::~Session()
{
    // Disabled life-cycle check, kept for reference:
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
}
10,090✔
1671

1672

1673
std::string Session::make_logger_prefix(session_ident_type ident)
1674
{
10,090✔
1675
    std::ostringstream out;
10,090✔
1676
    out.imbue(std::locale::classic());
10,090✔
1677
    out << "Session[" << ident << "]: "; // Throws
10,090✔
1678
    return out.str();                    // Throws
10,090✔
1679
}
10,090✔
1680

1681

1682
void Session::activate()
1683
{
10,090✔
1684
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
10,090✔
1685

1686
    logger.debug("Activating"); // Throws
10,090✔
1687

1688
    if (REALM_LIKELY(!get_client().is_dry_run())) {
10,090✔
1689
        bool file_exists = util::File::exists(get_realm_path());
10,090✔
1690

1691
        logger.info("client_reset_config = %1, Realm exists = %2, upload messages allowed = %3",
10,090✔
1692
                    get_client_reset_config().has_value(), file_exists, upload_messages_allowed() ? "yes" : "no");
10,090✔
1693
        get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
10,090✔
1694
    }
10,090✔
1695
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
10,090✔
1696
                 m_client_file_ident.salt); // Throws
10,090✔
1697
    m_upload_progress = m_progress.upload;
10,090✔
1698
    m_download_progress = m_progress.download;
10,090✔
1699
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
10,090✔
1700
    init_progress_handler();
10,090✔
1701

1702
    logger.debug("last_version_available = %1", m_last_version_available);                     // Throws
10,090✔
1703
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
10,090✔
1704
    logger.debug("progress_download_client_version = %1",
10,090✔
1705
                 m_progress.download.last_integrated_client_version);                                      // Throws
10,090✔
1706
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
10,090✔
1707
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
10,090✔
1708

1709
    reset_protocol_state();
10,090✔
1710
    m_state = Active;
10,090✔
1711

1712
    call_debug_hook(SyncClientHookEvent::SessionActivating);
10,090✔
1713

1714
    REALM_ASSERT(!m_suspended);
10,090✔
1715
    m_conn.one_more_active_unsuspended_session(); // Throws
10,090✔
1716

1717
    try {
10,090✔
1718
        process_pending_flx_bootstrap();
10,090✔
1719
    }
10,090✔
1720
    catch (const IntegrationException& error) {
10,090✔
1721
        on_integration_failure(error);
×
1722
    }
×
1723
    catch (...) {
10,090✔
1724
        on_integration_failure(IntegrationException(exception_to_status()));
4✔
1725
    }
4✔
1726

1727
    // Checks if there is a pending client reset
1728
    handle_pending_client_reset_acknowledgement();
10,090✔
1729
}
10,090✔
1730

1731

1732
// The caller (Connection) must discard the session if the session has become
1733
// deactivated upon return.
1734
void Session::initiate_deactivation()
1735
{
10,088✔
1736
    REALM_ASSERT_EX(m_state == Active, m_state);
10,088✔
1737

1738
    logger.debug("Initiating deactivation"); // Throws
10,088✔
1739

1740
    m_state = Deactivating;
10,088✔
1741

1742
    if (!m_suspended)
10,088✔
1743
        m_conn.one_less_active_unsuspended_session(); // Throws
9,478✔
1744

1745
    if (m_enlisted_to_send) {
10,088✔
1746
        REALM_ASSERT(!unbind_process_complete());
5,654✔
1747
        return;
5,654✔
1748
    }
5,654✔
1749

1750
    // Deactivate immediately if the BIND message has not yet been sent and the
1751
    // session is not enlisted to send, or if the unbinding process has already
1752
    // completed.
1753
    if (!m_bind_message_sent || unbind_process_complete()) {
4,434✔
1754
        complete_deactivation(); // Throws
892✔
1755
        // Life cycle state is now Deactivated
1756
        return;
892✔
1757
    }
892✔
1758

1759
    // Ready to send the UNBIND message, if it has not already been sent
1760
    if (!m_unbind_message_sent) {
3,542✔
1761
        enlist_to_send(); // Throws
3,332✔
1762
        return;
3,332✔
1763
    }
3,332✔
1764
}
3,542✔
1765

1766

1767
void Session::complete_deactivation()
1768
{
10,090✔
1769
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
10,090✔
1770
    m_state = Deactivated;
10,090✔
1771

1772
    logger.debug("Deactivation completed"); // Throws
10,090✔
1773
}
10,090✔
1774

1775

1776
// Called by the associated Connection object when this session is granted an
1777
// opportunity to send a message.
1778
//
1779
// The caller (Connection) must discard the session if the session has become
1780
// deactivated upon return.
1781
void Session::send_message()
1782
{
167,870✔
1783
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
167,870✔
1784
    REALM_ASSERT(m_enlisted_to_send);
167,870✔
1785
    m_enlisted_to_send = false;
167,870✔
1786
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
167,870✔
1787
        // Deactivation has been initiated. If the UNBIND message has not been
1788
        // sent yet, there is no point in sending it. Instead, we can let the
1789
        // deactivation process complete.
1790
        if (!m_bind_message_sent) {
9,200✔
1791
            return complete_deactivation(); // Throws
2,910✔
1792
            // Life cycle state is now Deactivated
1793
        }
2,910✔
1794

1795
        // Session life cycle state is Deactivating or the unbinding process has
1796
        // been initiated by a session specific ERROR message
1797
        if (!m_unbind_message_sent)
6,290✔
1798
            send_unbind_message(); // Throws
6,290✔
1799
        return;
6,290✔
1800
    }
9,200✔
1801

1802
    // Session life cycle state is Active and the unbinding process has
1803
    // not been initiated
1804
    REALM_ASSERT(!m_unbind_message_sent);
158,670✔
1805

1806
    if (!m_bind_message_sent)
158,670✔
1807
        return send_bind_message(); // Throws
8,670✔
1808

1809
    if (!m_ident_message_sent) {
150,000✔
1810
        if (have_client_file_ident())
7,278✔
1811
            send_ident_message(); // Throws
7,278✔
1812
        return;
7,278✔
1813
    }
7,278✔
1814

1815
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
142,722✔
1816
                                                      [](const PendingTestCommand& command) {
142,722✔
1817
                                                          return command.pending;
108✔
1818
                                                      });
108✔
1819
    if (has_pending_test_command) {
142,722✔
1820
        return send_test_command_message();
52✔
1821
    }
52✔
1822

1823
    if (m_error_to_send)
142,670✔
1824
        return send_json_error_message(); // Throws
36✔
1825

1826
    // Stop sending upload, mark and query messages when the client detects an error.
1827
    if (m_client_error) {
142,634✔
1828
        return;
12✔
1829
    }
12✔
1830

1831
    if (m_target_download_mark > m_last_download_mark_sent)
142,622✔
1832
        return send_mark_message(); // Throws
17,170✔
1833

1834
    auto is_upload_allowed = [&]() -> bool {
125,462✔
1835
        if (!m_is_flx_sync_session) {
125,462✔
1836
            return true;
109,282✔
1837
        }
109,282✔
1838

1839
        auto migration_store = get_migration_store();
16,180✔
1840
        if (!migration_store) {
16,180✔
1841
            return true;
×
1842
        }
×
1843

1844
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
16,180✔
1845
        if (!sentinel_query_version) {
16,180✔
1846
            return true;
16,152✔
1847
        }
16,152✔
1848

1849
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
1850
        return m_last_sent_flx_query_version != *sentinel_query_version;
28✔
1851
    };
16,180✔
1852

1853
    if (!is_upload_allowed()) {
125,452✔
1854
        return;
16✔
1855
    }
16✔
1856

1857
    auto check_pending_flx_version = [&]() -> bool {
125,446✔
1858
        if (!m_is_flx_sync_session) {
125,446✔
1859
            return false;
109,282✔
1860
        }
109,282✔
1861

1862
        if (m_delay_uploads) {
16,164✔
1863
            return false;
3,296✔
1864
        }
3,296✔
1865

1866
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
12,868✔
1867

1868
        if (!m_pending_flx_sub_set) {
12,868✔
1869
            return false;
10,784✔
1870
        }
10,784✔
1871

1872
        // Send QUERY messages when the upload progress client version reaches the snapshot version
1873
        // of a pending subscription
1874
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
2,084✔
1875
    };
12,868✔
1876

1877
    if (check_pending_flx_version()) {
125,436✔
1878
        return send_query_change_message(); // throws
1,184✔
1879
    }
1,184✔
1880

1881
    if (!m_delay_uploads && (m_last_version_available > m_upload_progress.client_version)) {
124,252✔
1882
        return send_upload_message(); // Throws
59,728✔
1883
    }
59,728✔
1884
}
124,252✔
1885

1886

1887
void Session::send_bind_message()
1888
{
8,670✔
1889
    REALM_ASSERT_EX(m_state == Active, m_state);
8,670✔
1890

1891
    session_ident_type session_ident = m_ident;
8,670✔
1892
    // Request an ident if we don't already have one and there isn't a pending client reset diff
1893
    // The file ident can be 0 when a client reset is being performed if a brand new local realm
1894
    // has been opened (or using Async open) and a FLX/PBS migration occurs when first connecting
1895
    // to the server.
1896
    bool need_client_file_ident = !have_client_file_ident() && !get_client_reset_config();
8,670✔
1897
    const bool is_subserver = false;
8,670✔
1898

1899
    ClientProtocol& protocol = m_conn.get_client_protocol();
8,670✔
1900
    int protocol_version = m_conn.get_negotiated_protocol_version();
8,670✔
1901
    OutputBuffer& out = m_conn.get_output_buffer();
8,670✔
1902
    // Discard the token since it's ignored by the server.
1903
    std::string empty_access_token;
8,670✔
1904
    if (m_is_flx_sync_session) {
8,670✔
1905
        nlohmann::json bind_json_data;
1,512✔
1906
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,512✔
1907
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1908
        }
60✔
1909
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,512✔
1910
        auto schema_version = get_schema_version();
1,512✔
1911
        // Send 0 if schema is not versioned.
1912
        bind_json_data["schemaVersion"] = schema_version != uint64_t(-1) ? schema_version : 0;
1,512✔
1913
        if (logger.would_log(util::Logger::Level::debug)) {
1,512✔
1914
            std::string json_data_dump;
1,512✔
1915
            if (!bind_json_data.empty()) {
1,512✔
1916
                json_data_dump = bind_json_data.dump();
1,512✔
1917
            }
1,512✔
1918
            logger.debug(
1,512✔
1919
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,512✔
1920
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,512✔
1921
        }
1,512✔
1922
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,512✔
1923
                                       need_client_file_ident, is_subserver); // Throws
1,512✔
1924
    }
1,512✔
1925
    else {
7,158✔
1926
        std::string server_path = get_virt_path();
7,158✔
1927
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
7,158✔
1928
                     session_ident, need_client_file_ident, is_subserver, server_path);
7,158✔
1929
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
7,158✔
1930
                                       need_client_file_ident, is_subserver); // Throws
7,158✔
1931
    }
7,158✔
1932
    m_conn.initiate_write_message(out, this); // Throws
8,670✔
1933

1934
    m_bind_message_sent = true;
8,670✔
1935
    call_debug_hook(SyncClientHookEvent::BindMessageSent);
8,670✔
1936

1937
    // If there is a pending client reset diff, process that when the BIND message has
1938
    // been sent successfully and wait before sending the IDENT message. Otherwise,
1939
    // ready to send the IDENT message if the file identifier pair is already available.
1940
    if (!need_client_file_ident)
8,670✔
1941
        enlist_to_send(); // Throws
5,048✔
1942
}
8,670✔
1943

1944

1945
void Session::send_ident_message()
1946
{
7,278✔
1947
    REALM_ASSERT_EX(m_state == Active, m_state);
7,278✔
1948
    REALM_ASSERT(m_bind_message_sent);
7,278✔
1949
    REALM_ASSERT(!m_unbind_message_sent);
7,278✔
1950
    REALM_ASSERT(have_client_file_ident());
7,278✔
1951

1952
    ClientProtocol& protocol = m_conn.get_client_protocol();
7,278✔
1953
    OutputBuffer& out = m_conn.get_output_buffer();
7,278✔
1954
    session_ident_type session_ident = m_ident;
7,278✔
1955

1956
    if (m_is_flx_sync_session) {
7,278✔
1957
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,436✔
1958
        const auto active_query_body = active_query_set.to_ext_json();
1,436✔
1959
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,436✔
1960
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,436✔
1961
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,436✔
1962
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,436✔
1963
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,436✔
1964
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,436✔
1965
                     active_query_body); // Throws
1,436✔
1966
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,436✔
1967
                                        active_query_set.version(), active_query_body); // Throws
1,436✔
1968
        m_last_sent_flx_query_version = active_query_set.version();
1,436✔
1969
    }
1,436✔
1970
    else {
5,842✔
1971
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
5,842✔
1972
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
5,842✔
1973
                     "latest_server_version_salt=%6)",
5,842✔
1974
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
5,842✔
1975
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
5,842✔
1976
                     m_progress.latest_server_version.salt);                                  // Throws
5,842✔
1977
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
5,842✔
1978
    }
5,842✔
1979
    m_conn.initiate_write_message(out, this); // Throws
7,278✔
1980

1981
    m_ident_message_sent = true;
7,278✔
1982

1983
    // Other messages may be waiting to be sent
1984
    enlist_to_send(); // Throws
7,278✔
1985
}
7,278✔
1986

1987
void Session::send_query_change_message()
1988
{
1,184✔
1989
    REALM_ASSERT_EX(m_state == Active, m_state);
1,184✔
1990
    REALM_ASSERT(m_ident_message_sent);
1,184✔
1991
    REALM_ASSERT(!m_unbind_message_sent);
1,184✔
1992
    REALM_ASSERT(m_pending_flx_sub_set);
1,184✔
1993
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
1,184✔
1994

1995
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
1,184✔
1996
        return;
×
1997
    }
×
1998

1999
    auto sub_store = get_flx_subscription_store();
1,184✔
2000
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
1,184✔
2001
    auto latest_queries = latest_sub_set.to_ext_json();
1,184✔
2002
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
1,184✔
2003
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
1,184✔
2004

2005
    OutputBuffer& out = m_conn.get_output_buffer();
1,184✔
2006
    session_ident_type session_ident = get_ident();
1,184✔
2007
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,184✔
2008
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
1,184✔
2009
    m_conn.initiate_write_message(out, this);
1,184✔
2010

2011
    m_last_sent_flx_query_version = latest_sub_set.version();
1,184✔
2012

2013
    request_download_completion_notification();
1,184✔
2014
}
1,184✔
2015

2016
void Session::send_upload_message()
2017
{
59,730✔
2018
    REALM_ASSERT_EX(m_state == Active, m_state);
59,730✔
2019
    REALM_ASSERT(m_ident_message_sent);
59,730✔
2020
    REALM_ASSERT(!m_unbind_message_sent);
59,730✔
2021

2022
    if (REALM_UNLIKELY(get_client().is_dry_run()))
59,730✔
2023
        return;
×
2024

2025
    version_type target_upload_version = m_last_version_available;
59,730✔
2026
    if (m_pending_flx_sub_set) {
59,730✔
2027
        REALM_ASSERT(m_is_flx_sync_session);
900✔
2028
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
900✔
2029
    }
900✔
2030

2031
    std::vector<UploadChangeset> uploadable_changesets;
59,730✔
2032
    version_type locked_server_version = 0;
59,730✔
2033
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
59,730✔
2034
                                             locked_server_version); // Throws
59,730✔
2035

2036
    if (uploadable_changesets.empty()) {
59,730✔
2037
        // Nothing more to upload right now
2038
        // If we need to limit upload up to some version other than the last client version available and there are no
2039
        // changes to upload, then there is no need to send an empty message.
2040
        if (m_pending_flx_sub_set) {
29,950✔
2041
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
280✔
2042
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
280✔
2043
            // Other messages may be waiting to be sent
2044
            return enlist_to_send(); // Throws
280✔
2045
        }
280✔
2046
    }
29,950✔
2047

2048
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
59,450✔
2049
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
620✔
2050
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
620✔
2051
    }
620✔
2052

2053
    version_type progress_client_version = m_upload_progress.client_version;
59,450✔
2054
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
59,450✔
2055

2056
    if (!upload_messages_allowed()) {
59,450✔
2057
        logger.debug("UPLOAD not allowed: upload progress(progress_client_version=%1, progress_server_version=%2, "
792✔
2058
                     "locked_server_version=%3, num_changesets=%4)",
792✔
2059
                     progress_client_version, progress_server_version, locked_server_version,
792✔
2060
                     uploadable_changesets.size()); // Throws
792✔
2061
        return;
792✔
2062
    }
792✔
2063

2064
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
58,658✔
2065
                 "locked_server_version=%3, num_changesets=%4)",
58,658✔
2066
                 progress_client_version, progress_server_version, locked_server_version,
58,658✔
2067
                 uploadable_changesets.size()); // Throws
58,658✔
2068

2069
    ClientProtocol& protocol = m_conn.get_client_protocol();
58,658✔
2070
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
58,658✔
2071

2072
    for (const UploadChangeset& uc : uploadable_changesets) {
58,658✔
2073
        logger.debug(util::LogCategory::changeset,
42,924✔
2074
                     "Fetching changeset for upload (client_version=%1, server_version=%2, "
42,924✔
2075
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
42,924✔
2076
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
42,924✔
2077
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
42,924✔
2078
        if (logger.would_log(util::Logger::Level::trace)) {
42,924✔
2079
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2080
            if (changeset_data.size() < 1024) {
×
2081
                logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
2082
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2083
            }
×
2084
            else {
×
2085
                logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
2086
                             protocol.compressed_hex_dump(changeset_data));
×
2087
            }
×
2088

2089
#if REALM_DEBUG
×
2090
            ChunkedBinaryInputStream in{changeset_data};
×
2091
            Changeset log;
×
2092
            try {
×
2093
                parse_changeset(in, log);
×
2094
                std::stringstream ss;
×
2095
                log.print(ss);
×
2096
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
2097
            }
×
2098
            catch (const BadChangesetError& err) {
×
2099
                logger.error(util::LogCategory::changeset, "Unable to parse changeset: %1", err.what());
×
2100
            }
×
2101
#endif
×
2102
        }
×
2103

2104
#if 0 // Upload log compaction is currently not implemented
2105
        if (!get_client().m_disable_upload_compaction) {
2106
            ChangesetEncoder::Buffer encode_buffer;
2107

2108
            {
2109
                // Upload compaction only takes place within single changesets to
2110
                // avoid another client seeing inconsistent snapshots.
2111
                ChunkedBinaryInputStream stream{uc.changeset};
2112
                Changeset changeset;
2113
                parse_changeset(stream, changeset); // Throws
2114
                // FIXME: What is the point of setting these? How can compaction care about them?
2115
                changeset.version = uc.progress.client_version;
2116
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2117
                changeset.origin_timestamp = uc.origin_timestamp;
2118
                changeset.origin_file_ident = uc.origin_file_ident;
2119

2120
                compact_changesets(&changeset, 1);
2121
                encode_changeset(changeset, encode_buffer);
2122

2123
                logger.debug(util::LogCategory::changeset, "Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2124
                             encode_buffer.size()); // Throws
2125
            }
2126

2127
            upload_message_builder.add_changeset(
2128
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
2129
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
2130
        }
2131
        else
2132
#endif
2133
        {
42,924✔
2134
            upload_message_builder.add_changeset(uc.progress.client_version,
42,924✔
2135
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
42,924✔
2136
                                                 uc.origin_file_ident,
42,924✔
2137
                                                 uc.changeset); // Throws
42,924✔
2138
        }
42,924✔
2139
    }
42,924✔
2140

2141
    int protocol_version = m_conn.get_negotiated_protocol_version();
58,658✔
2142
    OutputBuffer& out = m_conn.get_output_buffer();
58,658✔
2143
    session_ident_type session_ident = get_ident();
58,658✔
2144
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
58,658✔
2145
                                               progress_server_version,
58,658✔
2146
                                               locked_server_version); // Throws
58,658✔
2147
    m_conn.initiate_write_message(out, this);                          // Throws
58,658✔
2148

2149
    call_debug_hook(SyncClientHookEvent::UploadMessageSent);
58,658✔
2150

2151
    // Other messages may be waiting to be sent
2152
    enlist_to_send(); // Throws
58,658✔
2153
}
58,658✔
2154

2155

2156
void Session::send_mark_message()
2157
{
17,170✔
2158
    REALM_ASSERT_EX(m_state == Active, m_state);
17,170✔
2159
    REALM_ASSERT(m_ident_message_sent);
17,170✔
2160
    REALM_ASSERT(!m_unbind_message_sent);
17,170✔
2161
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
17,170✔
2162

2163
    request_ident_type request_ident = m_target_download_mark;
17,170✔
2164
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
17,170✔
2165

2166
    ClientProtocol& protocol = m_conn.get_client_protocol();
17,170✔
2167
    OutputBuffer& out = m_conn.get_output_buffer();
17,170✔
2168
    session_ident_type session_ident = get_ident();
17,170✔
2169
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
17,170✔
2170
    m_conn.initiate_write_message(out, this);                      // Throws
17,170✔
2171

2172
    m_last_download_mark_sent = request_ident;
17,170✔
2173

2174
    // Other messages may be waiting to be sent
2175
    enlist_to_send(); // Throws
17,170✔
2176
}
17,170✔
2177

2178

2179
void Session::send_unbind_message()
2180
{
6,290✔
2181
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,290✔
2182
    REALM_ASSERT(m_bind_message_sent);
6,290✔
2183
    REALM_ASSERT(!m_unbind_message_sent);
6,290✔
2184

2185
    logger.debug("Sending: UNBIND"); // Throws
6,290✔
2186

2187
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,290✔
2188
    OutputBuffer& out = m_conn.get_output_buffer();
6,290✔
2189
    session_ident_type session_ident = get_ident();
6,290✔
2190
    protocol.make_unbind_message(out, session_ident); // Throws
6,290✔
2191
    m_conn.initiate_write_message(out, this);         // Throws
6,290✔
2192

2193
    m_unbind_message_sent = true;
6,290✔
2194
}
6,290✔
2195

2196

2197
void Session::send_json_error_message()
2198
{
36✔
2199
    REALM_ASSERT_EX(m_state == Active, m_state);
36✔
2200
    REALM_ASSERT(m_ident_message_sent);
36✔
2201
    REALM_ASSERT(!m_unbind_message_sent);
36✔
2202
    REALM_ASSERT(m_error_to_send);
36✔
2203
    REALM_ASSERT(m_client_error);
36✔
2204

2205
    ClientProtocol& protocol = m_conn.get_client_protocol();
36✔
2206
    OutputBuffer& out = m_conn.get_output_buffer();
36✔
2207
    session_ident_type session_ident = get_ident();
36✔
2208
    auto protocol_error = m_client_error->error_for_server;
36✔
2209

2210
    auto message = util::format("%1", m_client_error->to_status());
36✔
2211
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
36✔
2212
                session_ident); // Throws
36✔
2213

2214
    nlohmann::json error_body_json;
36✔
2215
    error_body_json["message"] = std::move(message);
36✔
2216
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
36✔
2217
                                     error_body_json.dump()); // Throws
36✔
2218
    m_conn.initiate_write_message(out, this);                 // Throws
36✔
2219

2220
    m_error_to_send = false;
36✔
2221
    enlist_to_send(); // Throws
36✔
2222
}
36✔
2223

2224

2225
void Session::send_test_command_message()
2226
{
52✔
2227
    REALM_ASSERT_EX(m_state == Active, m_state);
52✔
2228

2229
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
52✔
2230
                           [](const PendingTestCommand& command) {
52✔
2231
                               return command.pending;
52✔
2232
                           });
52✔
2233
    REALM_ASSERT(it != m_pending_test_commands.end());
52✔
2234

2235
    ClientProtocol& protocol = m_conn.get_client_protocol();
52✔
2236
    OutputBuffer& out = m_conn.get_output_buffer();
52✔
2237
    auto session_ident = get_ident();
52✔
2238

2239
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
52✔
2240
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
52✔
2241

2242
    m_conn.initiate_write_message(out, this); // Throws;
52✔
2243
    it->pending = false;
52✔
2244

2245
    enlist_to_send();
52✔
2246
}
52✔
2247

2248
bool Session::client_reset_if_needed()
2249
{
376✔
2250
    // Even if we end up not actually performing a client reset, consume the
2251
    // config to ensure that the resources it holds are released
2252
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
376✔
2253
    if (!client_reset_config) {
376✔
2254
        return false;
×
2255
    }
×
2256

2257
    // Save a copy of the status and action in case an error/exception occurs
2258
    Status cr_status = client_reset_config->error;
376✔
2259
    ProtocolErrorInfo::Action cr_action = client_reset_config->action;
376✔
2260

2261
    auto on_flx_version_complete = [this](int64_t version) {
376✔
2262
        this->on_flx_sync_version_complete(version);
300✔
2263
    };
300✔
2264
    try {
376✔
2265
        // The file ident from the fresh realm will be copied over to the local realm
2266
        bool did_reset = client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config),
376✔
2267
                                                            get_flx_subscription_store(), on_flx_version_complete);
376✔
2268

2269
        call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
376✔
2270
        if (!did_reset) {
376✔
2271
            return false;
×
2272
        }
×
2273
    }
376✔
2274
    catch (const std::exception& e) {
376✔
2275
        auto err_msg = util::format("A fatal error occurred during '%1' client reset diff for %2: '%3'", cr_action,
80✔
2276
                                    cr_status, e.what());
80✔
2277
        logger.error(err_msg.c_str());
80✔
2278
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
80✔
2279
        suspend(err_info);
80✔
2280
        return false;
80✔
2281
    }
80✔
2282

2283
    // The fresh Realm has been used to reset the state
2284
    logger.debug("Client reset is completed, path = %1", get_realm_path()); // Throws
296✔
2285

2286
    // Update the version, file ident and progress info after the client reset diff is done
2287
    get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
296✔
2288
    // Print the version/progress information before performing the asserts
2289
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
296✔
2290
                 m_client_file_ident.salt);                                // Throws
296✔
2291
    logger.debug("last_version_available = %1", m_last_version_available); // Throws
296✔
2292
    logger.debug("upload_progress_client_version = %1, upload_progress_server_version = %2",
296✔
2293
                 m_progress.upload.client_version,
296✔
2294
                 m_progress.upload.last_integrated_server_version); // Throws
296✔
2295
    logger.debug("download_progress_client_version = %1, download_progress_server_version = %2",
296✔
2296
                 m_progress.download.last_integrated_client_version,
296✔
2297
                 m_progress.download.server_version); // Throws
296✔
2298

2299
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
296✔
2300
                    m_progress.download.last_integrated_client_version);
296✔
2301
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
296✔
2302

2303
    m_upload_progress = m_progress.upload;
296✔
2304
    m_download_progress = m_progress.download;
296✔
2305
    init_progress_handler();
296✔
2306
    // In recovery mode, there may be new changesets to upload and nothing left to download.
2307
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
2308
    // For both, we want to allow uploads again without needing external changes to download first.
2309
    m_delay_uploads = false;
296✔
2310

2311
    // Checks if there is a pending client reset
2312
    handle_pending_client_reset_acknowledgement();
296✔
2313

2314
    update_subscription_version_info();
296✔
2315

2316
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
2317
    if (auto migration_store = get_migration_store()) {
296✔
2318
        migration_store->complete_migration_or_rollback();
268✔
2319
    }
268✔
2320

2321
    return true;
296✔
2322
}
376✔
2323

2324
// Handles an IDENT message from the server, which assigns this client its
// salted file identifier.
//
// The message is ignored (OK is returned) once the session is no longer
// Active. A SyncProtocolInvariantFailed status is returned when the message
// arrives at an illegal time or carries an ident below 1 or a zero salt.
// Otherwise the identifier is stored (and, unless in dry-run mode, written to
// the history with the related progress counters reset) and the session is
// enlisted so that it can send its own IDENT message.
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
{
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
                 client_file_ident.salt); // Throws

    // Once deactivation has been initiated, the associated Realm and
    // SessionWrapper must not be accessed any longer, so drop the message.
    if (m_state != Active) {
        return Status::OK(); // Success
    }

    const bool illegal_at_this_time = (!m_bind_message_sent || have_client_file_ident() ||
                                       m_error_message_received || m_unbound_message_received);
    if (REALM_UNLIKELY(illegal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
    }
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
    }
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
    }

    m_client_file_ident = client_file_ident;

    // In dry-run mode the history is left untouched
    if (!REALM_UNLIKELY(get_client().is_dry_run())) {
        get_history().set_client_file_ident(client_file_ident,
                                            m_fix_up_object_ids); // Throws
        m_progress.download.last_integrated_client_version = 0;
        m_progress.upload.client_version = 0;
    }

    // Ready to send the IDENT message
    ensure_enlisted_to_send(); // Throws
    return Status::OK();       // Success
}
3,378✔
2364

2365
Status Session::receive_download_message(const DownloadMessage& message)
2366
{
47,668✔
2367
    // Ignore the message if the deactivation process has been initiated,
2368
    // because in that case, the associated Realm and SessionWrapper must
2369
    // not be accessed any longer.
2370
    if (m_state != Active)
47,668✔
2371
        return Status::OK();
616✔
2372

2373
    bool is_flx = m_conn.is_flx_sync_connection();
47,052✔
2374
    int64_t query_version = is_flx ? *message.query_version : 0;
47,052✔
2375

2376
    if (!is_flx || query_version > 0)
47,052✔
2377
        enable_progress_notifications();
45,376✔
2378

2379
    // If this is a PBS connection, then every download message is its own complete batch.
2380
    bool last_in_batch = is_flx ? *message.last_in_batch : true;
47,052✔
2381
    auto batch_state = last_in_batch ? sync::DownloadBatchState::LastInBatch : sync::DownloadBatchState::MoreToCome;
47,052✔
2382
    if (is_steady_state_download_message(batch_state, query_version))
47,052✔
2383
        batch_state = DownloadBatchState::SteadyState;
44,712✔
2384

2385
    auto&& progress = message.progress;
47,052✔
2386
    if (is_flx) {
47,052✔
2387
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
3,724✔
2388
                     "latest_server_version=%3, latest_server_version_salt=%4, "
3,724✔
2389
                     "upload_client_version=%5, upload_server_version=%6, progress_estimate=%7, "
3,724✔
2390
                     "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
3,724✔
2391
                     progress.download.server_version, progress.download.last_integrated_client_version,
3,724✔
2392
                     progress.latest_server_version.version, progress.latest_server_version.salt,
3,724✔
2393
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
3,724✔
2394
                     message.downloadable.as_estimate(), last_in_batch, query_version,
3,724✔
2395
                     message.changesets.size()); // Throws
3,724✔
2396
    }
3,724✔
2397
    else {
43,328✔
2398
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
43,328✔
2399
                     "latest_server_version=%3, latest_server_version_salt=%4, "
43,328✔
2400
                     "upload_client_version=%5, upload_server_version=%6, "
43,328✔
2401
                     "downloadable_bytes=%7, num_changesets=%8, ...)",
43,328✔
2402
                     progress.download.server_version, progress.download.last_integrated_client_version,
43,328✔
2403
                     progress.latest_server_version.version, progress.latest_server_version.salt,
43,328✔
2404
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
43,328✔
2405
                     message.downloadable.as_bytes(), message.changesets.size()); // Throws
43,328✔
2406
    }
43,328✔
2407

2408
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
2409
    // changeset over and over again.
2410
    if (m_client_error) {
47,052✔
2411
        logger.debug("Ignoring download message because the client detected an integration error");
×
2412
        return Status::OK();
×
2413
    }
×
2414

2415
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
47,052✔
2416
    if (REALM_UNLIKELY(!legal_at_this_time)) {
47,052✔
2417
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
2✔
2418
    }
2✔
2419
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
47,050✔
2420
        logger.error("Bad sync progress received (%1)", status);
×
2421
        return status;
×
2422
    }
×
2423

2424
    version_type server_version = m_progress.download.server_version;
47,050✔
2425
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
47,050✔
2426
    for (const RemoteChangeset& changeset : message.changesets) {
48,666✔
2427
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
2428
        // version must be increasing, but can stay the same during bootstraps.
2429
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
46,072✔
2430
                                                         : (changeset.remote_version > server_version);
46,072✔
2431
        // Each server version cannot be greater than the one in the header of the download message.
2432
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
46,072✔
2433
        if (!good_server_version) {
46,072✔
2434
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2435
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2436
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2437
        }
×
2438
        server_version = changeset.remote_version;
46,072✔
2439
        // Check that per-changeset last integrated client version is "weakly"
2440
        // increasing.
2441
        bool good_client_version =
46,072✔
2442
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
46,072✔
2443
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
46,072✔
2444
        if (!good_client_version) {
46,072✔
2445
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2446
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2447
                                 "(%1, %2, %3)",
×
2448
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2449
                                 progress.download.last_integrated_client_version)};
×
2450
        }
×
2451
        last_integrated_client_version = changeset.last_integrated_local_version;
46,072✔
2452
        // Server shouldn't send our own changes, and zero is not a valid client
2453
        // file identifier.
2454
        bool good_file_ident =
46,072✔
2455
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
46,072✔
2456
        if (!good_file_ident) {
46,072✔
2457
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2458
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2459
                                 changeset.origin_file_ident)};
×
2460
        }
×
2461
    }
46,072✔
2462

2463
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
47,050✔
2464
                                       batch_state, message.changesets.size());
47,050✔
2465
    if (hook_action == SyncClientHookAction::EarlyReturn) {
47,050✔
2466
        return Status::OK();
16✔
2467
    }
16✔
2468
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
47,034✔
2469

2470
    if (process_flx_bootstrap_message(message)) {
47,034✔
2471
        clear_resumption_delay_state();
2,328✔
2472
        return Status::OK();
2,328✔
2473
    }
2,328✔
2474

2475
    initiate_integrate_changesets(message.downloadable.as_bytes(), batch_state, progress,
44,706✔
2476
                                  message.changesets); // Throws
44,706✔
2477

2478
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
44,706✔
2479
                                  batch_state, message.changesets.size());
44,706✔
2480
    if (hook_action == SyncClientHookAction::EarlyReturn) {
44,706✔
2481
        return Status::OK();
×
2482
    }
×
2483
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
44,706✔
2484

2485
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
2486
    // after a retryable session error.
2487
    clear_resumption_delay_state();
44,706✔
2488
    return Status::OK();
44,706✔
2489
}
44,706✔
2490

2491
// Handles a MARK message from the server.
//
// The message is ignored (OK is returned) once the session is no longer
// Active. A SyncProtocolInvariantFailed status is returned when the message
// arrives at an illegal time, or when its request identifier is not in the
// range (last mark received, last mark sent]. On success the current download
// server version and the mark are recorded and download completion is checked.
Status Session::receive_mark_message(request_ident_type request_ident)
{
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

    // Once deactivation has been initiated, the associated Realm and
    // SessionWrapper must not be accessed any longer, so drop the message.
    if (m_state != Active) {
        return Status::OK(); // Success
    }

    const bool illegal_at_this_time =
        (!m_ident_message_sent || m_error_message_received || m_unbound_message_received);
    if (REALM_UNLIKELY(illegal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
    }

    const bool bad_request_ident =
        (request_ident > m_last_download_mark_sent || request_ident <= m_last_download_mark_received);
    if (REALM_UNLIKELY(bad_request_ident)) {
        auto reason = util::format(
            "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
            m_last_download_mark_sent, m_last_download_mark_received);
        return {ErrorCodes::SyncProtocolInvariantFailed, reason};
    }

    m_server_version_at_last_download_mark = m_progress.download.server_version;
    m_last_download_mark_received = request_ident;
    check_for_download_completion(); // Throws

    return Status::OK(); // Success
}
16,344✔
2521

2522

2523
// The caller (Connection) must discard the session if the session has become
2524
// deactivated upon return.
2525
Status Session::receive_unbound_message()
2526
{
3,654✔
2527
    logger.debug("Received: UNBOUND");
3,654✔
2528

2529
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
3,654✔
2530
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,654✔
2531
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2532
    }
×
2533

2534
    // The fact that the UNBIND message has been sent, but an ERROR message has
2535
    // not been received, implies that the deactivation process must have been
2536
    // initiated, so this session must be in the Deactivating state or the session
2537
    // has been suspended because of a client side error.
2538
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
3,654!
2539

2540
    m_unbound_message_received = true;
3,654✔
2541

2542
    // Detect completion of the unbinding process
2543
    if (m_unbind_message_send_complete && m_state == Deactivating) {
3,654✔
2544
        // The deactivation process completes when the unbinding process
2545
        // completes.
2546
        complete_deactivation(); // Throws
3,654✔
2547
        // Life cycle state is now Deactivated
2548
    }
3,654✔
2549

2550
    return Status::OK(); // Success
3,654✔
2551
}
3,654✔
2552

2553

2554
// Handles a QUERY_ERROR message from the server by forwarding the query
// version and message to on_flx_sync_error(). Always returns OK.
//
// The error is only forwarded while the session is Active.
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
{
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);

    // Once deactivation has been initiated, the associated Realm and
    // SessionWrapper must not be accessed any longer, so drop the message.
    if (m_state != Active) {
        return Status::OK();
    }

    on_flx_sync_error(query_version, message); // throws
    return Status::OK();
}
20✔
2565

2566
// The caller (Connection) must discard the session if the session has become
2567
// deactivated upon return.
2568
// Handles a session-level ERROR message received from the server.
//
// Returns a SyncProtocolInvariantFailed status when the message is not legal
// at this point, when it carries a known error code that is not session-level,
// or when a schema_version_changed error lacks the previous schema version.
// Otherwise returns Status::OK() after doing one of the following:
//   - returning early because the debug hook asked for it,
//   - queueing a compensating write error for later delivery,
//   - recording a pending schema migration and notifying about it, or
//   - marking the ERROR message as received and suspending the session.
Status Session::receive_error_message(const ProtocolErrorInfo& info)
{
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws

    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
    if (REALM_UNLIKELY(!legal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
    }

    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
    auto status = protocol_error_to_status(protocol_error, info.message);
    // Unknown error codes are let through; known codes must be session-level.
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
        return {ErrorCodes::SyncProtocolInvariantFailed,
                util::format("Received ERROR message for session with non-session-level error code %1",
                             info.raw_error_code)};
    }

    // Can't process debug hook actions once the Session is undergoing deactivation, since
    // the SessionWrapper may not be available
    if (m_state == Active) {
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
        if (debug_action == SyncClientHookAction::EarlyReturn) {
            return Status::OK();
        }
    }

    // For compensating write errors, we need to defer raising them to the SDK until after the server version
    // containing the compensating write has appeared in a download message.
    if (status == ErrorCodes::SyncCompensatingWrite) {
        // If the client is not active, the compensating writes will not be processed now, but will be
        // sent again the next time the client connects
        if (m_state == Active) {
            REALM_ASSERT(info.compensating_write_server_version.has_value());
            m_pending_compensating_write_errors.push_back(info);
        }
        return Status::OK();
    }

    if (protocol_error == ProtocolError::schema_version_changed) {
        // Enable upload immediately if the session is still active.
        if (m_state == Active) {
            // The previous schema version comes from the server message and is
            // dereferenced below. Reject the message instead of dereferencing
            // an empty value.
            if (REALM_UNLIKELY(!info.previous_schema_version)) {
                return {ErrorCodes::SyncProtocolInvariantFailed,
                        "Received schema version changed ERROR message without a previous schema version"};
            }
            auto wt = get_db()->start_write();
            _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
            wt->commit();
            // Notify SyncSession a schema migration is required.
            on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
        }
        // Keep the session active to upload any unsynced changes.
        return Status::OK();
    }

    m_error_message_received = true;
    suspend(SessionErrorInfo{info, std::move(status)});
    return Status::OK();
}
658✔
2624

2625
void Session::suspend(const SessionErrorInfo& info)
2626
{
670✔
2627
    REALM_ASSERT(!m_suspended);
670✔
2628
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
670✔
2629
    logger.debug("Suspended"); // Throws
670✔
2630

2631
    m_suspended = true;
670✔
2632

2633
    // Detect completion of the unbinding process
2634
    if (m_unbind_message_send_complete && m_error_message_received) {
670!
2635
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2636
        // we received an ERROR message implies that the deactivation process must
2637
        // have been initiated, so this session must be in the Deactivating state.
2638
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
×
2639

2640
        // The deactivation process completes when the unbinding process
2641
        // completes.
2642
        complete_deactivation(); // Throws
×
2643
        // Life cycle state is now Deactivated
2644
    }
×
2645

2646
    // Notify the application of the suspension of the session if the session is
2647
    // still in the Active state
2648
    if (m_state == Active) {
670✔
2649
        call_debug_hook(SyncClientHookEvent::SessionSuspended, info);
670✔
2650
        m_conn.one_less_active_unsuspended_session(); // Throws
670✔
2651
        on_suspended(info);                           // Throws
670✔
2652
    }
670✔
2653

2654
    if (!info.is_fatal) {
670✔
2655
        begin_resumption_delay(info);
58✔
2656
    }
58✔
2657

2658
    // Ready to send the UNBIND message, if it has not been sent already
2659
    if (!m_unbind_message_sent)
670✔
2660
        ensure_enlisted_to_send(); // Throws
670✔
2661
}
670✔
2662

2663
// Handles the response to a previously issued test command.
//
// Looks up the pending test command whose id equals `ident`, fulfils its
// promise with a copy of `body` and removes it from the pending list. Returns
// SyncProtocolInvariantFailed if no pending command has that id, and
// Status::OK() otherwise.
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
{
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);

    const auto has_matching_id = [&](const PendingTestCommand& pending) {
        return pending.id == ident;
    };
    auto pending_it =
        std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(), has_matching_id);

    const bool unknown_ident = (pending_it == m_pending_test_commands.end());
    if (unknown_ident) {
        return {ErrorCodes::SyncProtocolInvariantFailed,
                util::format("Received test command response for a non-existent ident %1", ident)};
    }

    // Deliver the response before dropping the bookkeeping entry.
    pending_it->promise.emplace_value(std::string{body});
    m_pending_test_commands.erase(pending_it);

    return Status::OK();
}
52✔
2680

2681
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2682
{
58✔
2683
    REALM_ASSERT(!m_try_again_activation_timer);
58✔
2684

2685
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
58✔
2686
                                  error_info.resumption_delay_interval);
58✔
2687
    auto try_again_interval = m_try_again_delay_info.delay_interval();
58✔
2688
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
58✔
2689
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
2690
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
2691
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
2692
        try_again_interval = std::chrono::milliseconds{1000};
28✔
2693
    }
28✔
2694
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
58✔
2695
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
58✔
2696
        if (status == ErrorCodes::OperationAborted)
58✔
2697
            return;
16✔
2698
        else if (!status.is_ok())
42✔
2699
            throw Exception(status);
×
2700

2701
        m_try_again_activation_timer.reset();
42✔
2702
        cancel_resumption_delay();
42✔
2703
    });
42✔
2704
}
58✔
2705

2706
void Session::clear_resumption_delay_state()
2707
{
47,034✔
2708
    if (m_try_again_activation_timer) {
47,034✔
2709
        logger.debug("Clearing resumption delay state after successful download");
×
2710
        m_try_again_delay_info.reset();
×
2711
    }
×
2712
}
47,034✔
2713

2714
// Validates the sync progress received in a download message against the
// progress currently stored in m_progress.
//
// Every check is always evaluated. If several fail, the message of the last
// failing check is the one reported. Returns Status::OK() when no check fails,
// and SyncProtocolInvariantFailed with the description otherwise.
//
// NOTE(review): the function is declared noexcept, yet it builds std::string
// messages through util::format on the failure paths. Confirm that this is
// acceptable.
Status Session::check_received_sync_progress(const SyncProgress& progress) noexcept
{
    const SyncProgress& current = m_progress;
    const SyncProgress& received = progress;

    std::string message;
    // Records a violation, replacing any previously recorded one.
    auto record = [&message](const char* fmt, auto first, auto second) {
        message = util::format(fmt, first, second);
    };

    if (received.latest_server_version.version < current.latest_server_version.version) {
        record("Latest server version in download messages must be weakly increasing throughout a "
               "session (current: %1, received: %2)",
               current.latest_server_version.version, received.latest_server_version.version);
    }
    if (received.upload.client_version < current.upload.client_version) {
        record("Last integrated client version in download messages must be weakly increasing "
               "throughout a session (current: %1, received: %2)",
               current.upload.client_version, received.upload.client_version);
    }
    if (received.upload.client_version > m_last_version_available) {
        record("Last integrated client version on server cannot be greater than the latest client "
               "version in existence (current: %1, received: %2)",
               m_last_version_available, received.upload.client_version);
    }
    if (received.download.server_version < current.download.server_version) {
        record("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
               current.download.server_version, received.download.server_version);
    }
    if (received.download.server_version > received.latest_server_version.version) {
        record(
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
            received.download.server_version, received.latest_server_version.version);
    }
    if (received.download.last_integrated_client_version < current.download.last_integrated_client_version) {
        record("Last integrated client version on the server at the position in the server's history of the download "
               "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
               current.download.last_integrated_client_version, received.download.last_integrated_client_version);
    }
    if (received.download.last_integrated_client_version > received.upload.client_version) {
        record("Last integrated client version on the server in the position at the server's history "
               "of the download cursor cannot be greater than the latest client version integrated "
               "on the server (download: %1, upload: %2)",
               received.download.last_integrated_client_version, received.upload.client_version);
    }
    if (received.download.server_version < received.upload.last_integrated_server_version) {
        record("The server version of the download cursor cannot be less than the server version integrated in the "
               "latest client version acknowledged by the server (download: %1, upload: %2)",
               received.download.server_version, received.upload.last_integrated_server_version);
    }

    if (!message.empty()) {
        return {ErrorCodes::SyncProtocolInvariantFailed, std::move(message)};
    }
    return Status::OK();
}
47,050✔
2768

2769

2770
void Session::check_for_download_completion()
2771
{
63,024✔
2772
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
63,024✔
2773
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
63,024✔
2774
    if (m_last_download_mark_received == m_last_triggering_download_mark)
63,024✔
2775
        return;
46,464✔
2776
    if (m_last_download_mark_received < m_target_download_mark)
16,560✔
2777
        return;
306✔
2778
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
16,254✔
2779
        return;
×
2780
    m_last_triggering_download_mark = m_target_download_mark;
16,254✔
2781
    if (REALM_UNLIKELY(m_delay_uploads)) {
16,254✔
2782
        // Activate the upload process now, and enable immediate reactivation
2783
        // after a subsequent fast reconnect.
2784
        m_delay_uploads = false;
4,564✔
2785
        ensure_enlisted_to_send(); // Throws
4,564✔
2786
    }
4,564✔
2787
    on_download_completion(); // Throws
16,254✔
2788
}
16,254✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc