• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1771

20 Oct 2023 08:58AM UTC coverage: 91.567% (-0.009%) from 91.576%
1771

push

Evergreen

web-flow
Fix blocked DB::open on multiprocess access on exFAT filesystem (#6959)

Fix double file lock and DB::open being blocked with multiple concurrent realm access on fat32/exfat file systems.

When file is truncated on fat32/exfat its uid is available for other files. With multiple processes opening and truncating the same set of files could lead to the situation when within one process get_unique_id will return the same value for different files in timing is right. This breaks proper initialization of static data for interprocess mutexes, so that subsequent locks will hang by trying to lock essentially the same file twice.

94304 of 173552 branches covered (0.0%)

59 of 82 new or added lines in 5 files covered. (71.95%)

53 existing lines in 13 files now uncovered.

230544 of 251776 relevant lines covered (91.57%)

6594884.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.62
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <system_error>
2
#include <sstream>
3

4
#include <realm/util/assert.hpp>
5
#include <realm/util/safe_int_ops.hpp>
6
#include <realm/util/random.hpp>
7
#include <realm/util/memory_stream.hpp>
8
#include <realm/util/basic_system_errors.hpp>
9
#include <realm/util/scope_exit.hpp>
10
#include <realm/util/to_string.hpp>
11
#include <realm/util/uri.hpp>
12
#include <realm/util/platform_info.hpp>
13
#include <realm/sync/impl/clock.hpp>
14
#include <realm/impl/simulated_failure.hpp>
15
#include <realm/sync/network/http.hpp>
16
#include <realm/sync/network/websocket.hpp>
17
#include <realm/sync/noinst/client_history_impl.hpp>
18
#include <realm/sync/noinst/client_impl_base.hpp>
19
#include <realm/sync/noinst/compact_changesets.hpp>
20
#include <realm/sync/protocol.hpp>
21
#include <realm/version.hpp>
22
#include <realm/sync/changeset_parser.hpp>
23

24
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
25

26
// NOTE: The protocol specification is in `/doc/protocol.md`
27

28
using namespace realm;
29
using namespace _impl;
30
using namespace realm::util;
31
using namespace realm::sync;
32
using namespace realm::sync::websocket;
33

34
// clang-format off
35
using Connection      = ClientImpl::Connection;
36
using Session         = ClientImpl::Session;
37
using UploadChangeset = ClientHistory::UploadChangeset;
38

39
// These are a work-around for a bug in MSVC. It cannot find in-class types
40
// mentioned in signature of out-of-line member function definitions.
41
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
42
using OutputBuffer                = ClientImpl::OutputBuffer;
43
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
44
// clang-format on
45

46
void ClientImpl::ReconnectInfo::reset() noexcept
47
{
1,740✔
48
    m_backoff_state.reset();
1,740✔
49
    scheduled_reset = false;
1,740✔
50
}
1,740✔
51

52

53
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
54
                                       std::optional<ResumptionDelayInfo> new_delay_info)
55
{
3,304✔
56
    m_backoff_state.update(new_reason, new_delay_info);
3,304✔
57
}
3,304✔
58

59

60
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
61
{
5,328✔
62
    if (scheduled_reset) {
5,328✔
63
        reset();
4✔
64
    }
4✔
65

2,724✔
66
    if (!m_backoff_state.triggering_error) {
5,328✔
67
        return std::chrono::milliseconds::zero();
4,102✔
68
    }
4,102✔
69

654✔
70
    switch (*m_backoff_state.triggering_error) {
1,226✔
71
        case ConnectionTerminationReason::closed_voluntarily:
80✔
72
            return std::chrono::milliseconds::zero();
80✔
73
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
74
            return std::chrono::milliseconds::max();
18✔
75
        default:
1,126✔
76
            if (m_reconnect_mode == ReconnectMode::testing) {
1,126✔
77
                return std::chrono::milliseconds::max();
900✔
78
            }
900✔
79

112✔
80
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
226✔
81
            return m_backoff_state.delay_interval();
226✔
82
    }
1,226✔
83
}
1,226✔
84

85

86
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
87
                                      port_type& port, std::string& path) const
88
{
3,334✔
89
    util::Uri uri(url); // Throws
3,334✔
90
    uri.canonicalize(); // Throws
3,334✔
91
    std::string userinfo, address_2, port_2;
3,334✔
92
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
3,334✔
93
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
3,334✔
94
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
3,334✔
95
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
3,334✔
96
    if (REALM_UNLIKELY(!good))
3,334✔
97
        return false;
1,492✔
98
    ProtocolEnvelope protocol_2;
3,334✔
99
    port_type port_3;
3,334✔
100
    if (realm_scheme) {
3,334✔
101
        if (uri.get_scheme() == "realm:") {
×
102
            protocol_2 = ProtocolEnvelope::realm;
×
103
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
104
        }
×
105
        else {
×
106
            protocol_2 = ProtocolEnvelope::realms;
×
107
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
108
        }
×
109
    }
×
110
    else {
3,334✔
111
        REALM_ASSERT(ws_scheme);
3,334✔
112
        if (uri.get_scheme() == "ws:") {
3,334✔
113
            protocol_2 = ProtocolEnvelope::ws;
3,330✔
114
            port_3 = 80;
3,330✔
115
        }
3,330✔
116
        else {
4✔
117
            protocol_2 = ProtocolEnvelope::wss;
4✔
118
            port_3 = 443;
4✔
119
        }
4✔
120
    }
3,334✔
121
    if (!port_2.empty()) {
3,334✔
122
        std::istringstream in(port_2);    // Throws
3,334✔
123
        in.imbue(std::locale::classic()); // Throws
3,334✔
124
        in >> port_3;
3,334✔
125
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,334✔
126
            return false;
1,492✔
127
    }
3,334✔
128
    std::string path_2 = uri.get_path(); // Throws (copy)
3,334✔
129

1,492✔
130
    protocol = protocol_2;
3,334✔
131
    address = std::move(address_2);
3,334✔
132
    port = port_3;
3,334✔
133
    path = std::move(path_2);
3,334✔
134
    return true;
3,334✔
135
}
3,334✔
136

137

138
ClientImpl::ClientImpl(ClientConfig config)
139
    : logger_ptr{config.logger ? std::move(config.logger) : util::Logger::get_default_logger()}
140
    , logger{*logger_ptr}
141
    , m_reconnect_mode{config.reconnect_mode}
142
    , m_connect_timeout{config.connect_timeout}
143
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
144
    , m_ping_keepalive_period{config.ping_keepalive_period}
145
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
146
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
147
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
148
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
149
    , m_dry_run{config.dry_run}
150
    , m_enable_default_port_hack{config.enable_default_port_hack}
151
    , m_disable_upload_compaction{config.disable_upload_compaction}
152
    , m_fix_up_object_ids{config.fix_up_object_ids}
153
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
154
    , m_socket_provider{std::move(config.socket_provider)}
155
    , m_client_protocol{} // Throws
156
    , m_one_connection_per_session{config.one_connection_per_session}
157
    , m_random{}
158
{
8,974✔
159
    // FIXME: Would be better if seeding was up to the application.
4,418✔
160
    util::seed_prng_nondeterministically(m_random); // Throws
8,974✔
161

4,418✔
162
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
8,974✔
163
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
8,974✔
164
                 get_current_protocol_version()); // Throws
8,974✔
165
    logger.info("Platform: %1", util::get_platform_info());
8,974✔
166
    const char* build_mode;
8,974✔
167
#if REALM_DEBUG
8,974✔
168
    build_mode = "Debug";
8,974✔
169
#else
170
    build_mode = "Release";
171
#endif
172
    logger.debug("Build mode: %1", build_mode);
8,974✔
173
    logger.debug("Config param: one_connection_per_session = %1",
8,974✔
174
                 config.one_connection_per_session); // Throws
8,974✔
175
    logger.debug("Config param: connect_timeout = %1 ms",
8,974✔
176
                 config.connect_timeout); // Throws
8,974✔
177
    logger.debug("Config param: connection_linger_time = %1 ms",
8,974✔
178
                 config.connection_linger_time); // Throws
8,974✔
179
    logger.debug("Config param: ping_keepalive_period = %1 ms",
8,974✔
180
                 config.ping_keepalive_period); // Throws
8,974✔
181
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
8,974✔
182
                 config.pong_keepalive_timeout); // Throws
8,974✔
183
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
8,974✔
184
                 config.fast_reconnect_limit); // Throws
8,974✔
185
    logger.debug("Config param: disable_upload_compaction = %1",
8,974✔
186
                 config.disable_upload_compaction); // Throws
8,974✔
187
    logger.debug("Config param: disable_sync_to_disk = %1",
8,974✔
188
                 config.disable_sync_to_disk); // Throws
8,974✔
189
    logger.debug("Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3",
8,974✔
190
                 m_reconnect_backoff_info.max_resumption_delay_interval.count(),
8,974✔
191
                 m_reconnect_backoff_info.resumption_delay_interval.count(),
8,974✔
192
                 m_reconnect_backoff_info.resumption_delay_backoff_multiplier);
8,974✔
193

4,418✔
194
    if (config.reconnect_mode != ReconnectMode::normal) {
8,974✔
195
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
772✔
196
                    "never do this in production!");
772✔
197
    }
772✔
198

4,418✔
199
    if (config.dry_run) {
8,974✔
200
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
201
                    "never do this in production!");
×
202
    }
×
203

4,418✔
204
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
8,974✔
205

4,418✔
206
    if (m_one_connection_per_session) {
8,974✔
207
        // FIXME: Re-enable this warning when the load balancer is able to handle
2✔
208
        // multiplexing.
2✔
209
        //        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
2✔
210
        //            "never do this in production");
2✔
211
    }
4✔
212

4,418✔
213
    if (config.disable_upload_activation_delay) {
8,974✔
214
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
215
                    "never do this in production");
×
216
    }
×
217

4,418✔
218
    if (config.disable_sync_to_disk) {
8,974✔
219
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
220
                    "never do this in production");
×
221
    }
×
222

4,418✔
223
    m_actualize_and_finalize = create_trigger([this](Status status) {
13,974✔
224
        if (status == ErrorCodes::OperationAborted)
13,974✔
225
            return;
×
226
        else if (!status.is_ok())
13,974✔
227
            throw Exception(status);
×
228
        actualize_and_finalize_session_wrappers(); // Throws
13,974✔
229
    });
13,974✔
230
}
8,974✔
231

232

233
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
234
{
160,366✔
235
    REALM_ASSERT(m_socket_provider);
160,366✔
236
    {
160,366✔
237
        std::lock_guard lock(m_drain_mutex);
160,366✔
238
        ++m_outstanding_posts;
160,366✔
239
        m_drained = false;
160,366✔
240
    }
160,366✔
241
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
160,376✔
242
        auto decr_guard = util::make_scope_exit([&]() noexcept {
160,382✔
243
            std::lock_guard lock(m_drain_mutex);
160,382✔
244
            REALM_ASSERT(m_outstanding_posts);
160,382✔
245
            --m_outstanding_posts;
160,382✔
246
            m_drain_cv.notify_all();
160,382✔
247
        });
160,382✔
248
        handler(status);
160,376✔
249
    });
160,376✔
250
}
160,366✔
251

252

253
void ClientImpl::drain_connections()
254
{
8,972✔
255
    logger.debug("Draining connections during sync client shutdown");
8,972✔
256
    for (auto& server_slot_pair : m_server_slots) {
5,656✔
257
        auto& server_slot = server_slot_pair.second;
2,342✔
258

1,104✔
259
        if (server_slot.connection) {
2,342✔
260
            auto& conn = server_slot.connection;
2,244✔
261
            conn->force_close();
2,244✔
262
        }
2,244✔
263
        else {
98✔
264
            for (auto& conn_pair : server_slot.alt_connections) {
48✔
UNCOV
265
                conn_pair.second->force_close();
×
UNCOV
266
            }
×
267
        }
98✔
268
    }
2,342✔
269
}
8,972✔
270

271

272
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
273
                                                       SyncSocketProvider::FunctionHandler&& handler)
274
{
16,078✔
275
    REALM_ASSERT(m_socket_provider);
16,078✔
276
    {
16,078✔
277
        std::lock_guard lock(m_drain_mutex);
16,078✔
278
        ++m_outstanding_posts;
16,078✔
279
        m_drained = false;
16,078✔
280
    }
16,078✔
281
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
16,090✔
282
        handler(status);
16,090✔
283

7,836✔
284
        std::lock_guard lock(m_drain_mutex);
16,090✔
285
        REALM_ASSERT(m_outstanding_posts);
16,090✔
286
        --m_outstanding_posts;
16,090✔
287
        m_drain_cv.notify_all();
16,090✔
288
    });
16,090✔
289
}
16,078✔
290

291

292
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
293
{
11,432✔
294
    REALM_ASSERT(m_socket_provider);
11,432✔
295
    return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
11,432✔
296
}
11,432✔
297

298
Connection::~Connection()
299
{
2,458✔
300
    if (m_websocket_sentinel) {
2,458✔
301
        m_websocket_sentinel->destroyed = true;
×
302
        m_websocket_sentinel.reset();
×
303
    }
×
304
}
2,458✔
305

306
void Connection::activate()
307
{
2,456✔
308
    REALM_ASSERT(m_on_idle);
2,456✔
309
    m_activated = true;
2,456✔
310
    if (m_num_active_sessions == 0)
2,456✔
311
        m_on_idle->trigger();
×
312
    // We cannot in general connect immediately, because a prior failure to
1,160✔
313
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
1,160✔
314
    initiate_reconnect_wait(); // Throws
2,456✔
315
}
2,456✔
316

317

318
void Connection::activate_session(std::unique_ptr<Session> sess)
319
{
9,620✔
320
    REALM_ASSERT(sess);
9,620✔
321
    REALM_ASSERT(&sess->m_conn == this);
9,620✔
322
    REALM_ASSERT(!m_force_closed);
9,620✔
323
    Session& sess_2 = *sess;
9,620✔
324
    session_ident_type ident = sess->m_ident;
9,620✔
325
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
9,620✔
326
    bool was_inserted = p.second;
9,620✔
327
    REALM_ASSERT(was_inserted);
9,620✔
328
    // Save the session ident to the historical list of session idents
4,636✔
329
    m_session_history.insert(ident);
9,620✔
330
    sess_2.activate(); // Throws
9,620✔
331
    if (m_state == ConnectionState::connected) {
9,620✔
332
        bool fast_reconnect = false;
6,660✔
333
        sess_2.connection_established(fast_reconnect); // Throws
6,660✔
334
    }
6,660✔
335
    ++m_num_active_sessions;
9,620✔
336
}
9,620✔
337

338

339
void Connection::initiate_session_deactivation(Session* sess)
340
{
9,622✔
341
    REALM_ASSERT(sess);
9,622✔
342
    REALM_ASSERT(&sess->m_conn == this);
9,622✔
343
    REALM_ASSERT(m_num_active_sessions);
9,622✔
344
    // Since the client may be waiting for m_num_active_sessions to reach 0
4,636✔
345
    // in stop_and_wait() (on a separate thread), deactivate Session before
4,636✔
346
    // decrementing the num active sessions value.
4,636✔
347
    sess->initiate_deactivation(); // Throws
9,622✔
348
    if (sess->m_state == Session::Deactivated) {
9,622✔
349
        finish_session_deactivation(sess);
1,038✔
350
    }
1,038✔
351
    if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
9,622✔
352
        if (m_activated && m_state == ConnectionState::disconnected)
3,968✔
353
            m_on_idle->trigger();
332✔
354
    }
3,968✔
355
}
9,622✔
356

357

358
void Connection::cancel_reconnect_delay()
359
{
1,944✔
360
    REALM_ASSERT(m_activated);
1,944✔
361

1,058✔
362
    if (m_reconnect_delay_in_progress) {
1,944✔
363
        if (m_nonzero_reconnect_delay)
1,732✔
364
            logger.detail("Canceling reconnect delay"); // Throws
870✔
365

952✔
366
        // Cancel the in-progress wait operation by destroying the timer
952✔
367
        // object. Destruction is needed in this case, because a new wait
952✔
368
        // operation might have to be initiated before the previous one
952✔
369
        // completes (its completion handler starts to execute), so the new wait
952✔
370
        // operation must be done on a new timer object.
952✔
371
        m_reconnect_disconnect_timer.reset();
1,732✔
372
        m_reconnect_delay_in_progress = false;
1,732✔
373
        m_reconnect_info.reset();
1,732✔
374
        initiate_reconnect_wait(); // Throws
1,732✔
375
        return;
1,732✔
376
    }
1,732✔
377

106✔
378
    // If we are not disconnected, then we need to make sure the next time we get disconnected
106✔
379
    // that we are allowed to re-connect as quickly as possible.
106✔
380
    //
106✔
381
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
106✔
382
    // backoff/delay state before calculating the next delay, unless a PONG message is received
106✔
383
    // for the urgent PING message we send below.
106✔
384
    //
106✔
385
    // If we get a PONG message for the urgent PING message sent below, then the connection is
106✔
386
    // healthy and we can calculate the next delay normally.
106✔
387
    if (m_state != ConnectionState::disconnected) {
212✔
388
        m_reconnect_info.scheduled_reset = true;
212✔
389
        m_ping_after_scheduled_reset_of_reconnect_info = false;
212✔
390

106✔
391
        schedule_urgent_ping(); // Throws
212✔
392
        return;
212✔
393
    }
212✔
394
    // Nothing to do in this case. The next reconnect attemp will be made as
106✔
395
    // soon as there are any sessions that are both active and unsuspended.
106✔
396
}
212✔
397

398
void ClientImpl::Connection::finish_session_deactivation(Session* sess)
399
{
7,768✔
400
    REALM_ASSERT(sess->m_state == Session::Deactivated);
7,768✔
401
    auto ident = sess->m_ident;
7,768✔
402
    m_sessions.erase(ident);
7,768✔
403
    m_session_history.erase(ident);
7,768✔
404
}
7,768✔
405

406
void Connection::force_close()
407
{
2,244✔
408
    if (m_force_closed) {
2,244✔
409
        return;
×
410
    }
×
411

1,056✔
412
    m_force_closed = true;
2,244✔
413

1,056✔
414
    if (m_state != ConnectionState::disconnected) {
2,244✔
415
        voluntary_disconnect();
2,162✔
416
    }
2,162✔
417

1,056✔
418
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,244✔
419
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,244✔
420
        m_reconnect_disconnect_timer.reset();
82✔
421
        m_reconnect_delay_in_progress = false;
82✔
422
        m_disconnect_delay_in_progress = false;
82✔
423
    }
82✔
424

1,056✔
425
    // We must copy any session pointers we want to close to a vector because force_closing
1,056✔
426
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
1,056✔
427
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
1,056✔
428
    std::vector<Session*> to_close;
2,244✔
429
    for (auto& session_pair : m_sessions) {
1,130✔
430
        if (session_pair.second->m_state == Session::State::Active) {
152✔
431
            to_close.push_back(session_pair.second.get());
152✔
432
        }
152✔
433
    }
152✔
434

1,056✔
435
    for (auto& sess : to_close) {
1,130✔
436
        sess->force_close();
152✔
437
    }
152✔
438

1,056✔
439
    logger.debug("Force closed idle connection");
2,244✔
440
}
2,244✔
441

442

443
void Connection::websocket_connected_handler(const std::string& protocol)
444
{
3,188✔
445
    if (!protocol.empty()) {
3,188✔
446
        std::string_view expected_prefix =
3,188✔
447
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
2,922✔
448
        // FIXME: Use std::string_view::begins_with() in C++20.
1,568✔
449
        auto prefix_matches = [&](std::string_view other) {
3,188✔
450
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,188✔
451
        };
3,188✔
452
        if (prefix_matches(expected_prefix)) {
3,188✔
453
            util::MemoryInputStream in;
3,188✔
454
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,188✔
455
            in.imbue(std::locale::classic());
3,188✔
456
            in.unsetf(std::ios_base::skipws);
3,188✔
457
            int value_2 = 0;
3,188✔
458
            in >> value_2;
3,188✔
459
            if (in && in.eof() && value_2 >= 0) {
3,188✔
460
                bool good_version =
3,188✔
461
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,188✔
462
                if (good_version) {
3,188✔
463
                    logger.detail("Negotiated protocol version: %1", value_2);
3,188✔
464
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
1,568✔
465
                    // will provide the appservices connection ID via a log message.
1,568✔
466
                    // TODO: Remove once the server starts sending the connection ID
1,568✔
467
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,188✔
468
                    m_negotiated_protocol_version = value_2;
3,188✔
469
                    handle_connection_established(); // Throws
3,188✔
470
                    return;
3,188✔
471
                }
3,188✔
UNCOV
472
            }
×
473
        }
3,188✔
UNCOV
474
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
UNCOV
475
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
UNCOV
476
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
UNCOV
477
    }
×
478
    else {
×
479
        close_due_to_client_side_error(
×
480
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
481
            ConnectionTerminationReason::bad_headers_in_http_response);
×
482
    }
×
483
}
3,188✔
484

485

486
bool Connection::websocket_binary_message_received(util::Span<const char> data)
487
{
75,092✔
488
    if (m_force_closed) {
75,092✔
489
        logger.debug("Received binary message after connection was force closed");
×
490
        return false;
×
491
    }
×
492

38,778✔
493
    using sf = SimulatedFailure;
75,092✔
494
    if (sf::check_trigger(sf::sync_client__read_head)) {
75,092✔
495
        close_due_to_client_side_error(
450✔
496
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
450✔
497
            ConnectionTerminationReason::read_or_write_error);
450✔
498
        return bool(m_websocket);
450✔
499
    }
450✔
500

38,494✔
501
    handle_message_received(data);
74,642✔
502
    return bool(m_websocket);
74,642✔
503
}
74,642✔
504

505

506
void Connection::websocket_error_handler()
507
{
508✔
508
    m_websocket_error_received = true;
508✔
509
}
508✔
510

511
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
512
{
636✔
513
    if (m_force_closed) {
636✔
514
        logger.debug("Received websocket close message after connection was force closed");
×
515
        return false;
×
516
    }
×
517
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
636✔
518

302✔
519
    switch (error_code) {
636✔
520
        case WebSocketError::websocket_ok:
76✔
521
            break;
76✔
522
        case WebSocketError::websocket_resolve_failed:
4✔
523
            [[fallthrough]];
4✔
524
        case WebSocketError::websocket_connection_failed: {
4✔
525
            SessionErrorInfo error_info(
4✔
526
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
4✔
527
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
4✔
528
            break;
4✔
529
        }
4✔
530
        case WebSocketError::websocket_read_error:
494✔
531
            [[fallthrough]];
494✔
532
        case WebSocketError::websocket_write_error: {
494✔
533
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
494✔
534
                                         ConnectionTerminationReason::read_or_write_error);
494✔
535
            break;
494✔
536
        }
494✔
537
        case WebSocketError::websocket_going_away:
230✔
538
            [[fallthrough]];
×
539
        case WebSocketError::websocket_protocol_error:
✔
540
            [[fallthrough]];
×
541
        case WebSocketError::websocket_unsupported_data:
✔
542
            [[fallthrough]];
×
543
        case WebSocketError::websocket_invalid_payload_data:
✔
544
            [[fallthrough]];
×
545
        case WebSocketError::websocket_policy_violation:
✔
546
            [[fallthrough]];
×
547
        case WebSocketError::websocket_reserved:
✔
548
            [[fallthrough]];
×
549
        case WebSocketError::websocket_no_status_received:
✔
550
            [[fallthrough]];
×
551
        case WebSocketError::websocket_invalid_extension: {
✔
552
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
553
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
554
            break;
×
555
        }
×
556
        case WebSocketError::websocket_message_too_big: {
4✔
557
            auto message = util::format(
4✔
558
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
559
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
560
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
561
            involuntary_disconnect(std::move(error_info),
4✔
562
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
563
            break;
4✔
564
        }
×
565
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
566
            close_due_to_client_side_error(
10✔
567
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
568
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
569
            break;
10✔
570
        }
×
571
        case WebSocketError::websocket_client_too_old:
✔
572
            [[fallthrough]];
×
573
        case WebSocketError::websocket_client_too_new:
✔
574
            [[fallthrough]];
×
575
        case WebSocketError::websocket_protocol_mismatch: {
✔
576
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
577
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
578
            break;
×
579
        }
×
580
        case WebSocketError::websocket_fatal_error: {
✔
581
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{true}),
×
582
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
583
            break;
×
584
        }
×
585
        case WebSocketError::websocket_forbidden: {
✔
586
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
587
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
588
            involuntary_disconnect(std::move(error_info),
×
589
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
590
            break;
×
591
        }
×
592
        case WebSocketError::websocket_unauthorized: {
36✔
593
            SessionErrorInfo error_info(
36✔
594
                {ErrorCodes::AuthError,
36✔
595
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
36✔
596
                IsFatal{false});
36✔
597
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
36✔
598
            involuntary_disconnect(std::move(error_info),
36✔
599
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
36✔
600
            break;
36✔
601
        }
×
602
        case WebSocketError::websocket_moved_permanently: {
12✔
603
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
604
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
605
            involuntary_disconnect(std::move(error_info),
12✔
606
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
607
            break;
12✔
608
        }
×
609
        case WebSocketError::websocket_abnormal_closure: {
✔
610
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
611
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
612
            involuntary_disconnect(std::move(error_info),
×
613
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
614
            break;
×
615
        }
×
616
        case WebSocketError::websocket_internal_server_error:
✔
617
            [[fallthrough]];
×
618
        case WebSocketError::websocket_retry_error: {
✔
619
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
620
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
621
            break;
×
622
        }
636✔
623
    }
636✔
624

302✔
625
    return bool(m_websocket);
636✔
626
}
636✔
627

628
// Guarantees that handle_reconnect_wait() is never called from within the
629
// execution of initiate_reconnect_wait() (no callback reentrance).
630
void Connection::initiate_reconnect_wait()
631
{
7,490✔
632
    REALM_ASSERT(m_activated);
7,490✔
633
    REALM_ASSERT(!m_reconnect_delay_in_progress);
7,490✔
634
    REALM_ASSERT(!m_disconnect_delay_in_progress);
7,490✔
635

3,738✔
636
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
3,738✔
637
    if (m_force_closed) {
7,490✔
638
        return;
2,164✔
639
    }
2,164✔
640

2,724✔
641
    m_reconnect_delay_in_progress = true;
5,326✔
642
    auto delay = m_reconnect_info.delay_interval();
5,326✔
643
    if (delay == std::chrono::milliseconds::max()) {
5,326✔
644
        logger.detail("Reconnection delayed indefinitely"); // Throws
918✔
645
        // Not actually starting a timer corresponds to an infinite wait
502✔
646
        m_nonzero_reconnect_delay = true;
918✔
647
        return;
918✔
648
    }
918✔
649

2,222✔
650
    if (delay == std::chrono::milliseconds::zero()) {
4,408✔
651
        m_nonzero_reconnect_delay = false;
4,186✔
652
    }
4,186✔
653
    else {
222✔
654
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
222✔
655
        m_nonzero_reconnect_delay = true;
222✔
656
    }
222✔
657

2,222✔
658
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
2,222✔
659
    // we need it to be cancelable in case the connection is terminated before the timer
2,222✔
660
    // callback is run.
2,222✔
661
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
4,412✔
662
        // If the operation is aborted, the connection object may have been
2,224✔
663
        // destroyed.
2,224✔
664
        if (status != ErrorCodes::OperationAborted)
4,412✔
665
            handle_reconnect_wait(status); // Throws
3,300✔
666
    });                                    // Throws
4,412✔
667
}
4,408✔
668

669

670
void Connection::handle_reconnect_wait(Status status)
671
{
3,302✔
672
    if (!status.is_ok()) {
3,302✔
673
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
674
        throw Exception(status);
×
675
    }
×
676

1,624✔
677
    REALM_ASSERT(m_reconnect_delay_in_progress);
3,302✔
678
    m_reconnect_delay_in_progress = false;
3,302✔
679

1,624✔
680
    if (m_num_active_unsuspended_sessions > 0)
3,302✔
681
        initiate_reconnect(); // Throws
3,300✔
682
}
3,302✔
683

684
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
685
    explicit WebSocketObserverShim(Connection* conn)
686
        : conn(conn)
687
        , sentinel(conn->m_websocket_sentinel)
688
    {
3,304✔
689
    }
3,304✔
690

691
    Connection* conn;
692
    util::bind_ptr<LifecycleSentinel> sentinel;
693

694
    void websocket_connected_handler(const std::string& protocol) override
695
    {
3,188✔
696
        if (sentinel->destroyed) {
3,188✔
697
            return;
×
698
        }
×
699

1,568✔
700
        return conn->websocket_connected_handler(protocol);
3,188✔
701
    }
3,188✔
702

703
    void websocket_error_handler() override
704
    {
508✔
705
        if (sentinel->destroyed) {
508✔
706
            return;
×
707
        }
×
708

238✔
709
        conn->websocket_error_handler();
508✔
710
    }
508✔
711

712
    bool websocket_binary_message_received(util::Span<const char> data) override
713
    {
75,092✔
714
        if (sentinel->destroyed) {
75,092✔
715
            return false;
×
716
        }
×
717

38,776✔
718
        return conn->websocket_binary_message_received(data);
75,092✔
719
    }
75,092✔
720

721
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
722
    {
636✔
723
        if (sentinel->destroyed) {
636✔
724
            return true;
×
725
        }
×
726

302✔
727
        return conn->websocket_closed_handler(was_clean, error_code, msg);
636✔
728
    }
636✔
729
};
730

731
void Connection::initiate_reconnect()
732
{
3,298✔
733
    REALM_ASSERT(m_activated);
3,298✔
734

1,622✔
735
    m_state = ConnectionState::connecting;
3,298✔
736
    report_connection_state_change(ConnectionState::connecting); // Throws
3,298✔
737
    if (m_websocket_sentinel) {
3,298✔
738
        m_websocket_sentinel->destroyed = true;
×
739
    }
×
740
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,298✔
741
    m_websocket.reset();
3,298✔
742

1,622✔
743
    // Watchdog
1,622✔
744
    initiate_connect_wait(); // Throws
3,298✔
745

1,622✔
746
    std::vector<std::string> sec_websocket_protocol;
3,298✔
747
    {
3,298✔
748
        auto protocol_prefix =
3,298✔
749
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,026✔
750
        int min = get_oldest_supported_protocol_version();
3,298✔
751
        int max = get_current_protocol_version();
3,298✔
752
        REALM_ASSERT_3(min, <=, max);
3,298✔
753
        // List protocol version in descending order to ensure that the server
1,622✔
754
        // selects the highest possible version.
1,622✔
755
        for (int version = max; version >= min; --version) {
33,016✔
756
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
29,718✔
757
        }
29,718✔
758
    }
3,298✔
759

1,622✔
760
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,298✔
761
                m_server_endpoint.port, m_http_request_path_prefix);
3,298✔
762

1,622✔
763
    m_websocket_error_received = false;
3,298✔
764
    m_websocket =
3,298✔
765
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,298✔
766
                                            WebSocketEndpoint{
3,298✔
767
                                                m_server_endpoint.address,
3,298✔
768
                                                m_server_endpoint.port,
3,298✔
769
                                                get_http_request_path(),
3,298✔
770
                                                std::move(sec_websocket_protocol),
3,298✔
771
                                                is_ssl(m_server_endpoint.envelope),
3,298✔
772
                                                /// DEPRECATED - The following will be removed in a future release
1,622✔
773
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,298✔
774
                                                m_verify_servers_ssl_certificate,
3,298✔
775
                                                m_ssl_trust_certificate_path,
3,298✔
776
                                                m_ssl_verify_callback,
3,298✔
777
                                                m_proxy_config,
3,298✔
778
                                            });
3,298✔
779
}
3,298✔
780

781

782
void Connection::initiate_connect_wait()
783
{
3,298✔
784
    // Deploy a watchdog to enforce an upper bound on the time it can take to
1,624✔
785
    // fully establish the connection (including SSL and WebSocket
1,624✔
786
    // handshakes). Without such a watchdog, connect operations could take very
1,624✔
787
    // long, or even indefinite time.
1,624✔
788
    milliseconds_type time = m_client.m_connect_timeout;
3,298✔
789

1,624✔
790
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,304✔
791
        // If the operation is aborted, the connection object may have been
1,626✔
792
        // destroyed.
1,626✔
793
        if (status != ErrorCodes::OperationAborted)
3,304✔
794
            handle_connect_wait(status); // Throws
×
795
    });                                  // Throws
3,304✔
796
}
3,298✔
797

798

799
void Connection::handle_connect_wait(Status status)
800
{
×
801
    if (!status.is_ok()) {
×
802
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
803
        throw Exception(status);
×
804
    }
×
805

806
    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
×
807
    logger.info("Connect timeout"); // Throws
×
808
    involuntary_disconnect(
×
809
        SessionErrorInfo{Status{ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
×
810
                         IsFatal{false}},
×
811
        ConnectionTerminationReason::sync_connect_timeout); // Throws
×
812
}
×
813

814

815
void Connection::handle_connection_established()
816
{
3,188✔
817
    // Cancel connect timeout watchdog
1,568✔
818
    m_connect_timer.reset();
3,188✔
819

1,568✔
820
    m_state = ConnectionState::connected;
3,188✔
821

1,568✔
822
    milliseconds_type now = monotonic_clock_now();
3,188✔
823
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,188✔
824
    initiate_ping_delay(now);     // Throws
3,188✔
825

1,568✔
826
    bool fast_reconnect = false;
3,188✔
827
    if (m_disconnect_has_occurred) {
3,188✔
828
        milliseconds_type time = now - m_disconnect_time;
926✔
829
        if (time <= m_client.m_fast_reconnect_limit)
926✔
830
            fast_reconnect = true;
926✔
831
    }
926✔
832

1,568✔
833
    for (auto& p : m_sessions) {
4,210✔
834
        Session& sess = *p.second;
4,210✔
835
        sess.connection_established(fast_reconnect); // Throws
4,210✔
836
    }
4,210✔
837

1,568✔
838
    report_connection_state_change(ConnectionState::connected); // Throws
3,188✔
839
}
3,188✔
840

841

842
void Connection::schedule_urgent_ping()
843
{
212✔
844
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
212✔
845
    if (m_ping_delay_in_progress) {
212✔
846
        m_heartbeat_timer.reset();
174✔
847
        m_ping_delay_in_progress = false;
174✔
848
        m_minimize_next_ping_delay = true;
174✔
849
        milliseconds_type now = monotonic_clock_now();
174✔
850
        initiate_ping_delay(now); // Throws
174✔
851
        return;
174✔
852
    }
174✔
853
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
38!
854
    if (!m_send_ping)
38!
855
        m_minimize_next_ping_delay = true;
38✔
856
}
38✔
857

858

859
void Connection::initiate_ping_delay(milliseconds_type now)
860
{
3,530✔
861
    REALM_ASSERT(!m_ping_delay_in_progress);
3,530✔
862
    REALM_ASSERT(!m_waiting_for_pong);
3,530✔
863
    REALM_ASSERT(!m_send_ping);
3,530✔
864

1,690✔
865
    milliseconds_type delay = 0;
3,530✔
866
    if (!m_minimize_next_ping_delay) {
3,530✔
867
        delay = m_client.m_ping_keepalive_period;
3,350✔
868
        // Make a randomized deduction of up to 10%, or up to 100% if this is
1,616✔
869
        // the first PING message to be sent since the connection was
1,616✔
870
        // established. The purpose of this randomized deduction is to reduce
1,616✔
871
        // the risk of many connections sending PING messages simultaneously to
1,616✔
872
        // the server.
1,616✔
873
        milliseconds_type max_deduction = (m_ping_sent ? delay / 10 : delay);
3,236✔
874
        auto distr = std::uniform_int_distribution<milliseconds_type>(0, max_deduction);
3,350✔
875
        milliseconds_type randomized_deduction = distr(m_client.get_random());
3,350✔
876
        delay -= randomized_deduction;
3,350✔
877
        // Deduct the time spent waiting for PONG
1,616✔
878
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
3,350✔
879
        milliseconds_type spent_time = now - m_pong_wait_started_at;
3,350✔
880
        if (spent_time < delay) {
3,350✔
881
            delay -= spent_time;
3,342✔
882
        }
3,342✔
883
        else {
8✔
884
            delay = 0;
8✔
885
        }
8✔
886
    }
3,350✔
887
    else {
180✔
888
        m_minimize_next_ping_delay = false;
180✔
889
    }
180✔
890

1,690✔
891

1,690✔
892
    m_ping_delay_in_progress = true;
3,530✔
893

1,690✔
894
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
3,530✔
895
        if (status == ErrorCodes::OperationAborted)
3,530✔
896
            return;
3,342✔
897
        else if (!status.is_ok())
188✔
898
            throw Exception(status);
×
899

64✔
900
        handle_ping_delay();                                    // Throws
188✔
901
    });                                                         // Throws
188✔
902
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
3,530✔
903
}
3,530✔
904

905

906
void Connection::handle_ping_delay()
907
{
188✔
908
    REALM_ASSERT(m_ping_delay_in_progress);
188✔
909
    m_ping_delay_in_progress = false;
188✔
910
    m_send_ping = true;
188✔
911

64✔
912
    initiate_pong_timeout(); // Throws
188✔
913

64✔
914
    if (m_state == ConnectionState::connected && !m_sending)
188✔
915
        send_next_message(); // Throws
158✔
916
}
188✔
917

918

919
void Connection::initiate_pong_timeout()
920
{
188✔
921
    REALM_ASSERT(!m_ping_delay_in_progress);
188✔
922
    REALM_ASSERT(!m_waiting_for_pong);
188✔
923
    REALM_ASSERT(m_send_ping);
188✔
924

64✔
925
    m_waiting_for_pong = true;
188✔
926
    m_pong_wait_started_at = monotonic_clock_now();
188✔
927

64✔
928
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
188✔
929
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
188✔
930
        if (status == ErrorCodes::OperationAborted)
188✔
931
            return;
176✔
932
        else if (!status.is_ok())
12✔
933
            throw Exception(status);
×
934

6✔
935
        handle_pong_timeout(); // Throws
12✔
936
    });                        // Throws
12✔
937
}
188✔
938

939

940
void Connection::handle_pong_timeout()
941
{
12✔
942
    REALM_ASSERT(m_waiting_for_pong);
12✔
943
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
944
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
945
                                 ConnectionTerminationReason::pong_timeout);
12✔
946
}
12✔
947

948

949
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
950
{
92,806✔
951
    // Stop sending messages if an websocket error was received.
46,372✔
952
    if (m_websocket_error_received)
92,806✔
953
        return;
×
954

46,372✔
955
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
92,806✔
956
        if (sentinel->destroyed) {
91,270✔
957
            return;
×
958
        }
×
959
        if (status == ErrorCodes::OperationAborted)
91,270✔
960
            return;
×
961
        else if (!status.is_ok())
91,270✔
962
            throw Exception(status);
×
963

45,488✔
964
        handle_write_message(); // Throws
91,270✔
965
    });                         // Throws
91,270✔
966
    m_sending_session = sess;
92,806✔
967
    m_sending = true;
92,806✔
968
}
92,806✔
969

970

971
void Connection::handle_write_message()
972
{
91,268✔
973
    m_sending_session->message_sent(); // Throws
91,268✔
974
    if (m_sending_session->m_state == Session::Deactivated) {
91,268✔
975
        finish_session_deactivation(m_sending_session);
100✔
976
    }
100✔
977
    m_sending_session = nullptr;
91,268✔
978
    m_sending = false;
91,268✔
979
    send_next_message(); // Throws
91,268✔
980
}
91,268✔
981

982

983
void Connection::send_next_message()
984
{
147,980✔
985
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
147,980✔
986
    REALM_ASSERT(!m_sending_session);
147,980✔
987
    REALM_ASSERT(!m_sending);
147,980✔
988
    if (m_send_ping) {
147,980✔
989
        send_ping(); // Throws
176✔
990
        return;
176✔
991
    }
176✔
992
    while (!m_sessions_enlisted_to_send.empty()) {
206,800✔
993
        // The state of being connected is not supposed to be able to change
74,498✔
994
        // across this loop thanks to the "no callback reentrance" guarantee
74,498✔
995
        // provided by Websocket::async_write_text(), and friends.
74,498✔
996
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
152,010✔
997

74,498✔
998
        Session& sess = *m_sessions_enlisted_to_send.front();
152,010✔
999
        m_sessions_enlisted_to_send.pop_front();
152,010✔
1000
        sess.send_message(); // Throws
152,010✔
1001

74,498✔
1002
        if (sess.m_state == Session::Deactivated) {
152,010✔
1003
            finish_session_deactivation(&sess);
2,640✔
1004
        }
2,640✔
1005

74,498✔
1006
        // An enlisted session may choose to not send a message. In that case,
74,498✔
1007
        // we should pass the opportunity to the next enlisted session.
74,498✔
1008
        if (m_sending)
152,010✔
1009
            break;
93,014✔
1010
    }
152,010✔
1011
}
147,804✔
1012

1013

1014
void Connection::send_ping()
1015
{
176✔
1016
    REALM_ASSERT(!m_ping_delay_in_progress);
176✔
1017
    REALM_ASSERT(m_waiting_for_pong);
176✔
1018
    REALM_ASSERT(m_send_ping);
176✔
1019

58✔
1020
    m_send_ping = false;
176✔
1021
    if (m_reconnect_info.scheduled_reset)
176✔
1022
        m_ping_after_scheduled_reset_of_reconnect_info = true;
146✔
1023

58✔
1024
    m_last_ping_sent_at = monotonic_clock_now();
176✔
1025
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
176✔
1026
                 m_previous_ping_rtt); // Throws
176✔
1027

58✔
1028
    ClientProtocol& protocol = get_client_protocol();
176✔
1029
    OutputBuffer& out = get_output_buffer();
176✔
1030
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
176✔
1031
    initiate_write_ping(out);                                          // Throws
176✔
1032
    m_ping_sent = true;
176✔
1033
}
176✔
1034

1035

1036
void Connection::initiate_write_ping(const OutputBuffer& out)
1037
{
176✔
1038
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
176✔
1039
        if (sentinel->destroyed) {
176✔
1040
            return;
×
1041
        }
×
1042
        if (status == ErrorCodes::OperationAborted)
176✔
1043
            return;
×
1044
        else if (!status.is_ok())
176✔
1045
            throw Exception(status);
×
1046

58✔
1047
        handle_write_ping(); // Throws
176✔
1048
    });                      // Throws
176✔
1049
    m_sending = true;
176✔
1050
}
176✔
1051

1052

1053
void Connection::handle_write_ping()
1054
{
176✔
1055
    REALM_ASSERT(m_sending);
176✔
1056
    REALM_ASSERT(!m_sending_session);
176✔
1057
    m_sending = false;
176✔
1058
    send_next_message(); // Throws
176✔
1059
}
176✔
1060

1061

1062
void Connection::handle_message_received(util::Span<const char> data)
1063
{
74,640✔
1064
    // parse_message_received() parses the message and calls the proper handler
38,492✔
1065
    // on the Connection object (this).
38,492✔
1066
    get_client_protocol().parse_message_received<Connection>(*this, std::string_view(data.data(), data.size()));
74,640✔
1067
}
74,640✔
1068

1069

1070
void Connection::initiate_disconnect_wait()
1071
{
4,282✔
1072
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,282✔
1073

2,028✔
1074
    if (m_disconnect_delay_in_progress) {
4,282✔
1075
        m_reconnect_disconnect_timer.reset();
2,054✔
1076
        m_disconnect_delay_in_progress = false;
2,054✔
1077
    }
2,054✔
1078

2,028✔
1079
    milliseconds_type time = m_client.m_connection_linger_time;
4,282✔
1080

2,028✔
1081
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,282✔
1082
        // If the operation is aborted, the connection object may have been
2,028✔
1083
        // destroyed.
2,028✔
1084
        if (status != ErrorCodes::OperationAborted)
4,282✔
1085
            handle_disconnect_wait(status); // Throws
12✔
1086
    });                                     // Throws
4,282✔
1087
    m_disconnect_delay_in_progress = true;
4,282✔
1088
}
4,282✔
1089

1090

1091
void Connection::handle_disconnect_wait(Status status)
1092
{
12✔
1093
    if (!status.is_ok()) {
12✔
1094
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
1095
        throw Exception(status);
×
1096
    }
×
1097

6✔
1098
    m_disconnect_delay_in_progress = false;
12✔
1099

6✔
1100
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
12✔
1101
    if (m_num_active_unsuspended_sessions == 0) {
12✔
1102
        if (m_client.m_connection_linger_time > 0)
12✔
1103
            logger.detail("Linger time expired"); // Throws
×
1104
        voluntary_disconnect();                   // Throws
12✔
1105
        logger.info("Disconnected");              // Throws
12✔
1106
    }
12✔
1107
}
12✔
1108

1109

1110
void Connection::close_due_to_protocol_error(Status status)
1111
{
4✔
1112
    SessionErrorInfo error_info(std::move(status), IsFatal{true});
4✔
1113
    error_info.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;
4✔
1114
    involuntary_disconnect(std::move(error_info),
4✔
1115
                           ConnectionTerminationReason::sync_protocol_violation); // Throws
4✔
1116
}
4✔
1117

1118

1119
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
1120
{
460✔
1121
    logger.info("Connection closed due to error: %1", status); // Throws
460✔
1122

290✔
1123
    involuntary_disconnect(SessionErrorInfo{std::move(status), is_fatal}, reason); // Throw
460✔
1124
}
460✔
1125

1126

1127
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
1128
{
506✔
1129
    logger.info("Connection closed due to transient error: %1", status); // Throws
506✔
1130
    SessionErrorInfo error_info{std::move(status), IsFatal{false}};
506✔
1131
    error_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
506✔
1132

236✔
1133
    involuntary_disconnect(std::move(error_info), reason); // Throw
506✔
1134
}
506✔
1135

1136

1137
// Close connection due to error discovered on the server-side, and then
1138
// reported to the client by way of a connection-level ERROR message.
1139
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1140
{
66✔
1141
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
66✔
1142
                int(error_code)); // Throws
66✔
1143

32✔
1144
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
42✔
1145
                                      : ConnectionTerminationReason::server_said_try_again_later;
56✔
1146
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
66✔
1147
                           reason); // Throws
66✔
1148
}
66✔
1149

1150

1151
void Connection::disconnect(const SessionErrorInfo& info)
1152
{
3,304✔
1153
    // Cancel connect timeout watchdog
1,626✔
1154
    m_connect_timer.reset();
3,304✔
1155

1,626✔
1156
    if (m_state == ConnectionState::connected) {
3,304✔
1157
        m_disconnect_time = monotonic_clock_now();
3,188✔
1158
        m_disconnect_has_occurred = true;
3,188✔
1159

1,568✔
1160
        // Sessions that are in the Deactivating state at this time can be
1,568✔
1161
        // immediately discarded, in part because they are no longer enlisted to
1,568✔
1162
        // send. Such sessions will be taken to the Deactivated state by
1,568✔
1163
        // Session::connection_lost(), and then they will be removed from
1,568✔
1164
        // `m_sessions`.
1,568✔
1165
        auto i = m_sessions.begin(), end = m_sessions.end();
3,188✔
1166
        while (i != end) {
7,010✔
1167
            // Prevent invalidation of the main iterator when erasing elements
2,112✔
1168
            auto j = i++;
3,822✔
1169
            Session& sess = *j->second;
3,822✔
1170
            sess.connection_lost(); // Throws
3,822✔
1171
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
3,824✔
1172
                m_sessions.erase(j);
1,854✔
1173
        }
3,822✔
1174
    }
3,188✔
1175

1,626✔
1176
    change_state_to_disconnected();
3,304✔
1177

1,626✔
1178
    m_ping_delay_in_progress = false;
3,304✔
1179
    m_waiting_for_pong = false;
3,304✔
1180
    m_send_ping = false;
3,304✔
1181
    m_minimize_next_ping_delay = false;
3,304✔
1182
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,304✔
1183
    m_ping_sent = false;
3,304✔
1184
    m_heartbeat_timer.reset();
3,304✔
1185
    m_previous_ping_rtt = 0;
3,304✔
1186

1,626✔
1187
    m_websocket_sentinel->destroyed = true;
3,304✔
1188
    m_websocket_sentinel.reset();
3,304✔
1189
    m_websocket.reset();
3,304✔
1190
    m_input_body_buffer.reset();
3,304✔
1191
    m_sending_session = nullptr;
3,304✔
1192
    m_sessions_enlisted_to_send.clear();
3,304✔
1193
    m_sending = false;
3,304✔
1194

1,626✔
1195
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,304✔
1196
    initiate_reconnect_wait();                                           // Throws
3,304✔
1197
}
3,304✔
1198

1199
bool Connection::is_flx_sync_connection() const noexcept
1200
{
105,800✔
1201
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
105,800✔
1202
}
105,800✔
1203

1204
void Connection::receive_pong(milliseconds_type timestamp)
1205
{
168✔
1206
    logger.debug("Received: PONG(timestamp=%1)", timestamp);
168✔
1207

54✔
1208
    bool legal_at_this_time = (m_waiting_for_pong && !m_send_ping);
168✔
1209
    if (REALM_UNLIKELY(!legal_at_this_time)) {
168✔
1210
        close_due_to_protocol_error(
×
1211
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
×
1212
        return;
×
1213
    }
×
1214

54✔
1215
    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
168✔
1216
        close_due_to_protocol_error(
×
1217
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1218
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
×
1219
                          m_last_ping_sent_at, timestamp)}); // Throws
×
1220
        return;
×
1221
    }
×
1222

54✔
1223
    milliseconds_type now = monotonic_clock_now();
168✔
1224
    milliseconds_type round_trip_time = now - timestamp;
168✔
1225
    logger.debug("Round trip time was %1 milliseconds", round_trip_time);
168✔
1226
    m_previous_ping_rtt = round_trip_time;
168✔
1227

54✔
1228
    // If this PONG message is a response to a PING mesage that was sent after
54✔
1229
    // the last invocation of cancel_reconnect_delay(), then the connection is
54✔
1230
    // still good, and we do not have to skip the next reconnect delay.
54✔
1231
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
168✔
1232
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
136✔
1233
        m_ping_after_scheduled_reset_of_reconnect_info = false;
136✔
1234
        m_reconnect_info.scheduled_reset = false;
136✔
1235
    }
136✔
1236

54✔
1237
    m_heartbeat_timer.reset();
168✔
1238
    m_waiting_for_pong = false;
168✔
1239

54✔
1240
    initiate_ping_delay(now); // Throws
168✔
1241

54✔
1242
    if (m_client.m_roundtrip_time_handler)
168✔
1243
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
×
1244
}
168✔
1245

1246
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
1247
{
68,696✔
1248
    if (session_ident == 0) {
68,696✔
1249
        return nullptr;
×
1250
    }
×
1251

35,542✔
1252
    auto* sess = get_session(session_ident);
68,696✔
1253
    if (REALM_LIKELY(sess)) {
68,700✔
1254
        return sess;
68,698✔
1255
    }
68,698✔
1256
    // Check the history to see if the message received was for a previous session
2✔
1257
    if (auto it = m_session_history.find(session_ident); it == m_session_history.end()) {
2,147,483,649✔
1258
        logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
×
1259
        close_due_to_protocol_error(
×
1260
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1261
             util::format("Received message %1 for session iden %2 when that session never existed", message,
×
1262
                          session_ident)});
×
1263
    }
×
1264
    else {
2,147,483,649✔
1265
        logger.error("Received %1 message for closed session, session_ident = %2", message,
2,147,483,649✔
1266
                     session_ident); // Throws
2,147,483,649✔
1267
    }
2,147,483,649✔
1268
    return nullptr;
2,147,483,649✔
1269
}
2,147,483,649✔
1270

1271
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1272
{
952✔
1273
    Session* sess = nullptr;
952✔
1274
    if (session_ident != 0) {
952✔
1275
        sess = find_and_validate_session(session_ident, "ERROR");
882✔
1276
        if (REALM_UNLIKELY(!sess)) {
882✔
1277
            return;
×
1278
        }
×
1279
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
882✔
1280
            close_due_to_protocol_error(std::move(status)); // Throws
×
1281
            return;
×
1282
        }
×
1283

456✔
1284
        if (sess->m_state == Session::Deactivated) {
882✔
UNCOV
1285
            finish_session_deactivation(sess);
×
UNCOV
1286
        }
×
1287
        return;
882✔
1288
    }
882✔
1289

34✔
1290
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
70✔
1291
                info.message, info.raw_error_code, info.is_fatal, session_ident,
70✔
1292
                info.server_requests_action); // Throws
70✔
1293

34✔
1294
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
70✔
1295
    if (REALM_LIKELY(known_error_code)) {
70✔
1296
        ProtocolError error_code = ProtocolError(info.raw_error_code);
66✔
1297
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
66✔
1298
            close_due_to_server_side_error(error_code, info); // Throws
66✔
1299
            return;
66✔
1300
        }
66✔
1301
        close_due_to_protocol_error(
×
1302
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1303
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1304
                          info.raw_error_code)});
×
1305
    }
×
1306
    else {
4✔
1307
        close_due_to_protocol_error(
4✔
1308
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1309
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1310
    }
4✔
1311
}
70✔
1312

1313

1314
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1315
                                             session_ident_type session_ident)
1316
{
16✔
1317
    if (session_ident == 0) {
16✔
1318
        return close_due_to_protocol_error(
×
1319
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1320
    }
×
1321

8✔
1322
    if (!is_flx_sync_connection()) {
16✔
1323
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1324
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1325
    }
×
1326

8✔
1327
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
16✔
1328
    if (REALM_UNLIKELY(!sess)) {
16✔
1329
        return;
×
1330
    }
×
1331

8✔
1332
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
16✔
1333
        close_due_to_protocol_error(std::move(status));
×
1334
    }
×
1335
}
16✔
1336

1337

1338
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
1339
{
3,270✔
1340
    Session* sess = find_and_validate_session(session_ident, "IDENT");
3,270✔
1341
    if (REALM_UNLIKELY(!sess)) {
3,270✔
1342
        return;
×
1343
    }
×
1344

1,532✔
1345
    if (auto status = sess->receive_ident_message(client_file_ident); !status.is_ok())
3,270✔
1346
        close_due_to_protocol_error(std::move(status)); // Throws
×
1347
}
3,270✔
1348

1349
void Connection::receive_download_message(session_ident_type session_ident, const SyncProgress& progress,
1350
                                          std::uint_fast64_t downloadable_bytes, int64_t query_version,
1351
                                          DownloadBatchState batch_state,
1352
                                          const ReceivedChangesets& received_changesets)
1353
{
44,838✔
1354
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
44,838✔
1355
    if (REALM_UNLIKELY(!sess)) {
44,838✔
1356
        return;
×
1357
    }
×
1358

24,002✔
1359
    if (auto status = sess->receive_download_message(progress, downloadable_bytes, batch_state, query_version,
44,838✔
1360
                                                     received_changesets);
44,838✔
1361
        !status.is_ok()) {
44,838✔
1362
        close_due_to_protocol_error(std::move(status));
×
1363
    }
×
1364
}
44,838✔
1365

1366
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
1367
{
15,662✔
1368
    Session* sess = find_and_validate_session(session_ident, "MARK");
15,662✔
1369
    if (REALM_UNLIKELY(!sess)) {
15,662✔
1370
        return;
×
1371
    }
×
1372

7,746✔
1373
    if (auto status = sess->receive_mark_message(request_ident); !status.is_ok())
15,662✔
1374
        close_due_to_protocol_error(std::move(status)); // Throws
×
1375
}
15,662✔
1376

1377

1378
void Connection::receive_unbound_message(session_ident_type session_ident)
1379
{
3,990✔
1380
    Session* sess = find_and_validate_session(session_ident, "UNBOUND");
3,990✔
1381
    if (REALM_UNLIKELY(!sess)) {
3,990✔
1382
        return;
×
1383
    }
×
1384

1,778✔
1385
    if (auto status = sess->receive_unbound_message(); !status.is_ok()) {
3,990✔
1386
        close_due_to_protocol_error(std::move(status)); // Throws
×
1387
        return;
×
1388
    }
×
1389

1,778✔
1390
    if (sess->m_state == Session::Deactivated) {
3,990✔
1391
        finish_session_deactivation(sess);
3,990✔
1392
    }
3,990✔
1393
}
3,990✔
1394

1395

1396
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1397
                                               std::string_view body)
1398
{
44✔
1399
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
44✔
1400
    if (REALM_UNLIKELY(!sess)) {
44✔
1401
        return;
×
1402
    }
×
1403

22✔
1404
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
44✔
1405
        close_due_to_protocol_error(std::move(status));
×
1406
    }
×
1407
}
44✔
1408

1409

1410
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1411
                                            std::string_view message)
1412
{
5,706✔
1413
    std::string prefix;
5,706✔
1414
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
5,708✔
1415
        prefix = util::format("Server[%1]", m_appservices_coid);
5,708✔
1416
    }
5,708✔
1417
    else {
2,147,483,647✔
1418
        prefix = "Server";
2,147,483,647✔
1419
    }
2,147,483,647✔
1420

2,866✔
1421
    if (session_ident != 0) {
5,706✔
1422
        if (auto sess = get_session(session_ident)) {
3,834✔
1423
            sess->logger.log(level, "%1 log: %2", prefix, message);
3,834✔
1424
            return;
3,834✔
1425
        }
3,834✔
1426

1427
        logger.log(level, "%1 log for unknown session %2: %3", prefix, session_ident, message);
×
1428
        return;
×
1429
    }
×
1430

926✔
1431
    logger.log(level, "%1 log: %2", prefix, message);
1,872✔
1432
}
1,872✔
1433

1434

1435
void Connection::receive_appservices_request_id(std::string_view coid)
1436
{
5,062✔
1437
    // Only set once per connection
2,494✔
1438
    if (!coid.empty() && m_appservices_coid.empty()) {
5,062✔
1439
        m_appservices_coid = coid;
2,244✔
1440
        logger.info("Connected to app services with request id: \"%1\"", m_appservices_coid);
2,244✔
1441
    }
2,244✔
1442
}
5,062✔
1443

1444

1445
void Connection::handle_protocol_error(Status status)
1446
{
×
1447
    close_due_to_protocol_error(std::move(status));
×
1448
}
×
1449

1450

1451
// Sessions are guaranteed to be granted the opportunity to send a message in
1452
// the order that they enlist. Note that this is important to ensure
1453
// nonoverlapping communication with the server for consecutive sessions
1454
// associated with the same Realm file.
1455
//
1456
// CAUTION: The specified session may get destroyed before this function
1457
// returns, but only if its Session::send_message() puts it into the Deactivated
1458
// state.
1459
void Connection::enlist_to_send(Session* sess)
1460
{
153,588✔
1461
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
153,588✔
1462
    m_sessions_enlisted_to_send.push_back(sess); // Throws
153,588✔
1463
    if (!m_sending)
153,588✔
1464
        send_next_message(); // Throws
56,378✔
1465
}
153,588✔
1466

1467

1468
std::string Connection::get_active_appservices_connection_id()
1469
{
72✔
1470
    return m_appservices_coid;
72✔
1471
}
72✔
1472

1473
void Session::cancel_resumption_delay()
1474
{
4,034✔
1475
    REALM_ASSERT_EX(m_state == Active, m_state);
4,034✔
1476

2,206✔
1477
    if (!m_suspended)
4,034✔
1478
        return;
3,656✔
1479

206✔
1480
    m_suspended = false;
378✔
1481

206✔
1482
    logger.debug("Resumed"); // Throws
378✔
1483

206✔
1484
    if (unbind_process_complete())
378✔
1485
        initiate_rebind(); // Throws
372✔
1486

206✔
1487
    m_conn.one_more_active_unsuspended_session(); // Throws
378✔
1488

206✔
1489
    on_resumed(); // Throws
378✔
1490
}
378✔
1491

1492

1493
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1494
                                                 std::vector<ProtocolErrorInfo>* out)
1495
{
21,794✔
1496
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
21,794✔
1497
        return;
21,756✔
1498
    }
21,756✔
1499

20✔
1500
#ifdef REALM_DEBUG
38✔
1501
    REALM_ASSERT_DEBUG(
38✔
1502
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
38✔
1503
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
38✔
1504
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
38✔
1505
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
38✔
1506
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
38✔
1507
                       }));
38✔
1508
#endif
38✔
1509

20✔
1510
    while (!m_pending_compensating_write_errors.empty() &&
78✔
1511
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
60✔
1512
               changesets.back().version) {
40✔
1513
        auto& cur_error = m_pending_compensating_write_errors.front();
40✔
1514
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
40✔
1515
        out->push_back(std::move(cur_error));
40✔
1516
        m_pending_compensating_write_errors.pop_front();
40✔
1517
    }
40✔
1518
}
38✔
1519

1520

1521
void Session::integrate_changesets(ClientReplication& repl, const SyncProgress& progress,
1522
                                   std::uint_fast64_t downloadable_bytes,
1523
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1524
                                   DownloadBatchState download_batch_state)
1525
{
42,750✔
1526
    auto& history = repl.get_history();
42,750✔
1527
    if (received_changesets.empty()) {
42,750✔
1528
        if (download_batch_state == DownloadBatchState::MoreToCome) {
20,936✔
1529
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1530
                                       "received empty download message that was not the last in batch",
×
1531
                                       ProtocolError::bad_progress);
×
1532
        }
×
1533
        history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws
20,936✔
1534
        return;
20,936✔
1535
    }
20,936✔
1536

11,906✔
1537
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
21,814✔
1538
    auto transact = get_db()->start_read();
21,814✔
1539
    history.integrate_server_changesets(
21,814✔
1540
        progress, &downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
21,814✔
1541
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
21,802✔
1542
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
21,792✔
1543
        }); // Throws
21,792✔
1544
    if (received_changesets.size() == 1) {
21,814✔
1545
        logger.debug("1 remote changeset integrated, producing client version %1",
15,014✔
1546
                     version_info.sync_version.version); // Throws
15,014✔
1547
    }
15,014✔
1548
    else {
6,800✔
1549
        logger.debug("%2 remote changesets integrated, producing client version %1",
6,800✔
1550
                     version_info.sync_version.version, received_changesets.size()); // Throws
6,800✔
1551
    }
6,800✔
1552

11,906✔
1553
    for (const auto& pending_error : pending_compensating_write_errors) {
11,926✔
1554
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
40✔
1555
                    pending_error.compensating_write_rejected_client_version,
40✔
1556
                    *pending_error.compensating_write_server_version, pending_error.message);
40✔
1557
        try {
40✔
1558
            on_connection_state_changed(
40✔
1559
                m_conn.get_state(),
40✔
1560
                SessionErrorInfo{pending_error,
40✔
1561
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
40✔
1562
                                                          pending_error.message)});
40✔
1563
        }
40✔
1564
        catch (...) {
20✔
1565
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1566
        }
×
1567
    }
40✔
1568
}
21,814✔
1569

1570

1571
void Session::on_integration_failure(const IntegrationException& error)
1572
{
32✔
1573
    REALM_ASSERT_EX(m_state == Active, m_state);
32✔
1574
    REALM_ASSERT(!m_client_error && !m_error_to_send);
32✔
1575
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
32✔
1576

16✔
1577
    m_client_error = util::make_optional<IntegrationException>(error);
32✔
1578
    m_error_to_send = true;
32✔
1579

16✔
1580
    // Surface the error to the user otherwise is lost.
16✔
1581
    on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{error.to_status(), IsFatal{false}});
32✔
1582

16✔
1583
    // Since the deactivation process has not been initiated, the UNBIND
16✔
1584
    // message cannot have been sent unless an ERROR message was received.
16✔
1585
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
32✔
1586
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
32✔
1587
        ensure_enlisted_to_send(); // Throws
32✔
1588
    }
32✔
1589
}
32✔
1590

1591
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
1592
{
44,122✔
1593
    REALM_ASSERT_EX(m_state == Active, m_state);
44,122✔
1594
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
44,122✔
1595
    m_download_progress = progress.download;
44,122✔
1596
    bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version);
44,122✔
1597
    m_progress = progress;
44,122✔
1598
    if (upload_progressed) {
44,122✔
1599
        if (progress.upload.client_version > m_last_version_selected_for_upload) {
31,802✔
1600
            if (progress.upload.client_version > m_upload_progress.client_version)
13,192✔
1601
                m_upload_progress = progress.upload;
846✔
1602
            m_last_version_selected_for_upload = progress.upload.client_version;
13,192✔
1603
        }
13,192✔
1604

16,384✔
1605
        check_for_upload_completion();
31,802✔
1606
    }
31,802✔
1607

23,596✔
1608
    do_recognize_sync_version(client_version); // Allows upload process to resume
44,122✔
1609
    check_for_download_completion();           // Throws
44,122✔
1610

23,596✔
1611
    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
23,596✔
1612
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
44,122✔
1613
        auto& flx_subscription_store = *get_flx_subscription_store();
2,434✔
1614
        get_migration_store()->create_subscriptions(flx_subscription_store);
2,434✔
1615
    }
2,434✔
1616

23,596✔
1617
    // Since the deactivation process has not been initiated, the UNBIND
23,596✔
1618
    // message cannot have been sent unless an ERROR message was received.
23,596✔
1619
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
44,122✔
1620
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
44,122✔
1621
        ensure_enlisted_to_send(); // Throws
44,120✔
1622
    }
44,120✔
1623
}
44,122✔
1624

1625

1626
Session::~Session()
1627
{
9,622✔
1628
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
4,636✔
1629
}
9,622✔
1630

1631

1632
std::string Session::make_logger_prefix(session_ident_type ident)
1633
{
9,622✔
1634
    std::ostringstream out;
9,622✔
1635
    out.imbue(std::locale::classic());
9,622✔
1636
    out << "Session[" << ident << "]: "; // Throws
9,622✔
1637
    return out.str();                    // Throws
9,622✔
1638
}
9,622✔
1639

1640

1641
void Session::activate()
1642
{
9,620✔
1643
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
9,620✔
1644

4,636✔
1645
    logger.debug("Activating"); // Throws
9,620✔
1646

4,636✔
1647
    bool has_pending_client_reset = false;
9,620✔
1648
    if (REALM_LIKELY(!get_client().is_dry_run())) {
9,620✔
1649
        // The reason we need a mutable reference from get_client_reset_config() is because we
4,636✔
1650
        // don't want the session to keep a strong reference to the client_reset_config->fresh_copy
4,636✔
1651
        // DB. If it did, then the fresh DB would stay alive for the duration of this sync session
4,636✔
1652
        // and we want to clean it up once the reset is finished. Additionally, the fresh copy will
4,636✔
1653
        // be set to a new copy on every reset so there is no reason to keep a reference to it.
4,636✔
1654
        // The modification to the client reset config happens via std::move(client_reset_config->fresh_copy).
4,636✔
1655
        // If the client reset config were a `const &` then this std::move would create another strong
4,636✔
1656
        // reference which we don't want to happen.
4,636✔
1657
        util::Optional<ClientReset>& client_reset_config = get_client_reset_config();
9,618✔
1658

4,636✔
1659
        bool file_exists = util::File::exists(get_realm_path());
9,618✔
1660

4,636✔
1661
        logger.info("client_reset_config = %1, Realm exists = %2, "
9,618✔
1662
                    "client reset = %3",
9,618✔
1663
                    client_reset_config ? "true" : "false", file_exists ? "true" : "false",
9,616✔
1664
                    (client_reset_config && file_exists) ? "true" : "false"); // Throws
9,618✔
1665
        if (client_reset_config && !m_client_reset_operation) {
9,618✔
1666
            m_client_reset_operation = std::make_unique<_impl::ClientResetOperation>(
336✔
1667
                logger, get_db(), std::move(client_reset_config->fresh_copy), client_reset_config->mode,
336✔
1668
                std::move(client_reset_config->notify_before_client_reset),
336✔
1669
                std::move(client_reset_config->notify_after_client_reset),
336✔
1670
                client_reset_config->recovery_is_allowed); // Throws
336✔
1671
        }
336✔
1672

4,636✔
1673
        if (!m_client_reset_operation) {
9,618✔
1674
            const ClientReplication& repl = access_realm(); // Throws
9,286✔
1675
            repl.get_history().get_status(m_last_version_available, m_client_file_ident, m_progress,
9,286✔
1676
                                          &has_pending_client_reset); // Throws
9,286✔
1677
        }
9,286✔
1678
    }
9,618✔
1679
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
9,620✔
1680
                 m_client_file_ident.salt); // Throws
9,620✔
1681
    m_upload_progress = m_progress.upload;
9,620✔
1682
    m_last_version_selected_for_upload = m_upload_progress.client_version;
9,620✔
1683
    m_download_progress = m_progress.download;
9,620✔
1684
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
9,620✔
1685

4,636✔
1686
    logger.debug("last_version_available  = %1", m_last_version_available);           // Throws
9,620✔
1687
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
9,620✔
1688
    logger.debug("progress_download_client_version = %1",
9,620✔
1689
                 m_progress.download.last_integrated_client_version);                                      // Throws
9,620✔
1690
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
9,620✔
1691
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
9,620✔
1692

4,636✔
1693
    reset_protocol_state();
9,620✔
1694
    m_state = Active;
9,620✔
1695

4,636✔
1696
    REALM_ASSERT(!m_suspended);
9,620✔
1697
    m_conn.one_more_active_unsuspended_session(); // Throws
9,620✔
1698

4,636✔
1699
    try {
9,620✔
1700
        process_pending_flx_bootstrap();
9,620✔
1701
    }
9,620✔
1702
    catch (const IntegrationException& error) {
4,638✔
1703
        logger.error("Error integrating bootstrap changesets: %1", error.what());
4✔
1704
        m_suspended = true;
4✔
1705
        m_conn.one_less_active_unsuspended_session(); // Throws
4✔
1706
        on_suspended(SessionErrorInfo{Status{error.code(), error.what()}, IsFatal{true}});
4✔
1707
    }
4✔
1708

4,636✔
1709
    if (has_pending_client_reset) {
9,622✔
1710
        handle_pending_client_reset_acknowledgement();
24✔
1711
    }
24✔
1712
}
9,620✔
1713

1714

1715
// The caller (Connection) must discard the session if the session has become
1716
// deactivated upon return.
1717
void Session::initiate_deactivation()
1718
{
9,622✔
1719
    REALM_ASSERT_EX(m_state == Active, m_state);
9,622✔
1720

4,636✔
1721
    logger.debug("Initiating deactivation"); // Throws
9,622✔
1722

4,636✔
1723
    m_state = Deactivating;
9,622✔
1724

4,636✔
1725
    if (!m_suspended)
9,622✔
1726
        m_conn.one_less_active_unsuspended_session(); // Throws
9,098✔
1727

4,636✔
1728
    if (m_enlisted_to_send) {
9,622✔
1729
        REALM_ASSERT(!unbind_process_complete());
4,822✔
1730
        return;
4,822✔
1731
    }
4,822✔
1732

2,244✔
1733
    // Deactivate immediately if the BIND message has not yet been sent and the
2,244✔
1734
    // session is not enlisted to send, or if the unbinding process has already
2,244✔
1735
    // completed.
2,244✔
1736
    if (!m_bind_message_sent || unbind_process_complete()) {
4,800✔
1737
        complete_deactivation(); // Throws
1,038✔
1738
        // Life cycle state is now Deactivated
476✔
1739
        return;
1,038✔
1740
    }
1,038✔
1741

1,768✔
1742
    // Ready to send the UNBIND message, if it has not already been sent
1,768✔
1743
    if (!m_unbind_message_sent) {
3,762✔
1744
        enlist_to_send(); // Throws
3,594✔
1745
        return;
3,594✔
1746
    }
3,594✔
1747
}
3,762✔
1748

1749

1750
void Session::complete_deactivation()
1751
{
9,620✔
1752
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
9,620✔
1753
    m_state = Deactivated;
9,620✔
1754

4,636✔
1755
    logger.debug("Deactivation completed"); // Throws
9,620✔
1756
}
9,620✔
1757

1758

1759
// Called by the associated Connection object when this session is granted an
1760
// opportunity to send a message.
1761
//
1762
// The caller (Connection) must discard the session if the session has become
1763
// deactivated upon return.
1764
void Session::send_message()
1765
{
152,014✔
1766
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
152,014✔
1767
    REALM_ASSERT(m_enlisted_to_send);
152,014✔
1768
    m_enlisted_to_send = false;
152,014✔
1769
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
152,014✔
1770
        // Deactivation has been initiated. If the UNBIND message has not been
4,294✔
1771
        // sent yet, there is no point in sending it. Instead, we can let the
4,294✔
1772
        // deactivation process complete.
4,294✔
1773
        if (!m_bind_message_sent) {
8,902✔
1774
            return complete_deactivation(); // Throws
2,640✔
1775
            // Life cycle state is now Deactivated
1,292✔
1776
        }
2,640✔
1777

3,002✔
1778
        // Session life cycle state is Deactivating or the unbinding process has
3,002✔
1779
        // been initiated by a session specific ERROR message
3,002✔
1780
        if (!m_unbind_message_sent)
6,262✔
1781
            send_unbind_message(); // Throws
6,262✔
1782
        return;
6,262✔
1783
    }
6,262✔
1784

70,202✔
1785
    // Session life cycle state is Active and the unbinding process has
70,202✔
1786
    // not been initiated
70,202✔
1787
    REALM_ASSERT(!m_unbind_message_sent);
143,112✔
1788

70,202✔
1789
    if (!m_bind_message_sent)
143,112✔
1790
        return send_bind_message(); // Throws
8,586✔
1791

65,918✔
1792
    if (!m_ident_message_sent) {
134,526✔
1793
        if (have_client_file_ident())
6,608✔
1794
            send_ident_message(); // Throws
6,608✔
1795
        return;
6,608✔
1796
    }
6,608✔
1797

62,606✔
1798
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
127,918✔
1799
                                                      [](const PendingTestCommand& command) {
62,678✔
1800
                                                          return command.pending;
144✔
1801
                                                      });
144✔
1802
    if (has_pending_test_command) {
127,918✔
1803
        return send_test_command_message();
44✔
1804
    }
44✔
1805

62,584✔
1806
    if (m_error_to_send)
127,874✔
1807
        return send_json_error_message(); // Throws
30✔
1808

62,570✔
1809
    // Stop sending upload, mark and query messages when the client detects an error.
62,570✔
1810
    if (m_client_error) {
127,844✔
1811
        return;
16✔
1812
    }
16✔
1813

62,562✔
1814
    if (m_target_download_mark > m_last_download_mark_sent)
127,828✔
1815
        return send_mark_message(); // Throws
16,238✔
1816

54,524✔
1817
    auto is_upload_allowed = [&]() -> bool {
111,596✔
1818
        if (!m_is_flx_sync_session) {
111,592✔
1819
            return true;
100,734✔
1820
        }
100,734✔
1821

5,546✔
1822
        auto migration_store = get_migration_store();
10,858✔
1823
        if (!migration_store) {
10,858✔
1824
            return true;
×
1825
        }
×
1826

5,546✔
1827
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
10,858✔
1828
        if (!sentinel_query_version) {
10,858✔
1829
            return true;
10,830✔
1830
        }
10,830✔
1831

12✔
1832
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
12✔
1833
        return m_last_sent_flx_query_version != *sentinel_query_version;
28✔
1834
    };
28✔
1835

54,524✔
1836
    if (!is_upload_allowed()) {
111,590✔
1837
        return;
16✔
1838
    }
16✔
1839

54,516✔
1840
    auto check_pending_flx_version = [&]() -> bool {
111,580✔
1841
        if (!m_is_flx_sync_session) {
111,578✔
1842
            return false;
100,734✔
1843
        }
100,734✔
1844

5,540✔
1845
        if (!m_allow_upload) {
10,844✔
1846
            return false;
2,184✔
1847
        }
2,184✔
1848

4,446✔
1849
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(
8,660✔
1850
            m_last_sent_flx_query_version, m_upload_progress.client_version);
8,660✔
1851

4,446✔
1852
        if (!m_pending_flx_sub_set) {
8,660✔
1853
            return false;
7,178✔
1854
        }
7,178✔
1855

742✔
1856
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
1,482✔
1857
    };
1,482✔
1858

54,516✔
1859
    if (check_pending_flx_version()) {
111,574✔
1860
        return send_query_change_message(); // throws
836✔
1861
    }
836✔
1862

54,098✔
1863
    if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
110,738✔
1864
        return send_upload_message(); // Throws
54,406✔
1865
    }
54,406✔
1866
}
110,738✔
1867

1868

1869
void Session::send_bind_message()
1870
{
8,586✔
1871
    REALM_ASSERT_EX(m_state == Active, m_state);
8,586✔
1872

4,284✔
1873
    session_ident_type session_ident = m_ident;
8,586✔
1874
    bool need_client_file_ident = !have_client_file_ident();
8,586✔
1875
    const bool is_subserver = false;
8,586✔
1876

4,284✔
1877

4,284✔
1878
    ClientProtocol& protocol = m_conn.get_client_protocol();
8,586✔
1879
    int protocol_version = m_conn.get_negotiated_protocol_version();
8,586✔
1880
    OutputBuffer& out = m_conn.get_output_buffer();
8,586✔
1881
    // Discard the token since it's ignored by the server.
4,284✔
1882
    std::string empty_access_token;
8,586✔
1883
    if (m_is_flx_sync_session) {
8,586✔
1884
        nlohmann::json bind_json_data;
1,350✔
1885
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,350✔
1886
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1887
        }
60✔
1888
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,350✔
1889
        if (logger.would_log(util::Logger::Level::debug)) {
1,350✔
1890
            std::string json_data_dump;
1,350✔
1891
            if (!bind_json_data.empty()) {
1,350✔
1892
                json_data_dump = bind_json_data.dump();
1,350✔
1893
            }
1,350✔
1894
            logger.debug(
1,350✔
1895
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,350✔
1896
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,350✔
1897
        }
1,350✔
1898
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,350✔
1899
                                       need_client_file_ident, is_subserver); // Throws
1,350✔
1900
    }
1,350✔
1901
    else {
7,236✔
1902
        std::string server_path = get_virt_path();
7,236✔
1903
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
7,236✔
1904
                     session_ident, need_client_file_ident, is_subserver, server_path);
7,236✔
1905
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
7,236✔
1906
                                       need_client_file_ident, is_subserver); // Throws
7,236✔
1907
    }
7,236✔
1908
    m_conn.initiate_write_message(out, this); // Throws
8,586✔
1909

4,284✔
1910
    m_bind_message_sent = true;
8,586✔
1911

4,284✔
1912
    // Ready to send the IDENT message if the file identifier pair is already
4,284✔
1913
    // available.
4,284✔
1914
    if (!need_client_file_ident)
8,586✔
1915
        enlist_to_send(); // Throws
3,524✔
1916
}
8,586✔
1917

1918

1919
void Session::send_ident_message()
1920
{
6,608✔
1921
    REALM_ASSERT_EX(m_state == Active, m_state);
6,608✔
1922
    REALM_ASSERT(m_bind_message_sent);
6,608✔
1923
    REALM_ASSERT(!m_unbind_message_sent);
6,608✔
1924
    REALM_ASSERT(have_client_file_ident());
6,608✔
1925

3,312✔
1926

3,312✔
1927
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,608✔
1928
    OutputBuffer& out = m_conn.get_output_buffer();
6,608✔
1929
    session_ident_type session_ident = m_ident;
6,608✔
1930

3,312✔
1931
    if (m_is_flx_sync_session) {
6,608✔
1932
        const auto active_query_set = get_flx_subscription_store()->get_active();
962✔
1933
        const auto active_query_body = active_query_set.to_ext_json();
962✔
1934
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
962✔
1935
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
962✔
1936
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
962✔
1937
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
962✔
1938
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
962✔
1939
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
962✔
1940
                     active_query_body); // Throws
962✔
1941
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
962✔
1942
                                        active_query_set.version(), active_query_body); // Throws
962✔
1943
        m_last_sent_flx_query_version = active_query_set.version();
962✔
1944
    }
962✔
1945
    else {
5,646✔
1946
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
5,646✔
1947
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
5,646✔
1948
                     "latest_server_version_salt=%6)",
5,646✔
1949
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
5,646✔
1950
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
5,646✔
1951
                     m_progress.latest_server_version.salt);                                  // Throws
5,646✔
1952
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
5,646✔
1953
    }
5,646✔
1954
    m_conn.initiate_write_message(out, this); // Throws
6,608✔
1955

3,312✔
1956
    m_ident_message_sent = true;
6,608✔
1957

3,312✔
1958
    // Other messages may be waiting to be sent
3,312✔
1959
    enlist_to_send(); // Throws
6,608✔
1960
}
6,608✔
1961

1962
void Session::send_query_change_message()
1963
{
836✔
1964
    REALM_ASSERT_EX(m_state == Active, m_state);
836✔
1965
    REALM_ASSERT(m_ident_message_sent);
836✔
1966
    REALM_ASSERT(!m_unbind_message_sent);
836✔
1967
    REALM_ASSERT(m_pending_flx_sub_set);
836✔
1968
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
836✔
1969

418✔
1970
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
836✔
1971
        return;
×
1972
    }
×
1973

418✔
1974
    auto sub_store = get_flx_subscription_store();
836✔
1975
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
836✔
1976
    auto latest_queries = latest_sub_set.to_ext_json();
836✔
1977
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
836✔
1978
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
836✔
1979

418✔
1980
    OutputBuffer& out = m_conn.get_output_buffer();
836✔
1981
    session_ident_type session_ident = get_ident();
836✔
1982
    ClientProtocol& protocol = m_conn.get_client_protocol();
836✔
1983
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
836✔
1984
    m_conn.initiate_write_message(out, this);
836✔
1985

418✔
1986
    m_last_sent_flx_query_version = latest_sub_set.version();
836✔
1987

418✔
1988
    request_download_completion_notification();
836✔
1989
}
836✔
1990

1991
void Session::send_upload_message()
1992
{
54,406✔
1993
    REALM_ASSERT_EX(m_state == Active, m_state);
54,406✔
1994
    REALM_ASSERT(m_ident_message_sent);
54,406✔
1995
    REALM_ASSERT(!m_unbind_message_sent);
54,406✔
1996

27,384✔
1997
    if (REALM_UNLIKELY(get_client().is_dry_run()))
54,406✔
1998
        return;
27,384✔
1999

27,384✔
2000
    version_type target_upload_version = get_db()->get_version_of_latest_snapshot();
54,406✔
2001
    if (m_pending_flx_sub_set) {
54,406✔
2002
        REALM_ASSERT(m_is_flx_sync_session);
646✔
2003
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
646✔
2004
    }
646✔
2005
    if (target_upload_version > m_last_version_available) {
54,406✔
2006
        m_last_version_available = target_upload_version;
394✔
2007
    }
394✔
2008

27,384✔
2009
    const ClientReplication& repl = access_realm(); // Throws
54,406✔
2010

27,384✔
2011
    std::vector<UploadChangeset> uploadable_changesets;
54,406✔
2012
    version_type locked_server_version = 0;
54,406✔
2013
    repl.get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
54,406✔
2014
                                                  locked_server_version); // Throws
54,406✔
2015

27,384✔
2016
    if (uploadable_changesets.empty()) {
54,406✔
2017
        // Nothing more to upload right now
13,586✔
2018
        check_for_upload_completion(); // Throws
27,762✔
2019
        // If we need to limit upload up to some version other than the last client version available and there are no
13,586✔
2020
        // changes to upload, then there is no need to send an empty message.
13,586✔
2021
        if (m_pending_flx_sub_set) {
27,762✔
2022
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
208✔
2023
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
208✔
2024
            // Other messages may be waiting to be sent
104✔
2025
            return enlist_to_send(); // Throws
208✔
2026
        }
208✔
2027
    }
26,644✔
2028
    else {
26,644✔
2029
        m_last_version_selected_for_upload = uploadable_changesets.back().progress.client_version;
26,644✔
2030
    }
26,644✔
2031

27,384✔
2032
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
54,302✔
2033
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
438✔
2034
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
438✔
2035
    }
438✔
2036

27,280✔
2037
    version_type progress_client_version = m_upload_progress.client_version;
54,198✔
2038
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
54,198✔
2039

27,280✔
2040
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
54,198✔
2041
                 "locked_server_version=%3, num_changesets=%4)",
54,198✔
2042
                 progress_client_version, progress_server_version, locked_server_version,
54,198✔
2043
                 uploadable_changesets.size()); // Throws
54,198✔
2044

27,280✔
2045
    ClientProtocol& protocol = m_conn.get_client_protocol();
54,198✔
2046
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
54,198✔
2047

27,280✔
2048
    for (const UploadChangeset& uc : uploadable_changesets) {
48,992✔
2049
        logger.debug("Fetching changeset for upload (client_version=%1, server_version=%2, "
41,948✔
2050
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
41,948✔
2051
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
41,948✔
2052
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
41,948✔
2053
        if (logger.would_log(util::Logger::Level::trace)) {
41,948✔
2054
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2055
            if (changeset_data.size() < 1024) {
×
2056
                logger.trace("Changeset: %1",
×
2057
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2058
            }
×
2059
            else {
×
2060
                logger.trace("Changeset(comp): %1 %2", changeset_data.size(),
×
2061
                             protocol.compressed_hex_dump(changeset_data));
×
2062
            }
×
2063

2064
#if REALM_DEBUG
×
2065
            ChunkedBinaryInputStream in{changeset_data};
×
2066
            Changeset log;
×
2067
            try {
×
2068
                parse_changeset(in, log);
×
2069
                std::stringstream ss;
×
2070
                log.print(ss);
×
2071
                logger.trace("Changeset (parsed):\n%1", ss.str());
×
2072
            }
×
2073
            catch (const BadChangesetError& err) {
×
2074
                logger.error("Unable to parse changeset: %1", err.what());
×
2075
            }
×
2076
#endif
×
2077
        }
×
2078

20,236✔
2079
#if 0 // Upload log compaction is currently not implemented
2080
        if (!get_client().m_disable_upload_compaction) {
2081
            ChangesetEncoder::Buffer encode_buffer;
2082

2083
            {
2084
                // Upload compaction only takes place within single changesets to
2085
                // avoid another client seeing inconsistent snapshots.
2086
                ChunkedBinaryInputStream stream{uc.changeset};
2087
                Changeset changeset;
2088
                parse_changeset(stream, changeset); // Throws
2089
                // FIXME: What is the point of setting these? How can compaction care about them?
2090
                changeset.version = uc.progress.client_version;
2091
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2092
                changeset.origin_timestamp = uc.origin_timestamp;
2093
                changeset.origin_file_ident = uc.origin_file_ident;
2094

2095
                compact_changesets(&changeset, 1);
2096
                encode_changeset(changeset, encode_buffer);
2097

2098
                logger.debug("Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2099
                             encode_buffer.size()); // Throws
2100
            }
2101

2102
            upload_message_builder.add_changeset(
2103
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
2104
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
2105
        }
2106
        else
2107
#endif
2108
        {
41,948✔
2109
            upload_message_builder.add_changeset(uc.progress.client_version,
41,948✔
2110
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
41,948✔
2111
                                                 uc.origin_file_ident,
41,948✔
2112
                                                 uc.changeset); // Throws
41,948✔
2113
        }
41,948✔
2114
    }
41,948✔
2115

27,280✔
2116
    int protocol_version = m_conn.get_negotiated_protocol_version();
54,198✔
2117
    OutputBuffer& out = m_conn.get_output_buffer();
54,198✔
2118
    session_ident_type session_ident = get_ident();
54,198✔
2119
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
54,198✔
2120
                                               progress_server_version,
54,198✔
2121
                                               locked_server_version); // Throws
54,198✔
2122
    m_conn.initiate_write_message(out, this);                          // Throws
54,198✔
2123

27,280✔
2124
    // Other messages may be waiting to be sent
27,280✔
2125
    enlist_to_send(); // Throws
54,198✔
2126
}
54,198✔
2127

2128

2129
void Session::send_mark_message()
2130
{
16,238✔
2131
    REALM_ASSERT_EX(m_state == Active, m_state);
16,238✔
2132
    REALM_ASSERT(m_ident_message_sent);
16,238✔
2133
    REALM_ASSERT(!m_unbind_message_sent);
16,238✔
2134
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
16,238✔
2135

8,038✔
2136
    request_ident_type request_ident = m_target_download_mark;
16,238✔
2137
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
16,238✔
2138

8,038✔
2139
    ClientProtocol& protocol = m_conn.get_client_protocol();
16,238✔
2140
    OutputBuffer& out = m_conn.get_output_buffer();
16,238✔
2141
    session_ident_type session_ident = get_ident();
16,238✔
2142
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
16,238✔
2143
    m_conn.initiate_write_message(out, this);                      // Throws
16,238✔
2144

8,038✔
2145
    m_last_download_mark_sent = request_ident;
16,238✔
2146

8,038✔
2147
    // Other messages may be waiting to be sent
8,038✔
2148
    enlist_to_send(); // Throws
16,238✔
2149
}
16,238✔
2150

2151

2152
void Session::send_unbind_message()
2153
{
6,262✔
2154
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,262✔
2155
    REALM_ASSERT(m_bind_message_sent);
6,262✔
2156
    REALM_ASSERT(!m_unbind_message_sent);
6,262✔
2157

3,002✔
2158
    logger.debug("Sending: UNBIND"); // Throws
6,262✔
2159

3,002✔
2160
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,262✔
2161
    OutputBuffer& out = m_conn.get_output_buffer();
6,262✔
2162
    session_ident_type session_ident = get_ident();
6,262✔
2163
    protocol.make_unbind_message(out, session_ident); // Throws
6,262✔
2164
    m_conn.initiate_write_message(out, this);         // Throws
6,262✔
2165

3,002✔
2166
    m_unbind_message_sent = true;
6,262✔
2167
}
6,262✔
2168

2169

2170
void Session::send_json_error_message()
2171
{
30✔
2172
    REALM_ASSERT_EX(m_state == Active, m_state);
30✔
2173
    REALM_ASSERT(m_ident_message_sent);
30✔
2174
    REALM_ASSERT(!m_unbind_message_sent);
30✔
2175
    REALM_ASSERT(m_error_to_send);
30✔
2176
    REALM_ASSERT(m_client_error);
30✔
2177

14✔
2178
    ClientProtocol& protocol = m_conn.get_client_protocol();
30✔
2179
    OutputBuffer& out = m_conn.get_output_buffer();
30✔
2180
    session_ident_type session_ident = get_ident();
30✔
2181
    auto protocol_error = m_client_error->error_for_server;
30✔
2182

14✔
2183
    auto message = util::format("%1", m_client_error->to_status());
30✔
2184
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
30✔
2185
                session_ident); // Throws
30✔
2186

14✔
2187
    nlohmann::json error_body_json;
30✔
2188
    error_body_json["message"] = std::move(message);
30✔
2189
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
30✔
2190
                                     error_body_json.dump()); // Throws
30✔
2191
    m_conn.initiate_write_message(out, this);                 // Throws
30✔
2192

14✔
2193
    m_error_to_send = false;
30✔
2194
    enlist_to_send(); // Throws
30✔
2195
}
30✔
2196

2197

2198
void Session::send_test_command_message()
2199
{
44✔
2200
    REALM_ASSERT_EX(m_state == Active, m_state);
44✔
2201

22✔
2202
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
44✔
2203
                           [](const PendingTestCommand& command) {
44✔
2204
                               return command.pending;
44✔
2205
                           });
44✔
2206
    REALM_ASSERT(it != m_pending_test_commands.end());
44✔
2207

22✔
2208
    ClientProtocol& protocol = m_conn.get_client_protocol();
44✔
2209
    OutputBuffer& out = m_conn.get_output_buffer();
44✔
2210
    auto session_ident = get_ident();
44✔
2211

22✔
2212
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
44✔
2213
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
44✔
2214

22✔
2215
    m_conn.initiate_write_message(out, this); // Throws;
44✔
2216
    it->pending = false;
44✔
2217

22✔
2218
    enlist_to_send();
44✔
2219
}
44✔
2220

2221

2222
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
2223
{
3,270✔
2224
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
3,270✔
2225
                 client_file_ident.salt); // Throws
3,270✔
2226

1,532✔
2227
    // Ignore the message if the deactivation process has been initiated,
1,532✔
2228
    // because in that case, the associated Realm and SessionWrapper must
1,532✔
2229
    // not be accessed any longer.
1,532✔
2230
    if (m_state != Active)
3,270✔
2231
        return Status::OK(); // Success
96✔
2232

1,508✔
2233
    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
3,174✔
2234
                               !m_unbound_message_received);
3,174✔
2235
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,174✔
2236
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
×
2237
    }
×
2238
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
3,174✔
2239
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
×
2240
    }
×
2241
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
3,174✔
2242
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
×
2243
    }
×
2244

1,508✔
2245
    m_client_file_ident = client_file_ident;
3,174✔
2246

1,508✔
2247
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
3,174✔
2248
        // Ready to send the IDENT message
2249
        ensure_enlisted_to_send(); // Throws
×
2250
        return Status::OK();       // Success
×
2251
    }
×
2252

1,508✔
2253
    // access before the client reset (if applicable) because
1,508✔
2254
    // the reset can take a while and the sync session might have died
1,508✔
2255
    // by the time the reset finishes.
1,508✔
2256
    ClientReplication& repl = access_realm(); // Throws
3,174✔
2257

1,508✔
2258
    auto client_reset_if_needed = [&]() -> bool {
3,174✔
2259
        if (!m_client_reset_operation) {
3,174✔
2260
            return false;
2,838✔
2261
        }
2,838✔
2262

168✔
2263
        // ClientResetOperation::finalize() will return true only if the operation actually did
168✔
2264
        // a client reset. It may choose not to do a reset if the local Realm does not exist
168✔
2265
        // at this point (in that case there is nothing to reset). But in any case, we must
168✔
2266
        // clean up m_client_reset_operation at this point as sync should be able to continue from
168✔
2267
        // this point forward.
168✔
2268
        auto client_reset_operation = std::move(m_client_reset_operation);
336✔
2269
        util::UniqueFunction<void(int64_t)> on_flx_subscription_complete = [this](int64_t version) {
220✔
2270
            this->on_flx_sync_version_complete(version);
104✔
2271
        };
104✔
2272
        if (!client_reset_operation->finalize(client_file_ident, get_flx_subscription_store(),
336✔
2273
                                              std::move(on_flx_subscription_complete))) {
168✔
2274
            return false;
×
2275
        }
×
2276

168✔
2277
        // The fresh Realm has been used to reset the state
168✔
2278
        logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
336✔
2279

168✔
2280
        SaltedFileIdent client_file_ident;
336✔
2281
        bool has_pending_client_reset = false;
336✔
2282
        repl.get_history().get_status(m_last_version_available, client_file_ident, m_progress,
336✔
2283
                                      &has_pending_client_reset); // Throws
336✔
2284
        REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
336✔
2285
        REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
336✔
2286
        REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
336✔
2287
                        m_progress.download.last_integrated_client_version);
336✔
2288
        REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
336✔
2289
        REALM_ASSERT_EX(m_progress.upload.last_integrated_server_version == 0,
336✔
2290
                        m_progress.upload.last_integrated_server_version);
336✔
2291
        logger.trace("last_version_available  = %1", m_last_version_available); // Throws
336✔
2292

168✔
2293
        m_upload_progress = m_progress.upload;
336✔
2294
        m_download_progress = m_progress.download;
336✔
2295
        // In recovery mode, there may be new changesets to upload and nothing left to download.
168✔
2296
        // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
168✔
2297
        // For both, we want to allow uploads again without needing external changes to download first.
168✔
2298
        m_allow_upload = true;
336✔
2299
        REALM_ASSERT_EX(m_last_version_selected_for_upload == 0, m_last_version_selected_for_upload);
336✔
2300

168✔
2301
        if (has_pending_client_reset) {
336✔
2302
            handle_pending_client_reset_acknowledgement();
268✔
2303
        }
268✔
2304

168✔
2305
        // If a migration or rollback is in progress, mark it complete when client reset is completed.
168✔
2306
        if (auto migration_store = get_migration_store()) {
336✔
2307
            migration_store->complete_migration_or_rollback();
240✔
2308
        }
240✔
2309

168✔
2310
        return true;
336✔
2311
    };
336✔
2312
    // if a client reset happens, it will take care of setting the file ident
1,508✔
2313
    // and if not, we do it here
1,508✔
2314
    bool did_client_reset = false;
3,174✔
2315
    try {
3,174✔
2316
        did_client_reset = client_reset_if_needed();
3,174✔
2317
    }
3,174✔
2318
    catch (const std::exception& e) {
1,542✔
2319
        auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
68✔
2320
        logger.error(err_msg.c_str());
68✔
2321
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
68✔
2322
        suspend(err_info);
68✔
2323
        return Status::OK();
68✔
2324
    }
68✔
2325
    if (!did_client_reset) {
3,106✔
2326
        repl.get_history().set_client_file_ident(client_file_ident,
2,838✔
2327
                                                 m_fix_up_object_ids); // Throws
2,838✔
2328
        m_progress.download.last_integrated_client_version = 0;
2,838✔
2329
        m_progress.upload.client_version = 0;
2,838✔
2330
        m_last_version_selected_for_upload = 0;
2,838✔
2331
    }
2,838✔
2332

1,474✔
2333
    // Ready to send the IDENT message
1,474✔
2334
    ensure_enlisted_to_send(); // Throws
3,106✔
2335
    return Status::OK();       // Success
3,106✔
2336
}
3,106✔
2337

2338
Status Session::receive_download_message(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
2339
                                         DownloadBatchState batch_state, int64_t query_version,
2340
                                         const ReceivedChangesets& received_changesets)
2341
{
44,836✔
2342
    REALM_ASSERT_EX(query_version >= 0, query_version);
44,836✔
2343
    // Ignore the message if the deactivation process has been initiated,
24,000✔
2344
    // because in that case, the associated Realm and SessionWrapper must
24,000✔
2345
    // not be accessed any longer.
24,000✔
2346
    if (m_state != Active)
44,836✔
2347
        return Status::OK();
506✔
2348

23,700✔
2349
    if (is_steady_state_download_message(batch_state, query_version)) {
44,330✔
2350
        batch_state = DownloadBatchState::SteadyState;
42,754✔
2351
    }
42,754✔
2352

23,700✔
2353
    logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
44,330✔
2354
                 "latest_server_version=%3, latest_server_version_salt=%4, "
44,330✔
2355
                 "upload_client_version=%5, upload_server_version=%6, downloadable_bytes=%7, "
44,330✔
2356
                 "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
44,330✔
2357
                 progress.download.server_version, progress.download.last_integrated_client_version,
44,330✔
2358
                 progress.latest_server_version.version, progress.latest_server_version.salt,
44,330✔
2359
                 progress.upload.client_version, progress.upload.last_integrated_server_version, downloadable_bytes,
44,330✔
2360
                 batch_state != DownloadBatchState::MoreToCome, query_version, received_changesets.size()); // Throws
44,330✔
2361

23,700✔
2362
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
23,700✔
2363
    // changeset over and over again.
23,700✔
2364
    if (m_client_error) {
44,330✔
2365
        logger.debug("Ignoring download message because the client detected an integration error");
×
2366
        return Status::OK();
×
2367
    }
×
2368

23,700✔
2369
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
44,332✔
2370
    if (REALM_UNLIKELY(!legal_at_this_time)) {
44,330✔
2371
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
×
2372
    }
×
2373
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
44,330✔
2374
        logger.error("Bad sync progress received (%1)", status);
×
2375
        return status;
×
2376
    }
×
2377

23,700✔
2378
    version_type server_version = m_progress.download.server_version;
44,330✔
2379
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
44,330✔
2380
    for (const Transformer::RemoteChangeset& changeset : received_changesets) {
44,538✔
2381
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
22,398✔
2382
        // version must be increasing, but can stay the same during bootstraps.
22,398✔
2383
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
23,218✔
2384
                                                         : (changeset.remote_version > server_version);
42,416✔
2385
        // Each server version cannot be greater than the one in the header of the download message.
22,398✔
2386
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
43,236✔
2387
        if (!good_server_version) {
43,236✔
2388
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2389
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2390
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2391
        }
×
2392
        server_version = changeset.remote_version;
43,236✔
2393
        // Check that per-changeset last integrated client version is "weakly"
22,398✔
2394
        // increasing.
22,398✔
2395
        bool good_client_version =
43,236✔
2396
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
43,236✔
2397
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
43,236✔
2398
        if (!good_client_version) {
43,236✔
2399
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2400
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2401
                                 "(%1, %2, %3)",
×
2402
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2403
                                 progress.download.last_integrated_client_version)};
×
2404
        }
×
2405
        last_integrated_client_version = changeset.last_integrated_local_version;
43,236✔
2406
        // Server shouldn't send our own changes, and zero is not a valid client
22,398✔
2407
        // file identifier.
22,398✔
2408
        bool good_file_ident =
43,236✔
2409
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
43,236✔
2410
        if (!good_file_ident) {
43,236✔
2411
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2412
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2413
                                 changeset.origin_file_ident)};
×
2414
        }
×
2415
    }
43,236✔
2416

23,700✔
2417
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
44,330✔
2418
                                       batch_state, received_changesets.size());
44,330✔
2419
    if (hook_action == SyncClientHookAction::EarlyReturn) {
44,330✔
2420
        return Status::OK();
12✔
2421
    }
12✔
2422
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
44,318✔
2423

23,694✔
2424
    if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) {
44,318✔
2425
        clear_resumption_delay_state();
1,566✔
2426
        return Status::OK();
1,566✔
2427
    }
1,566✔
2428

22,910✔
2429
    initiate_integrate_changesets(downloadable_bytes, batch_state, progress, received_changesets); // Throws
42,752✔
2430

22,910✔
2431
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
42,752✔
2432
                                  batch_state, received_changesets.size());
42,752✔
2433
    if (hook_action == SyncClientHookAction::EarlyReturn) {
42,752✔
2434
        return Status::OK();
×
2435
    }
×
2436
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
42,752✔
2437

22,910✔
2438
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
22,910✔
2439
    // after a retryable session error.
22,910✔
2440
    clear_resumption_delay_state();
42,752✔
2441
    return Status::OK();
42,752✔
2442
}
42,752✔
2443

2444
Status Session::receive_mark_message(request_ident_type request_ident)
2445
{
15,660✔
2446
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws
15,660✔
2447

7,744✔
2448
    // Ignore the message if the deactivation process has been initiated,
7,744✔
2449
    // because in that case, the associated Realm and SessionWrapper must
7,744✔
2450
    // not be accessed any longer.
7,744✔
2451
    if (m_state != Active)
15,660✔
2452
        return Status::OK(); // Success
18✔
2453

7,734✔
2454
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
15,642✔
2455
    if (REALM_UNLIKELY(!legal_at_this_time)) {
15,642✔
2456
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
×
2457
    }
×
2458
    bool good_request_ident =
15,642✔
2459
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
15,642✔
2460
    if (REALM_UNLIKELY(!good_request_ident)) {
15,642✔
2461
        return {
×
2462
            ErrorCodes::SyncProtocolInvariantFailed,
×
2463
            util::format(
×
2464
                "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
×
2465
                m_last_download_mark_sent, m_last_download_mark_received)};
×
2466
    }
×
2467

7,734✔
2468
    m_server_version_at_last_download_mark = m_progress.download.server_version;
15,642✔
2469
    m_last_download_mark_received = request_ident;
15,642✔
2470
    check_for_download_completion(); // Throws
15,642✔
2471

7,734✔
2472
    return Status::OK(); // Success
15,642✔
2473
}
15,642✔
2474

2475

2476
// The caller (Connection) must discard the session if the session has become
2477
// deactivated upon return.
2478
Status Session::receive_unbound_message()
2479
{
3,990✔
2480
    logger.debug("Received: UNBOUND");
3,990✔
2481

1,778✔
2482
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
3,990✔
2483
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,990✔
2484
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2485
    }
×
2486

1,778✔
2487
    // The fact that the UNBIND message has been sent, but an ERROR message has
1,778✔
2488
    // not been received, implies that the deactivation process must have been
1,778✔
2489
    // initiated, so this session must be in the Deactivating state or the session
1,778✔
2490
    // has been suspended because of a client side error.
1,778✔
2491
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
3,990!
2492

1,778✔
2493
    m_unbound_message_received = true;
3,990✔
2494

1,778✔
2495
    // Detect completion of the unbinding process
1,778✔
2496
    if (m_unbind_message_send_complete && m_state == Deactivating) {
3,990✔
2497
        // The deactivation process completes when the unbinding process
1,778✔
2498
        // completes.
1,778✔
2499
        complete_deactivation(); // Throws
3,990✔
2500
        // Life cycle state is now Deactivated
1,778✔
2501
    }
3,990✔
2502

1,778✔
2503
    return Status::OK(); // Success
3,990✔
2504
}
3,990✔
2505

2506

2507
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
2508
{
16✔
2509
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);
16✔
2510
    // Ignore the message if the deactivation process has been initiated,
8✔
2511
    // because in that case, the associated Realm and SessionWrapper must
8✔
2512
    // not be accessed any longer.
8✔
2513
    if (m_state == Active) {
16✔
2514
        on_flx_sync_error(query_version, message); // throws
16✔
2515
    }
16✔
2516
    return Status::OK();
16✔
2517
}
16✔
2518

2519
// The caller (Connection) must discard the session if the session has become
2520
// deactivated upon return.
2521
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2522
{
882✔
2523
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
882✔
2524
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
882✔
2525

456✔
2526
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
882✔
2527
    if (REALM_UNLIKELY(!legal_at_this_time)) {
882✔
2528
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2529
    }
×
2530

456✔
2531
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
882✔
2532
    auto status = protocol_error_to_status(protocol_error, info.message);
882✔
2533
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
882✔
2534
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2535
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2536
                             info.raw_error_code)};
×
2537
    }
×
2538

456✔
2539
    // Can't process debug hook actions once the Session is undergoing deactivation, since
456✔
2540
    // the SessionWrapper may not be available
456✔
2541
    if (m_state == Active) {
882✔
2542
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
878✔
2543
        if (debug_action == SyncClientHookAction::EarlyReturn) {
878✔
2544
            return Status::OK();
8✔
2545
        }
8✔
2546
    }
874✔
2547

452✔
2548
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
452✔
2549
    // containing the compensating write has appeared in a download message.
452✔
2550
    if (status == ErrorCodes::SyncCompensatingWrite) {
874✔
2551
        // If the client is not active, the compensating writes will not be processed now, but will be
20✔
2552
        // sent again the next time the client connects
20✔
2553
        if (m_state == Active) {
40✔
2554
            REALM_ASSERT(info.compensating_write_server_version.has_value());
40✔
2555
            m_pending_compensating_write_errors.push_back(info);
40✔
2556
        }
40✔
2557
        return Status::OK();
40✔
2558
    }
40✔
2559

432✔
2560
    m_error_message_received = true;
834✔
2561
    suspend(SessionErrorInfo{info, std::move(status)});
834✔
2562
    return Status::OK();
834✔
2563
}
834✔
2564

2565
void Session::suspend(const SessionErrorInfo& info)
2566
{
902✔
2567
    REALM_ASSERT(!m_suspended);
902✔
2568
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
902✔
2569
    logger.debug("Suspended"); // Throws
902✔
2570

466✔
2571
    m_suspended = true;
902✔
2572

466✔
2573
    // Detect completion of the unbinding process
466✔
2574
    if (m_unbind_message_send_complete && m_error_message_received) {
902!
2575
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2576
        // we received an ERROR message implies that the deactivation process must
2577
        // have been initiated, so this session must be in the Deactivating state.
UNCOV
2578
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
×
2579

2580
        // The deactivation process completes when the unbinding process
2581
        // completes.
UNCOV
2582
        complete_deactivation(); // Throws
×
2583
        // Life cycle state is now Deactivated
UNCOV
2584
    }
×
2585

466✔
2586
    // Notify the application of the suspension of the session if the session is
466✔
2587
    // still in the Active state
466✔
2588
    if (m_state == Active) {
902✔
2589
        m_conn.one_less_active_unsuspended_session(); // Throws
898✔
2590
        on_suspended(info);                           // Throws
898✔
2591
    }
898✔
2592

466✔
2593
    if (!info.is_fatal) {
902✔
2594
        begin_resumption_delay(info);
374✔
2595
    }
374✔
2596

466✔
2597
    // Ready to send the UNBIND message, if it has not been sent already
466✔
2598
    if (!m_unbind_message_sent)
902✔
2599
        ensure_enlisted_to_send(); // Throws
898✔
2600
}
902✔
2601

2602
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
2603
{
44✔
2604
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);
44✔
2605
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
44✔
2606
                           [&](const PendingTestCommand& command) {
44✔
2607
                               return command.id == ident;
44✔
2608
                           });
44✔
2609
    if (it == m_pending_test_commands.end()) {
44✔
2610
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2611
                util::format("Received test command response for a non-existent ident %1", ident)};
×
2612
    }
×
2613

22✔
2614
    it->promise.emplace_value(std::string{body});
44✔
2615
    m_pending_test_commands.erase(it);
44✔
2616

22✔
2617
    return Status::OK();
44✔
2618
}
44✔
2619

2620
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2621
{
374✔
2622
    REALM_ASSERT(!m_try_again_activation_timer);
374✔
2623

204✔
2624
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
374✔
2625
                                  error_info.resumption_delay_interval);
374✔
2626
    auto try_again_interval = m_try_again_delay_info.delay_interval();
374✔
2627
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
374✔
2628
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
10✔
2629
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
10✔
2630
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
10✔
2631
        try_again_interval = std::chrono::milliseconds{1000};
20✔
2632
    }
20✔
2633
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
374✔
2634
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
374✔
2635
        if (status == ErrorCodes::OperationAborted)
374✔
2636
            return;
4✔
2637
        else if (!status.is_ok())
370✔
2638
            throw Exception(status);
×
2639

202✔
2640
        m_try_again_activation_timer.reset();
370✔
2641
        cancel_resumption_delay();
370✔
2642
    });
370✔
2643
}
374✔
2644

2645
void Session::clear_resumption_delay_state()
2646
{
44,320✔
2647
    if (m_try_again_activation_timer) {
44,320✔
2648
        logger.debug("Clearing resumption delay state after successful download");
×
2649
        m_try_again_delay_info.reset();
×
2650
    }
×
2651
}
44,320✔
2652

2653
Status ClientImpl::Session::check_received_sync_progress(const SyncProgress& progress) noexcept
2654
{
44,332✔
2655
    const SyncProgress& a = m_progress;
44,332✔
2656
    const SyncProgress& b = progress;
44,332✔
2657
    std::string message;
44,332✔
2658
    if (b.latest_server_version.version < a.latest_server_version.version) {
44,332✔
2659
        message = util::format("Latest server version in download messages must be weakly increasing throughout a "
×
2660
                               "session (current: %1, received: %2)",
×
2661
                               a.latest_server_version.version, b.latest_server_version.version);
×
2662
    }
×
2663
    if (b.upload.client_version < a.upload.client_version) {
44,332✔
2664
        message = util::format("Last integrated client version in download messages must be weakly increasing "
×
2665
                               "throughout a session (current: %1, received: %2)",
×
2666
                               a.upload.client_version, b.upload.client_version);
×
2667
    }
×
2668
    if (b.upload.client_version > m_last_version_available) {
44,332✔
2669
        message = util::format("Last integrated client version on server cannot be greater than the latest client "
×
2670
                               "version in existence (current: %1, received: %2)",
×
2671
                               m_last_version_available, b.upload.client_version);
×
2672
    }
×
2673
    if (b.download.server_version < a.download.server_version) {
44,332✔
2674
        message =
×
2675
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2676
                         a.download.server_version, b.download.server_version);
×
2677
    }
×
2678
    if (b.download.server_version > b.latest_server_version.version) {
44,332✔
2679
        message = util::format(
×
2680
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
×
2681
            b.download.server_version, b.latest_server_version.version);
×
2682
    }
×
2683
    if (b.download.last_integrated_client_version < a.download.last_integrated_client_version) {
44,332✔
2684
        message = util::format(
×
2685
            "Last integrated client version on the server at the position in the server's history of the download "
×
2686
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2687
            a.download.last_integrated_client_version, b.download.last_integrated_client_version);
×
2688
    }
×
2689
    if (b.download.last_integrated_client_version > b.upload.client_version) {
44,332✔
2690
        message = util::format("Last integrated client version on the server in the position at the server's history "
×
2691
                               "of the download cursor cannot be greater than the latest client version integrated "
×
2692
                               "on the server (download: %1, upload: %2)",
×
2693
                               b.download.last_integrated_client_version, b.upload.client_version);
×
2694
    }
×
2695
    if (b.download.server_version < b.upload.last_integrated_server_version) {
44,332✔
2696
        message = util::format(
×
2697
            "The server version of the download cursor cannot be less than the server version integrated in the "
×
2698
            "latest client version acknowledged by the server (download: %1, upload: %2)",
×
2699
            b.download.server_version, b.upload.last_integrated_server_version);
×
2700
    }
×
2701

23,702✔
2702
    if (message.empty()) {
44,332✔
2703
        return Status::OK();
44,330✔
2704
    }
44,330✔
2705
    return {ErrorCodes::SyncProtocolInvariantFailed, std::move(message)};
2✔
2706
}
2✔
2707

2708

2709
void Session::check_for_upload_completion()
2710
{
74,822✔
2711
    REALM_ASSERT_EX(m_state == Active, m_state);
74,822✔
2712
    if (!m_upload_completion_notification_requested) {
74,822✔
2713
        return;
44,134✔
2714
    }
44,134✔
2715

15,136✔
2716
    // during an ongoing client reset operation, we never upload anything
15,136✔
2717
    if (m_client_reset_operation)
30,688✔
2718
        return;
236✔
2719

15,018✔
2720
    // Upload process must have reached end of history
15,018✔
2721
    REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available);
30,452✔
2722
    bool scan_complete = (m_upload_progress.client_version == m_last_version_available);
30,452✔
2723
    if (!scan_complete)
30,452✔
2724
        return;
5,068✔
2725

12,458✔
2726
    // All uploaded changesets must have been acknowledged by the server
12,458✔
2727
    REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload);
25,384✔
2728
    bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload);
25,384✔
2729
    if (!all_uploads_accepted)
25,384✔
2730
        return;
10,718✔
2731

7,250✔
2732
    m_upload_completion_notification_requested = false;
14,666✔
2733
    on_upload_completion(); // Throws
14,666✔
2734
}
14,666✔
2735

2736

2737
void Session::check_for_download_completion()
2738
{
59,766✔
2739
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
59,766✔
2740
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
59,766✔
2741
    if (m_last_download_mark_received == m_last_triggering_download_mark)
59,766✔
2742
        return;
43,920✔
2743
    if (m_last_download_mark_received < m_target_download_mark)
15,846✔
2744
        return;
444✔
2745
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
15,402✔
2746
        return;
×
2747
    m_last_triggering_download_mark = m_target_download_mark;
15,402✔
2748
    if (REALM_UNLIKELY(!m_allow_upload)) {
15,402✔
2749
        // Activate the upload process now, and enable immediate reactivation
2,004✔
2750
        // after a subsequent fast reconnect.
2,004✔
2751
        m_allow_upload = true;
4,182✔
2752
        ensure_enlisted_to_send(); // Throws
4,182✔
2753
    }
4,182✔
2754
    on_download_completion(); // Throws
15,402✔
2755
}
15,402✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc