• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_111

27 Oct 2023 10:49AM UTC coverage: 91.582% (+0.01%) from 91.571%
thomas.goyne_111

push

Evergreen

web-flow
Merge pull request #7085 from realm/release/13.23.2

Release/13.23.2

91742 of 168234 branches covered (0.0%)

230135 of 251288 relevant lines covered (91.58%)

6778766.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.9
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <system_error>
2
#include <sstream>
3

4
#include <realm/util/assert.hpp>
5
#include <realm/util/safe_int_ops.hpp>
6
#include <realm/util/random.hpp>
7
#include <realm/util/memory_stream.hpp>
8
#include <realm/util/basic_system_errors.hpp>
9
#include <realm/util/scope_exit.hpp>
10
#include <realm/util/to_string.hpp>
11
#include <realm/util/uri.hpp>
12
#include <realm/util/platform_info.hpp>
13
#include <realm/sync/impl/clock.hpp>
14
#include <realm/impl/simulated_failure.hpp>
15
#include <realm/sync/network/http.hpp>
16
#include <realm/sync/network/websocket.hpp>
17
#include <realm/sync/noinst/client_history_impl.hpp>
18
#include <realm/sync/noinst/client_impl_base.hpp>
19
#include <realm/sync/noinst/compact_changesets.hpp>
20
#include <realm/sync/protocol.hpp>
21
#include <realm/version.hpp>
22
#include <realm/sync/changeset_parser.hpp>
23

24
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
25

26
// NOTE: The protocol specification is in `/doc/protocol.md`
27

28
using namespace realm;
29
using namespace _impl;
30
using namespace realm::util;
31
using namespace realm::sync;
32
using namespace realm::sync::websocket;
33

34
// clang-format off
35
using Connection      = ClientImpl::Connection;
36
using Session         = ClientImpl::Session;
37
using UploadChangeset = ClientHistory::UploadChangeset;
38

39
// These are a work-around for a bug in MSVC. It cannot find in-class types
40
// mentioned in signature of out-of-line member function definitions.
41
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
42
using OutputBuffer                = ClientImpl::OutputBuffer;
43
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
44
// clang-format on
45

46
void ClientImpl::ReconnectInfo::reset() noexcept
47
{
1,648✔
48
    m_backoff_state.reset();
1,648✔
49
    scheduled_reset = false;
1,648✔
50
}
1,648✔
51

52

53
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
54
                                       std::optional<ResumptionDelayInfo> new_delay_info)
55
{
3,254✔
56
    m_backoff_state.update(new_reason, new_delay_info);
3,254✔
57
}
3,254✔
58

59

60
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
61
{
5,184✔
62
    if (scheduled_reset) {
5,184✔
63
        reset();
4✔
64
    }
4✔
65

2,640✔
66
    if (!m_backoff_state.triggering_error) {
5,184✔
67
        return std::chrono::milliseconds::zero();
4,010✔
68
    }
4,010✔
69

622✔
70
    switch (*m_backoff_state.triggering_error) {
1,174✔
71
        case ConnectionTerminationReason::closed_voluntarily:
74✔
72
            return std::chrono::milliseconds::zero();
74✔
73
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
74
            return std::chrono::milliseconds::max();
18✔
75
        default:
1,078✔
76
            if (m_reconnect_mode == ReconnectMode::testing) {
1,078✔
77
                return std::chrono::milliseconds::max();
854✔
78
            }
854✔
79

112✔
80
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
224✔
81
            return m_backoff_state.delay_interval();
224✔
82
    }
1,174✔
83
}
1,174✔
84

85

86
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
87
                                      port_type& port, std::string& path) const
88
{
3,336✔
89
    util::Uri uri(url); // Throws
3,336✔
90
    uri.canonicalize(); // Throws
3,336✔
91
    std::string userinfo, address_2, port_2;
3,336✔
92
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
3,336✔
93
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
3,336✔
94
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
3,336✔
95
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
3,336✔
96
    if (REALM_UNLIKELY(!good))
3,336✔
97
        return false;
1,494✔
98
    ProtocolEnvelope protocol_2;
3,336✔
99
    port_type port_3;
3,336✔
100
    if (realm_scheme) {
3,336✔
101
        if (uri.get_scheme() == "realm:") {
×
102
            protocol_2 = ProtocolEnvelope::realm;
×
103
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
104
        }
×
105
        else {
×
106
            protocol_2 = ProtocolEnvelope::realms;
×
107
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
108
        }
×
109
    }
×
110
    else {
3,336✔
111
        REALM_ASSERT(ws_scheme);
3,336✔
112
        if (uri.get_scheme() == "ws:") {
3,336✔
113
            protocol_2 = ProtocolEnvelope::ws;
3,332✔
114
            port_3 = 80;
3,332✔
115
        }
3,332✔
116
        else {
4✔
117
            protocol_2 = ProtocolEnvelope::wss;
4✔
118
            port_3 = 443;
4✔
119
        }
4✔
120
    }
3,336✔
121
    if (!port_2.empty()) {
3,336✔
122
        std::istringstream in(port_2);    // Throws
3,336✔
123
        in.imbue(std::locale::classic()); // Throws
3,336✔
124
        in >> port_3;
3,336✔
125
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,336✔
126
            return false;
1,494✔
127
    }
3,336✔
128
    std::string path_2 = uri.get_path(); // Throws (copy)
3,336✔
129

1,494✔
130
    protocol = protocol_2;
3,336✔
131
    address = std::move(address_2);
3,336✔
132
    port = port_3;
3,336✔
133
    path = std::move(path_2);
3,336✔
134
    return true;
3,336✔
135
}
3,336✔
136

137

138
ClientImpl::ClientImpl(ClientConfig config)
139
    : logger_ptr{config.logger ? std::move(config.logger) : util::Logger::get_default_logger()}
140
    , logger{*logger_ptr}
141
    , m_reconnect_mode{config.reconnect_mode}
142
    , m_connect_timeout{config.connect_timeout}
143
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
144
    , m_ping_keepalive_period{config.ping_keepalive_period}
145
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
146
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
147
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
148
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
149
    , m_dry_run{config.dry_run}
150
    , m_enable_default_port_hack{config.enable_default_port_hack}
151
    , m_disable_upload_compaction{config.disable_upload_compaction}
152
    , m_fix_up_object_ids{config.fix_up_object_ids}
153
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
154
    , m_socket_provider{std::move(config.socket_provider)}
155
    , m_client_protocol{} // Throws
156
    , m_one_connection_per_session{config.one_connection_per_session}
157
    , m_random{}
8,974✔
158
{
4,418✔
159
    // FIXME: Would be better if seeding was up to the application.
8,974✔
160
    util::seed_prng_nondeterministically(m_random); // Throws
4,418✔
161

8,974✔
162
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
8,974✔
163
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
8,974✔
164
                 get_current_protocol_version()); // Throws
8,974✔
165
    logger.info("Platform: %1", util::get_platform_info());
8,974✔
166
    const char* build_mode;
8,974✔
167
#if REALM_DEBUG
8,974✔
168
    build_mode = "Debug";
169
#else
170
    build_mode = "Release";
171
#endif
8,974✔
172
    logger.debug("Build mode: %1", build_mode);
8,974✔
173
    logger.debug("Config param: one_connection_per_session = %1",
8,974✔
174
                 config.one_connection_per_session); // Throws
8,974✔
175
    logger.debug("Config param: connect_timeout = %1 ms",
8,974✔
176
                 config.connect_timeout); // Throws
8,974✔
177
    logger.debug("Config param: connection_linger_time = %1 ms",
8,974✔
178
                 config.connection_linger_time); // Throws
8,974✔
179
    logger.debug("Config param: ping_keepalive_period = %1 ms",
8,974✔
180
                 config.ping_keepalive_period); // Throws
8,974✔
181
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
8,974✔
182
                 config.pong_keepalive_timeout); // Throws
8,974✔
183
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
8,974✔
184
                 config.fast_reconnect_limit); // Throws
8,974✔
185
    logger.debug("Config param: disable_upload_compaction = %1",
8,974✔
186
                 config.disable_upload_compaction); // Throws
8,974✔
187
    logger.debug("Config param: disable_sync_to_disk = %1",
8,974✔
188
                 config.disable_sync_to_disk); // Throws
8,974✔
189
    logger.debug("Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3",
8,974✔
190
                 m_reconnect_backoff_info.max_resumption_delay_interval.count(),
8,974✔
191
                 m_reconnect_backoff_info.resumption_delay_interval.count(),
8,974✔
192
                 m_reconnect_backoff_info.resumption_delay_backoff_multiplier);
4,418✔
193

8,974✔
194
    if (config.reconnect_mode != ReconnectMode::normal) {
772✔
195
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
772✔
196
                    "never do this in production!");
772✔
197
    }
4,418✔
198

8,974✔
199
    if (config.dry_run) {
4,418✔
200
        logger.warn("Testing/debugging feature 'dry run' enabled - "
8,974✔
201
                    "never do this in production!");
2✔
202
    }
2✔
203

2✔
204
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
2✔
205

4✔
206
    if (m_one_connection_per_session) {
4,418✔
207
        // FIXME: Re-enable this warning when the load balancer is able to handle
8,974✔
208
        // multiplexing.
×
209
        //        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
×
210
        //            "never do this in production");
×
211
    }
4,418✔
212

8,974✔
213
    if (config.disable_upload_activation_delay) {
×
214
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
215
                    "never do this in production");
×
216
    }
4,418✔
217

14,198✔
218
    if (config.disable_sync_to_disk) {
14,198✔
219
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
220
                    "never do this in production");
14,198✔
221
    }
×
222

14,198✔
223
    m_actualize_and_finalize = create_trigger([this](Status status) {
14,198✔
224
        if (status == ErrorCodes::OperationAborted)
8,974✔
225
            return;
226
        else if (!status.is_ok())
227
            throw Exception(status);
228
        actualize_and_finalize_session_wrappers(); // Throws
160,776✔
229
    });
160,776✔
230
}
160,776✔
231

160,776✔
232

160,776✔
233
// Posts `handler` to the socket provider while keeping track of it in
// `m_outstanding_posts`.
//
// The counter is incremented (and `m_drained` cleared) before posting. The
// wrapper decrements it again and notifies `m_drain_cv` once `handler` has
// finished; a scope guard makes sure this also happens if `handler` throws.
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    {
        std::lock_guard lock(m_drain_mutex);
        m_drained = false;
        ++m_outstanding_posts;
    }

    auto tracked_handler = [this, handler = std::move(handler)](Status status) {
        auto release_post = util::make_scope_exit([this]() noexcept {
            std::lock_guard lock(m_drain_mutex);
            REALM_ASSERT(m_outstanding_posts);
            --m_outstanding_posts;
            m_drain_cv.notify_all();
        });
        handler(status);
    };
    m_socket_provider->post(std::move(tracked_handler));
}
5,658✔
251

2,344✔
252

1,104✔
253
void ClientImpl::drain_connections()
2,344✔
254
{
2,248✔
255
    logger.debug("Draining connections during sync client shutdown");
2,248✔
256
    for (auto& server_slot_pair : m_server_slots) {
2,248✔
257
        auto& server_slot = server_slot_pair.second;
96✔
258

48✔
259
        if (server_slot.connection) {
6✔
260
            auto& conn = server_slot.connection;
6✔
261
            conn->force_close();
96✔
262
        }
2,344✔
263
        else {
8,974✔
264
            for (auto& conn_pair : server_slot.alt_connections) {
265
                conn_pair.second->force_close();
266
            }
267
        }
268
    }
16,002✔
269
}
16,002✔
270

16,002✔
271

16,002✔
272
// Creates a timer on the socket provider that invokes `handler` after `delay`,
// while keeping track of it in `m_outstanding_posts`.
//
// The counter is incremented (and `m_drained` cleared) before the timer is
// created. The wrapper decrements it again and notifies `m_drain_cv` once
// `handler` has finished. As in post(), the decrement is done by a scope guard
// so that the bookkeeping stays balanced even if `handler` throws; otherwise
// anything waiting on `m_drain_cv` for the counter to reach zero would never
// be woken up.
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
                                                       SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    {
        std::lock_guard lock(m_drain_mutex);
        ++m_outstanding_posts;
        m_drained = false;
    }
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
        auto decr_guard = util::make_scope_exit([&]() noexcept {
            std::lock_guard lock(m_drain_mutex);
            REALM_ASSERT(m_outstanding_posts);
            --m_outstanding_posts;
            m_drain_cv.notify_all();
        });
        handler(status);
    });
}
11,430✔
290

11,430✔
291

292
// Creates a trigger bound to this client that runs `handler` when fired.
// Requires that a socket provider has been set.
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
{
    REALM_ASSERT(m_socket_provider);
    auto trigger = std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
    return trigger;
}
×
297

×
298
// Marks the websocket lifecycle sentinel (if any) as destroyed so that
// observers sharing the sentinel can tell this connection is gone, then
// releases this connection's reference to it.
Connection::~Connection()
{
    if (!m_websocket_sentinel)
        return;

    m_websocket_sentinel->destroyed = true;
    m_websocket_sentinel.reset();
}
2,458✔
305

×
306
void Connection::activate()
1,160✔
307
{
1,160✔
308
    REALM_ASSERT(m_on_idle);
2,458✔
309
    m_activated = true;
2,458✔
310
    if (m_num_active_sessions == 0)
311
        m_on_idle->trigger();
312
    // We cannot in general connect immediately, because a prior failure to
313
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
9,616✔
314
    initiate_reconnect_wait(); // Throws
9,616✔
315
}
9,616✔
316

9,616✔
317

9,616✔
318
void Connection::activate_session(std::unique_ptr<Session> sess)
9,616✔
319
{
9,616✔
320
    REALM_ASSERT(sess);
9,616✔
321
    REALM_ASSERT(&sess->m_conn == this);
9,616✔
322
    REALM_ASSERT(!m_force_closed);
4,630✔
323
    Session& sess_2 = *sess;
9,616✔
324
    session_ident_type ident = sess->m_ident;
9,616✔
325
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
9,616✔
326
    bool was_inserted = p.second;
6,716✔
327
    REALM_ASSERT(was_inserted);
6,716✔
328
    // Save the session ident to the historical list of session idents
6,716✔
329
    m_session_history.insert(ident);
9,616✔
330
    sess_2.activate(); // Throws
9,616✔
331
    if (m_state == ConnectionState::connected) {
332
        bool fast_reconnect = false;
333
        sess_2.connection_established(fast_reconnect); // Throws
334
    }
9,614✔
335
    ++m_num_active_sessions;
9,614✔
336
}
9,614✔
337

9,614✔
338

4,630✔
339
// Starts deactivation of `sess`, which must belong to this connection.
//
// If the session reaches the Deactivated state right away, it is removed from
// the connection. The active session count is then decremented; when it
// reaches zero on an activated, disconnected connection the idle trigger fires.
void Connection::initiate_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess);
    REALM_ASSERT(&sess->m_conn == this);
    REALM_ASSERT(m_num_active_sessions);

    // The client may be waiting (on a separate thread, in stop_and_wait()) for
    // m_num_active_sessions to reach 0, so the Session has to be deactivated
    // before the counter is decremented.
    sess->initiate_deactivation(); // Throws
    if (sess->m_state == Session::Deactivated)
        finish_session_deactivation(sess);

    --m_num_active_sessions;
    if (REALM_UNLIKELY(m_num_active_sessions == 0)) {
        const bool idle_and_disconnected = (m_activated && m_state == ConnectionState::disconnected);
        if (idle_and_disconnected)
            m_on_idle->trigger();
    }
}
1,006✔
356

1,852✔
357

1,640✔
358
void Connection::cancel_reconnect_delay()
824✔
359
{
900✔
360
    REALM_ASSERT(m_activated);
900✔
361

900✔
362
    if (m_reconnect_delay_in_progress) {
900✔
363
        if (m_nonzero_reconnect_delay)
900✔
364
            logger.detail("Canceling reconnect delay"); // Throws
900✔
365

1,640✔
366
        // Cancel the in-progress wait operation by destroying the timer
1,640✔
367
        // object. Destruction is needed in this case, because a new wait
1,640✔
368
        // operation might have to be initiated before the previous one
1,640✔
369
        // completes (its completion handler starts to execute), so the new wait
1,640✔
370
        // operation must be done on a new timer object.
1,640✔
371
        m_reconnect_disconnect_timer.reset();
106✔
372
        m_reconnect_delay_in_progress = false;
106✔
373
        m_reconnect_info.reset();
106✔
374
        initiate_reconnect_wait(); // Throws
106✔
375
        return;
106✔
376
    }
106✔
377

106✔
378
    // If we are not disconnected, then we need to make sure the next time we get disconnected
106✔
379
    // that we are allowed to re-connect as quickly as possible.
106✔
380
    //
106✔
381
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
212✔
382
    // backoff/delay state before calculating the next delay, unless a PONG message is received
212✔
383
    // for the urgent PING message we send below.
212✔
384
    //
106✔
385
    // If we get a PONG message for the urgent PING message sent below, then the connection is
212✔
386
    // healthy and we can calculate the next delay normally.
212✔
387
    if (m_state != ConnectionState::disconnected) {
212✔
388
        m_reconnect_info.scheduled_reset = true;
106✔
389
        m_ping_after_scheduled_reset_of_reconnect_info = false;
106✔
390

212✔
391
        schedule_urgent_ping(); // Throws
392
        return;
393
    }
7,912✔
394
    // Nothing to do in this case. The next reconnect attemp will be made as
7,912✔
395
    // soon as there are any sessions that are both active and unsuspended.
7,912✔
396
}
7,912✔
397

7,912✔
398
// Removes a fully deactivated session from this connection, both from the set
// of live sessions and from the session ident history.
void ClientImpl::Connection::finish_session_deactivation(Session* sess)
{
    REALM_ASSERT(sess->m_state == Session::Deactivated);

    // Copy the ident first: `m_sessions` owns the session, so `sess` must not
    // be dereferenced once its entry has been erased.
    const auto session_ident = sess->m_ident;
    m_sessions.erase(session_ident);
    m_session_history.erase(session_ident);
}
×
405

1,062✔
406
void Connection::force_close()
2,254✔
407
{
1,062✔
408
    if (m_force_closed) {
2,254✔
409
        return;
2,168✔
410
    }
2,168✔
411

1,062✔
412
    m_force_closed = true;
2,254✔
413

2,254✔
414
    if (m_state != ConnectionState::disconnected) {
86✔
415
        voluntary_disconnect();
86✔
416
    }
86✔
417

86✔
418
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
1,062✔
419
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
1,062✔
420
        m_reconnect_disconnect_timer.reset();
1,062✔
421
        m_reconnect_delay_in_progress = false;
1,062✔
422
        m_disconnect_delay_in_progress = false;
2,254✔
423
    }
1,136✔
424

150✔
425
    // We must copy any session pointers we want to close to a vector because force_closing
150✔
426
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
150✔
427
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
150✔
428
    std::vector<Session*> to_close;
1,062✔
429
    for (auto& session_pair : m_sessions) {
1,136✔
430
        if (session_pair.second->m_state == Session::State::Active) {
150✔
431
            to_close.push_back(session_pair.second.get());
150✔
432
        }
1,062✔
433
    }
2,254✔
434

2,254✔
435
    for (auto& sess : to_close) {
436
        sess->force_close();
437
    }
438

3,134✔
439
    logger.debug("Force closed idle connection");
3,134✔
440
}
3,134✔
441

2,868✔
442

1,538✔
443
void Connection::websocket_connected_handler(const std::string& protocol)
3,134✔
444
{
3,134✔
445
    if (!protocol.empty()) {
3,134✔
446
        std::string_view expected_prefix =
3,134✔
447
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,134✔
448
        // FIXME: Use std::string_view::begins_with() in C++20.
3,134✔
449
        auto prefix_matches = [&](std::string_view other) {
3,134✔
450
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,134✔
451
        };
3,134✔
452
        if (prefix_matches(expected_prefix)) {
3,134✔
453
            util::MemoryInputStream in;
3,134✔
454
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,134✔
455
            in.imbue(std::locale::classic());
3,134✔
456
            in.unsetf(std::ios_base::skipws);
3,134✔
457
            int value_2 = 0;
3,134✔
458
            in >> value_2;
1,538✔
459
            if (in && in.eof() && value_2 >= 0) {
1,538✔
460
                bool good_version =
1,538✔
461
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,134✔
462
                if (good_version) {
3,134✔
463
                    logger.detail("Negotiated protocol version: %1", value_2);
3,134✔
464
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
3,134✔
465
                    // will provide the appservices connection ID via a log message.
3,134✔
466
                    // TODO: Remove once the server starts sending the connection ID
×
467
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,134✔
468
                    m_negotiated_protocol_version = value_2;
×
469
                    handle_connection_established(); // Throws
×
470
                    return;
×
471
                }
×
472
            }
×
473
        }
×
474
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
475
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
476
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
477
    }
3,134✔
478
    else {
479
        close_due_to_client_side_error(
480
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
481
            ConnectionTerminationReason::bad_headers_in_http_response);
78,316✔
482
    }
78,316✔
483
}
×
484

×
485

×
486
// Handles a binary message received on the websocket.
//
// Messages arriving after the connection was force closed are ignored and
// false is returned. Otherwise the message is processed (or, when the
// simulated read failure is triggered, the connection is closed with a
// non-fatal error) and the result reports whether `m_websocket` is still set.
bool Connection::websocket_binary_message_received(util::Span<const char> data)
{
    if (m_force_closed) {
        logger.debug("Received binary message after connection was force closed");
        return false;
    }

    using sf = SimulatedFailure;
    const bool simulate_read_failure = sf::check_trigger(sf::sync_client__read_head);
    if (simulate_read_failure) {
        close_due_to_client_side_error(
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
            ConnectionTerminationReason::read_or_write_error);
    }
    else {
        handle_message_received(data);
    }

    return bool(m_websocket);
}
518✔
504

505

506
void Connection::websocket_error_handler()
646✔
507
{
646✔
508
    m_websocket_error_received = true;
×
509
}
×
510

×
511
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
646✔
512
{
336✔
513
    if (m_force_closed) {
646✔
514
        logger.debug("Received websocket close message after connection was force closed");
76✔
515
        return false;
76✔
516
    }
4✔
517
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
4✔
518

4✔
519
    switch (error_code) {
4✔
520
        case WebSocketError::websocket_ok:
4✔
521
            break;
4✔
522
        case WebSocketError::websocket_resolve_failed:
4✔
523
            [[fallthrough]];
4✔
524
        case WebSocketError::websocket_connection_failed: {
504✔
525
            SessionErrorInfo error_info(
504✔
526
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
504✔
527
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
504✔
528
            break;
504✔
529
        }
504✔
530
        case WebSocketError::websocket_read_error:
504✔
531
            [[fallthrough]];
264✔
532
        case WebSocketError::websocket_write_error: {
×
533
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
✔
534
                                         ConnectionTerminationReason::read_or_write_error);
×
535
            break;
✔
536
        }
×
537
        case WebSocketError::websocket_going_away:
✔
538
            [[fallthrough]];
×
539
        case WebSocketError::websocket_protocol_error:
✔
540
            [[fallthrough]];
×
541
        case WebSocketError::websocket_unsupported_data:
✔
542
            [[fallthrough]];
×
543
        case WebSocketError::websocket_invalid_payload_data:
✔
544
            [[fallthrough]];
×
545
        case WebSocketError::websocket_policy_violation:
✔
546
            [[fallthrough]];
×
547
        case WebSocketError::websocket_reserved:
×
548
            [[fallthrough]];
×
549
        case WebSocketError::websocket_no_status_received:
×
550
            [[fallthrough]];
4✔
551
        case WebSocketError::websocket_invalid_extension: {
4✔
552
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
4✔
553
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
554
            break;
4✔
555
        }
4✔
556
        case WebSocketError::websocket_message_too_big: {
4✔
557
            auto message = util::format(
4✔
558
                "Sync websocket closed because the server received a message that was too large: %1", msg);
×
559
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
10✔
560
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
10✔
561
            involuntary_disconnect(std::move(error_info),
10✔
562
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
10✔
563
            break;
10✔
564
        }
×
565
        case WebSocketError::websocket_tls_handshake_failed: {
✔
566
            close_due_to_client_side_error(
×
567
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
✔
568
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
×
569
            break;
✔
570
        }
×
571
        case WebSocketError::websocket_client_too_old:
×
572
            [[fallthrough]];
×
573
        case WebSocketError::websocket_client_too_new:
×
574
            [[fallthrough]];
✔
575
        case WebSocketError::websocket_protocol_mismatch: {
×
576
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
577
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
578
            break;
×
579
        }
✔
580
        case WebSocketError::websocket_fatal_error: {
×
581
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{true}),
×
582
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
583
            break;
×
584
        }
×
585
        case WebSocketError::websocket_forbidden: {
×
586
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
36✔
587
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
36✔
588
            involuntary_disconnect(std::move(error_info),
36✔
589
                                   ConnectionTerminationReason::http_response_says_fatal_error);
36✔
590
            break;
36✔
591
        }
36✔
592
        case WebSocketError::websocket_unauthorized: {
36✔
593
            SessionErrorInfo error_info(
36✔
594
                {ErrorCodes::AuthError,
36✔
595
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
×
596
                IsFatal{false});
12✔
597
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
12✔
598
            involuntary_disconnect(std::move(error_info),
12✔
599
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
600
            break;
12✔
601
        }
12✔
602
        case WebSocketError::websocket_moved_permanently: {
×
603
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
✔
604
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
605
            involuntary_disconnect(std::move(error_info),
×
606
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
607
            break;
×
608
        }
×
609
        case WebSocketError::websocket_abnormal_closure: {
×
610
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
✔
611
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
612
            involuntary_disconnect(std::move(error_info),
✔
613
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
614
            break;
×
615
        }
×
616
        case WebSocketError::websocket_internal_server_error:
646✔
617
            [[fallthrough]];
646✔
618
        case WebSocketError::websocket_retry_error: {
336✔
619
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
646✔
620
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
646✔
621
            break;
622
        }
623
    }
624

625
    return bool(m_websocket);
7,348✔
626
}
7,348✔
627

7,348✔
628
// Guarantees that handle_reconnect_wait() is never called from within the
7,348✔
629
// execution of initiate_reconnect_wait() (no callback reentrance).
3,658✔
630
void Connection::initiate_reconnect_wait()
3,658✔
631
{
7,348✔
632
    REALM_ASSERT(m_activated);
2,168✔
633
    REALM_ASSERT(!m_reconnect_delay_in_progress);
2,168✔
634
    REALM_ASSERT(!m_disconnect_delay_in_progress);
2,640✔
635

5,180✔
636
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
5,180✔
637
    if (m_force_closed) {
5,180✔
638
        return;
872✔
639
    }
476✔
640

872✔
641
    m_reconnect_delay_in_progress = true;
872✔
642
    auto delay = m_reconnect_info.delay_interval();
872✔
643
    if (delay == std::chrono::milliseconds::max()) {
2,164✔
644
        logger.detail("Reconnection delayed indefinitely"); // Throws
4,308✔
645
        // Not actually starting a timer corresponds to an infinite wait
4,080✔
646
        m_nonzero_reconnect_delay = true;
4,080✔
647
        return;
228✔
648
    }
228✔
649

228✔
650
    if (delay == std::chrono::milliseconds::zero()) {
228✔
651
        m_nonzero_reconnect_delay = false;
2,164✔
652
    }
2,164✔
653
    else {
2,164✔
654
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
2,164✔
655
        m_nonzero_reconnect_delay = true;
4,312✔
656
    }
2,164✔
657

2,164✔
658
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
4,312✔
659
    // we need it to be cancelable in case the connection is terminated before the timer
3,252✔
660
    // callback is run.
4,312✔
661
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
4,308✔
662
        // If the operation is aborted, the connection object may have been
663
        // destroyed.
664
        if (status != ErrorCodes::OperationAborted)
665
            handle_reconnect_wait(status); // Throws
3,252✔
666
    });                                    // Throws
3,252✔
667
}
×
668

×
669

×
670
// Timer callback for the reconnect wait started by initiate_reconnect_wait().
//
// A non-OK status is rethrown as an Exception (an aborted timer is never
// expected here; the caller filters it out). Otherwise the wait is marked as
// finished and a reconnect is initiated if at least one active, unsuspended
// session still needs the connection.
void Connection::handle_reconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT(m_reconnect_delay_in_progress);
    m_reconnect_delay_in_progress = false;

    const bool connection_still_needed = (m_num_active_unsuspended_sessions > 0);
    if (connection_still_needed) {
        initiate_reconnect(); // Throws
    }
}
3,254✔
683

3,254✔
684
// Observer handed to the socket provider. Every callback is forwarded to the
// owning Connection, but only while the lifecycle sentinel captured at
// construction time has not been flagged as destroyed; after that the
// callbacks become no-ops and `conn` is never dereferenced.
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
    explicit WebSocketObserverShim(Connection* conn)
        : conn(conn)
        , sentinel(conn->m_websocket_sentinel)
    {
    }

    // Connection that receives the forwarded callbacks.
    Connection* conn;
    // Sentinel that was current on the connection when this shim was created.
    util::bind_ptr<LifecycleSentinel> sentinel;

    void websocket_connected_handler(const std::string& protocol) override
    {
        if (!sentinel->destroyed)
            conn->websocket_connected_handler(protocol);
    }

    void websocket_error_handler() override
    {
        if (!sentinel->destroyed)
            conn->websocket_error_handler();
    }

    // Returns false without forwarding once the sentinel is destroyed.
    bool websocket_binary_message_received(util::Span<const char> data) override
    {
        return !sentinel->destroyed && conn->websocket_binary_message_received(data);
    }

    // Returns true without forwarding once the sentinel is destroyed.
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
    {
        return sentinel->destroyed || conn->websocket_closed_handler(was_clean, error_code, msg);
    }
};
3,252✔
730

3,252✔
731
void Connection::initiate_reconnect()
3,252✔
732
{
×
733
    REALM_ASSERT(m_activated);
×
734

3,252✔
735
    m_state = ConnectionState::connecting;
3,252✔
736
    report_connection_state_change(ConnectionState::connecting); // Throws
1,596✔
737
    if (m_websocket_sentinel) {
1,596✔
738
        m_websocket_sentinel->destroyed = true;
3,252✔
739
    }
1,596✔
740
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,252✔
741
    m_websocket.reset();
3,252✔
742

3,252✔
743
    // Watchdog
2,980✔
744
    initiate_connect_wait(); // Throws
3,252✔
745

3,252✔
746
    std::vector<std::string> sec_websocket_protocol;
3,252✔
747
    {
1,596✔
748
        auto protocol_prefix =
1,596✔
749
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
32,522✔
750
        int min = get_oldest_supported_protocol_version();
29,270✔
751
        int max = get_current_protocol_version();
29,270✔
752
        REALM_ASSERT_3(min, <=, max);
3,252✔
753
        // List protocol version in descending order to ensure that the server
1,596✔
754
        // selects the highest possible version.
3,252✔
755
        for (int version = max; version >= min; --version) {
3,252✔
756
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
1,596✔
757
        }
3,252✔
758
    }
3,252✔
759

3,252✔
760
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,252✔
761
                m_server_endpoint.port, m_http_request_path_prefix);
3,252✔
762

3,252✔
763
    m_websocket_error_received = false;
3,252✔
764
    m_websocket =
3,252✔
765
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,252✔
766
                                            WebSocketEndpoint{
1,596✔
767
                                                m_server_endpoint.address,
3,252✔
768
                                                m_server_endpoint.port,
3,252✔
769
                                                get_http_request_path(),
3,252✔
770
                                                std::move(sec_websocket_protocol),
3,252✔
771
                                                is_ssl(m_server_endpoint.envelope),
3,252✔
772
                                                /// DEPRECATED - The following will be removed in a future release
3,252✔
773
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,252✔
774
                                                m_verify_servers_ssl_certificate,
775
                                                m_ssl_trust_certificate_path,
776
                                                m_ssl_verify_callback,
777
                                                m_proxy_config,
3,252✔
778
                                            });
1,598✔
779
}
1,598✔
780

1,598✔
781

1,598✔
782
void Connection::initiate_connect_wait()
3,252✔
783
{
1,598✔
784
    // Deploy a watchdog to enforce an upper bound on the time it can take to
3,254✔
785
    // fully establish the connection (including SSL and WebSocket
1,598✔
786
    // handshakes). Without such a watchdog, connect operations could take very
1,598✔
787
    // long, or even indefinite time.
3,254✔
788
    milliseconds_type time = m_client.m_connect_timeout;
×
789

3,254✔
790
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,252✔
791
        // If the operation is aborted, the connection object may have been
792
        // destroyed.
793
        if (status != ErrorCodes::OperationAborted)
794
            handle_connect_wait(status); // Throws
×
795
    });                                  // Throws
×
796
}
×
797

×
798

×
799
// Callback of the connect watchdog: the connection was not established in
// time.
//
// A non-OK status is rethrown as an Exception. Otherwise the connection, which
// must still be in the `connecting` state, is torn down with a non-fatal
// SyncConnectTimeout error.
void Connection::handle_connect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
    logger.info("Connect timeout"); // Throws

    SessionErrorInfo timeout_error{
        Status{ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"}, IsFatal{false}};
    involuntary_disconnect(std::move(timeout_error),
                           ConnectionTerminationReason::sync_connect_timeout); // Throws
}
3,134✔
813

1,538✔
814

3,134✔
815
void Connection::handle_connection_established()
1,538✔
816
{
3,134✔
817
    // Cancel connect timeout watchdog
3,134✔
818
    m_connect_timer.reset();
3,134✔
819

1,538✔
820
    m_state = ConnectionState::connected;
3,134✔
821

3,134✔
822
    milliseconds_type now = monotonic_clock_now();
880✔
823
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
880✔
824
    initiate_ping_delay(now);     // Throws
880✔
825

880✔
826
    bool fast_reconnect = false;
1,538✔
827
    if (m_disconnect_has_occurred) {
4,118✔
828
        milliseconds_type time = now - m_disconnect_time;
4,118✔
829
        if (time <= m_client.m_fast_reconnect_limit)
4,118✔
830
            fast_reconnect = true;
4,118✔
831
    }
1,538✔
832

3,134✔
833
    for (auto& p : m_sessions) {
3,134✔
834
        Session& sess = *p.second;
835
        sess.connection_established(fast_reconnect); // Throws
836
    }
837

212✔
838
    report_connection_state_change(ConnectionState::connected); // Throws
212✔
839
}
212✔
840

206✔
841

206✔
842
void Connection::schedule_urgent_ping()
206✔
843
{
206✔
844
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
206✔
845
    if (m_ping_delay_in_progress) {
206✔
846
        m_heartbeat_timer.reset();
206✔
847
        m_ping_delay_in_progress = false;
6!
848
        m_minimize_next_ping_delay = true;
6!
849
        milliseconds_type now = monotonic_clock_now();
6✔
850
        initiate_ping_delay(now); // Throws
6✔
851
        return;
852
    }
853
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
854
    if (!m_send_ping)
3,562✔
855
        m_minimize_next_ping_delay = true;
3,562✔
856
}
3,562✔
857

3,562✔
858

1,746✔
859
// Starts the timer that delays the next PING message.
//
// `now` is the current monotonic clock reading; it is used to deduct the time
// already spent waiting for the previous PONG from the delay.
void Connection::initiate_ping_delay(milliseconds_type now)
{
    REALM_ASSERT(!m_ping_delay_in_progress);
    REALM_ASSERT(!m_waiting_for_pong);
    REALM_ASSERT(!m_send_ping);

    milliseconds_type delay = 0;
    if (m_minimize_next_ping_delay) {
        // An urgent ping was requested: keep the zero delay and consume the
        // request.
        m_minimize_next_ping_delay = false;
    }
    else {
        delay = m_client.m_ping_keepalive_period;

        // Make a randomized deduction of up to 10%, or up to 100% if this is
        // the first PING message to be sent since the connection was
        // established. The purpose of this randomized deduction is to reduce
        // the risk of many connections sending PING messages simultaneously to
        // the server.
        const milliseconds_type max_deduction = m_ping_sent ? delay / 10 : delay;
        std::uniform_int_distribution<milliseconds_type> deduction_distr(0, max_deduction);
        delay -= deduction_distr(m_client.get_random());

        // Deduct the time spent waiting for PONG, never going below zero.
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
        const milliseconds_type spent_time = now - m_pong_wait_started_at;
        delay = (spent_time < delay) ? delay - spent_time : 0;
    }

    m_ping_delay_in_progress = true;

    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
        // An aborted timer is silently ignored; any other failure is rethrown.
        if (status == ErrorCodes::OperationAborted)
            return;
        if (!status.is_ok())
            throw Exception(status);

        handle_ping_delay();                                    // Throws
    });                                                         // Throws
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
}
238✔
904

238✔
905

116✔
906
void Connection::handle_ping_delay()
238✔
907
{
116✔
908
    REALM_ASSERT(m_ping_delay_in_progress);
238✔
909
    m_ping_delay_in_progress = false;
208✔
910
    m_send_ping = true;
238✔
911

912
    initiate_pong_timeout(); // Throws
913

914
    if (m_state == ConnectionState::connected && !m_sending)
238✔
915
        send_next_message(); // Throws
238✔
916
}
238✔
917

238✔
918

116✔
919
void Connection::initiate_pong_timeout()
238✔
920
{
238✔
921
    REALM_ASSERT(!m_ping_delay_in_progress);
116✔
922
    REALM_ASSERT(!m_waiting_for_pong);
238✔
923
    REALM_ASSERT(m_send_ping);
238✔
924

238✔
925
    m_waiting_for_pong = true;
226✔
926
    m_pong_wait_started_at = monotonic_clock_now();
12✔
927

×
928
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
6✔
929
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
12✔
930
        if (status == ErrorCodes::OperationAborted)
12✔
931
            return;
238✔
932
        else if (!status.is_ok())
933
            throw Exception(status);
934

935
        handle_pong_timeout(); // Throws
12✔
936
    });                        // Throws
12✔
937
}
12✔
938

12✔
939

12✔
940
void Connection::handle_pong_timeout()
12✔
941
{
942
    REALM_ASSERT(m_waiting_for_pong);
943
    logger.debug("Timeout on reception of PONG message"); // Throws
944
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
96,694✔
945
                                 ConnectionTerminationReason::pong_timeout);
46,634✔
946
}
96,694✔
947

×
948

46,634✔
949
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
96,694✔
950
{
96,594✔
951
    // Stop sending messages if an websocket error was received.
1,434✔
952
    if (m_websocket_error_received)
1,434✔
953
        return;
95,160✔
954

×
955
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
956
        if (sentinel->destroyed) {
×
957
            return;
×
958
        }
×
959
        if (!status.is_ok()) {
×
960
            if (status != ErrorCodes::Error::OperationAborted) {
95,160✔
961
                // Write errors will be handled by the websocket_write_error_handler() callback
95,160✔
962
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
96,694✔
963
            }
96,694✔
964
            return;
96,694✔
965
        }
966
        handle_write_message(); // Throws
967
    });                         // Throws
968
    m_sending_session = sess;
95,162✔
969
    m_sending = true;
95,162✔
970
}
95,162✔
971

100✔
972

100✔
973
void Connection::handle_write_message()
95,162✔
974
{
95,162✔
975
    m_sending_session->message_sent(); // Throws
95,162✔
976
    if (m_sending_session->m_state == Session::Deactivated) {
95,162✔
977
        finish_session_deactivation(m_sending_session);
978
    }
979
    m_sending_session = nullptr;
980
    m_sending = false;
153,162✔
981
    send_next_message(); // Throws
153,162✔
982
}
153,162✔
983

153,162✔
984

153,162✔
985
void Connection::send_next_message()
226✔
986
{
226✔
987
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
226✔
988
    REALM_ASSERT(!m_sending_session);
212,414✔
989
    REALM_ASSERT(!m_sending);
74,970✔
990
    if (m_send_ping) {
74,970✔
991
        send_ping(); // Throws
74,970✔
992
        return;
156,384✔
993
    }
74,970✔
994
    while (!m_sessions_enlisted_to_send.empty()) {
156,384✔
995
        // The state of being connected is not supposed to be able to change
156,384✔
996
        // across this loop thanks to the "no callback reentrance" guarantee
156,384✔
997
        // provided by Websocket::async_write_text(), and friends.
74,970✔
998
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
156,384✔
999

2,008✔
1000
        Session& sess = *m_sessions_enlisted_to_send.front();
2,008✔
1001
        m_sessions_enlisted_to_send.pop_front();
74,970✔
1002
        sess.send_message(); // Throws
74,970✔
1003

74,970✔
1004
        if (sess.m_state == Session::Deactivated) {
156,384✔
1005
            finish_session_deactivation(&sess);
96,906✔
1006
        }
156,384✔
1007

152,936✔
1008
        // An enlisted session may choose to not send a message. In that case,
1009
        // we should pass the opportunity to the next enlisted session.
1010
        if (m_sending)
1011
            break;
226✔
1012
    }
226✔
1013
}
226✔
1014

226✔
1015

110✔
1016
void Connection::send_ping()
226✔
1017
{
226✔
1018
    REALM_ASSERT(!m_ping_delay_in_progress);
202✔
1019
    REALM_ASSERT(m_waiting_for_pong);
110✔
1020
    REALM_ASSERT(m_send_ping);
226✔
1021

226✔
1022
    m_send_ping = false;
226✔
1023
    if (m_reconnect_info.scheduled_reset)
110✔
1024
        m_ping_after_scheduled_reset_of_reconnect_info = true;
226✔
1025

226✔
1026
    m_last_ping_sent_at = monotonic_clock_now();
226✔
1027
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
226✔
1028
                 m_previous_ping_rtt); // Throws
226✔
1029

226✔
1030
    ClientProtocol& protocol = get_client_protocol();
1031
    OutputBuffer& out = get_output_buffer();
1032
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
1033
    initiate_write_ping(out);                                          // Throws
226✔
1034
    m_ping_sent = true;
226✔
1035
}
226✔
1036

×
1037

×
1038
void Connection::initiate_write_ping(const OutputBuffer& out)
226✔
1039
{
×
1040
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
1041
        if (sentinel->destroyed) {
×
1042
            return;
×
1043
        }
×
1044
        if (!status.is_ok()) {
×
1045
            if (status != ErrorCodes::Error::OperationAborted) {
226✔
1046
                // Write errors will be handled by the websocket_write_error_handler() callback
226✔
1047
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
226✔
1048
            }
226✔
1049
            return;
1050
        }
1051
        handle_write_ping(); // Throws
1052
    });                      // Throws
226✔
1053
    m_sending = true;
226✔
1054
}
226✔
1055

226✔
1056

226✔
1057
void Connection::handle_write_ping()
226✔
1058
{
1059
    REALM_ASSERT(m_sending);
1060
    REALM_ASSERT(!m_sending_session);
1061
    m_sending = false;
77,926✔
1062
    send_next_message(); // Throws
38,784✔
1063
}
38,784✔
1064

77,926✔
1065

77,926✔
1066
// Hands a received binary message to the client protocol parser.
void Connection::handle_message_received(util::Span<const char> data)
{
    const std::string_view raw_message(data.data(), data.size());

    // parse_message_received() parses the message and calls the proper handler
    // on the Connection object (this).
    get_client_protocol().parse_message_received<Connection>(*this, raw_message);
}
2,016✔
1072

4,268✔
1073

2,044✔
1074
void Connection::initiate_disconnect_wait()
2,044✔
1075
{
2,044✔
1076
    REALM_ASSERT(!m_reconnect_delay_in_progress);
2,016✔
1077

4,268✔
1078
    if (m_disconnect_delay_in_progress) {
2,016✔
1079
        m_reconnect_disconnect_timer.reset();
4,268✔
1080
        m_disconnect_delay_in_progress = false;
2,016✔
1081
    }
2,016✔
1082

4,268✔
1083
    milliseconds_type time = m_client.m_connection_linger_time;
6✔
1084

4,268✔
1085
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,268✔
1086
        // If the operation is aborted, the connection object may have been
4,268✔
1087
        // destroyed.
1088
        if (status != ErrorCodes::OperationAborted)
1089
            handle_disconnect_wait(status); // Throws
1090
    });                                     // Throws
6✔
1091
    m_disconnect_delay_in_progress = true;
6✔
1092
}
×
1093

×
1094

×
1095
// Callback of the linger timer. A non-OK status is rethrown as an Exception.
// Otherwise the connection is voluntarily closed, provided that no active,
// unsuspended session is using it any more.
void Connection::handle_disconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    m_disconnect_delay_in_progress = false;

    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);

    // Some session still needs the connection; keep it open.
    if (m_num_active_unsuspended_sessions != 0)
        return;

    if (m_client.m_connection_linger_time > 0) {
        logger.detail("Linger time expired"); // Throws
    }
    voluntary_disconnect();      // Throws
    logger.info("Disconnected"); // Throws
}
4✔
1112

4✔
1113

4✔
1114
// Closes the connection because the sync protocol was violated. The error is
// reported as fatal with the ProtocolViolation action.
void Connection::close_due_to_protocol_error(Status status)
{
    SessionErrorInfo violation{std::move(status), IsFatal{true}};
    violation.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;

    involuntary_disconnect(std::move(violation),
                           ConnectionTerminationReason::sync_protocol_violation); // Throws
}
230✔
1121

402✔
1122

402✔
1123
// Closes the connection because of an error detected on the client side. The
// caller decides whether the error is fatal and supplies the termination
// reason.
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to error: %1", status); // Throws

    SessionErrorInfo client_error{std::move(status), is_fatal};
    involuntary_disconnect(std::move(client_error), reason); // Throws
}
516✔
1129

516✔
1130

270✔
1131
// Closes the connection because of a transient error. The error is reported
// as non-fatal with the Transient action.
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
{
    logger.info("Connection closed due to transient error: %1", status); // Throws

    SessionErrorInfo transient_error{std::move(status), IsFatal{false}};
    transient_error.server_requests_action = ProtocolErrorInfo::Action::Transient;

    involuntary_disconnect(std::move(transient_error), reason); // Throws
}
66✔
1139

66✔
1140

66✔
1141
// Close connection due to error discovered on the server-side, and then
32✔
1142
// reported to the client by way of a connection-level ERROR message.
42✔
1143
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
56✔
1144
{
66✔
1145
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
66✔
1146
                int(error_code)); // Throws
66✔
1147

1148
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
1149
                                      : ConnectionTerminationReason::server_said_try_again_later;
1150
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
3,254✔
1151
                           reason); // Throws
1,598✔
1152
}
3,254✔
1153

1,598✔
1154

3,254✔
1155
void Connection::disconnect(const SessionErrorInfo& info)
3,134✔
1156
{
3,134✔
1157
    // Cancel connect timeout watchdog
1,538✔
1158
    m_connect_timer.reset();
1,538✔
1159

1,538✔
1160
    if (m_state == ConnectionState::connected) {
1,538✔
1161
        m_disconnect_time = monotonic_clock_now();
1,538✔
1162
        m_disconnect_has_occurred = true;
1,538✔
1163

3,134✔
1164
        // Sessions that are in the Deactivating state at this time can be
6,716✔
1165
        // immediately discarded, in part because they are no longer enlisted to
1,926✔
1166
        // send. Such sessions will be taken to the Deactivated state by
3,582✔
1167
        // Session::connection_lost(), and then they will be removed from
3,582✔
1168
        // `m_sessions`.
3,582✔
1169
        auto i = m_sessions.begin(), end = m_sessions.end();
3,584✔
1170
        while (i != end) {
1,704✔
1171
            // Prevent invalidation of the main iterator when erasing elements
3,582✔
1172
            auto j = i++;
3,134✔
1173
            Session& sess = *j->second;
1,598✔
1174
            sess.connection_lost(); // Throws
3,254✔
1175
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
1,598✔
1176
                m_sessions.erase(j);
3,254✔
1177
        }
3,254✔
1178
    }
3,254✔
1179

3,254✔
1180
    change_state_to_disconnected();
3,254✔
1181

3,254✔
1182
    m_ping_delay_in_progress = false;
3,254✔
1183
    m_waiting_for_pong = false;
3,254✔
1184
    m_send_ping = false;
1,598✔
1185
    m_minimize_next_ping_delay = false;
3,254✔
1186
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,254✔
1187
    m_ping_sent = false;
3,254✔
1188
    m_heartbeat_timer.reset();
3,254✔
1189
    m_previous_ping_rtt = 0;
3,254✔
1190

3,254✔
1191
    m_websocket_sentinel->destroyed = true;
3,254✔
1192
    m_websocket_sentinel.reset();
1,598✔
1193
    m_websocket.reset();
3,254✔
1194
    m_input_body_buffer.reset();
3,254✔
1195
    m_sending_session = nullptr;
3,254✔
1196
    m_sessions_enlisted_to_send.clear();
1197
    m_sending = false;
1198

106,642✔
1199
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
106,642✔
1200
    initiate_reconnect_wait();                                           // Throws
106,642✔
1201
}
1202

1203
bool Connection::is_flx_sync_connection() const noexcept
222✔
1204
{
222✔
1205
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
108✔
1206
}
222✔
1207

222✔
1208
// Handles a PONG message from the server.
//
// The PONG must arrive while one is awaited and the matching PING has already
// been handed off for sending, and it must echo the timestamp of the last
// PING; otherwise the connection is closed with a protocol error. On success
// the round trip time is recorded, the next ping delay is started, and the
// optional roundtrip time handler is invoked.
void Connection::receive_pong(milliseconds_type timestamp)
{
    logger.debug("Received: PONG(timestamp=%1)", timestamp);

    const bool pong_expected = m_waiting_for_pong && !m_send_ping;
    if (REALM_UNLIKELY(!pong_expected)) {
        close_due_to_protocol_error(
            Status{ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
        return;
    }

    const bool timestamp_matches = (timestamp == m_last_ping_sent_at);
    if (REALM_UNLIKELY(!timestamp_matches)) {
        close_due_to_protocol_error(
            Status{ErrorCodes::SyncProtocolInvariantFailed,
                   util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
                                m_last_ping_sent_at, timestamp)}); // Throws
        return;
    }

    const milliseconds_type now = monotonic_clock_now();
    const milliseconds_type round_trip_time = now - timestamp;
    logger.debug("Round trip time was %1 milliseconds", round_trip_time);
    m_previous_ping_rtt = round_trip_time;

    // If this PONG message is a response to a PING message that was sent after
    // the last invocation of cancel_reconnect_delay(), then the connection is
    // still good, and we do not have to skip the next reconnect delay.
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
        m_ping_after_scheduled_reset_of_reconnect_info = false;
        m_reconnect_info.scheduled_reset = false;
    }

    // The PONG arrived: drop the timeout timer and schedule the next PING.
    m_heartbeat_timer.reset();
    m_waiting_for_pong = false;

    initiate_ping_delay(now); // Throws

    if (m_client.m_roundtrip_time_handler) {
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
    }
}
×
1249

35,816✔
1250
// Looks up the session a received message is addressed to.
//
// Returns the session, or nullptr when `session_ident` is 0 or no such session
// is currently known. A message for a session found in the session history is
// only logged; a message for a session that never existed additionally closes
// the connection with a protocol error. `message` is the name of the message
// type and is used for logging only.
//
// NOTE(review): this function is declared noexcept, yet the logging and
// close_due_to_protocol_error() calls below are annotated as throwing
// elsewhere in this file; an exception escaping from here would terminate the
// process - confirm that this is intended.
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
{
    if (session_ident == 0)
        return nullptr;

    if (auto* found = get_session(session_ident); REALM_LIKELY(found))
        return found;

    // Check the history to see if the message received was for a previous session
    const bool was_previous_session = (m_session_history.find(session_ident) != m_session_history.end());
    if (was_previous_session) {
        logger.error("Received %1 message for closed session, session_ident = %2", message,
                     session_ident); // Throws
        return nullptr;
    }

    logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
    close_due_to_protocol_error(
        Status{ErrorCodes::SyncProtocolInvariantFailed,
               util::format("Received message %1 for session iden %2 when that session never existed", message,
                            session_ident)});
    return nullptr;
}
876✔
1274

876✔
1275
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
×
1276
{
×
1277
    Session* sess = nullptr;
876✔
1278
    if (session_ident != 0) {
×
1279
        sess = find_and_validate_session(session_ident, "ERROR");
×
1280
        if (REALM_UNLIKELY(!sess)) {
×
1281
            return;
448✔
1282
        }
876✔
1283
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
×
1284
            close_due_to_protocol_error(std::move(status)); // Throws
×
1285
            return;
876✔
1286
        }
876✔
1287

34✔
1288
        if (sess->m_state == Session::Deactivated) {
70✔
1289
            finish_session_deactivation(sess);
70✔
1290
        }
70✔
1291
        return;
34✔
1292
    }
70✔
1293

70✔
1294
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
66✔
1295
                info.message, info.raw_error_code, info.is_fatal, session_ident,
66✔
1296
                info.server_requests_action); // Throws
66✔
1297

66✔
1298
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
66✔
1299
    if (REALM_LIKELY(known_error_code)) {
×
1300
        ProtocolError error_code = ProtocolError(info.raw_error_code);
×
1301
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
×
1302
            close_due_to_server_side_error(error_code, info); // Throws
×
1303
            return;
×
1304
        }
4✔
1305
        close_due_to_protocol_error(
4✔
1306
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1307
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
4✔
1308
                          info.raw_error_code)});
4✔
1309
    }
70✔
1310
    else {
1311
        close_due_to_protocol_error(
1312
            {ErrorCodes::SyncProtocolInvariantFailed,
1313
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
1314
    }
16✔
1315
}
16✔
1316

×
1317

×
1318
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
×
1319
                                             session_ident_type session_ident)
8✔
1320
{
16✔
1321
    if (session_ident == 0) {
×
1322
        return close_due_to_protocol_error(
×
1323
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1324
    }
8✔
1325

16✔
1326
    if (!is_flx_sync_connection()) {
16✔
1327
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1328
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1329
    }
8✔
1330

16✔
1331
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
×
1332
    if (REALM_UNLIKELY(!sess)) {
×
1333
        return;
16✔
1334
    }
1335

1336
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
1337
        close_due_to_protocol_error(std::move(status));
3,236✔
1338
    }
3,236✔
1339
}
3,236✔
1340

×
1341

×
1342
// Forwards an IDENT message to the addressed session. A failure reported by
// the session closes the connection with a protocol error.
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
{
    Session* const sess = find_and_validate_session(session_ident, "IDENT");
    if (REALM_UNLIKELY(!sess))
        return;

    auto status = sess->receive_ident_message(client_file_ident);
    if (!status.is_ok()) {
        close_due_to_protocol_error(std::move(status)); // Throws
    }
}
45,308✔
1352

45,308✔
1353
void Connection::receive_download_message(session_ident_type session_ident, const SyncProgress& progress,
45,308✔
1354
                                          std::uint_fast64_t downloadable_bytes, int64_t query_version,
×
1355
                                          DownloadBatchState batch_state,
×
1356
                                          const ReceivedChangesets& received_changesets)
24,050✔
1357
{
45,308✔
1358
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
45,308✔
1359
    if (REALM_UNLIKELY(!sess)) {
45,308✔
1360
        return;
×
1361
    }
×
1362

45,308✔
1363
    if (auto status = sess->receive_download_message(progress, downloadable_bytes, batch_state, query_version,
1364
                                                     received_changesets);
1365
        !status.is_ok()) {
16,680✔
1366
        close_due_to_protocol_error(std::move(status));
16,680✔
1367
    }
16,680✔
1368
}
×
1369

×
1370
// Forwards a MARK message to the addressed session. A failure reported by the
// session closes the connection with a protocol error.
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    Session* const sess = find_and_validate_session(session_ident, "MARK");
    if (REALM_UNLIKELY(!sess))
        return;

    auto status = sess->receive_mark_message(request_ident);
    if (!status.is_ok()) {
        close_due_to_protocol_error(std::move(status)); // Throws
    }
}
4,826✔
1380

×
1381

×
1382
// Forwards an UNBOUND message to the session named by `session_ident`.
//
// Nothing happens when find_and_validate_session() yields no session. A
// non-OK status returned by the session is handed to
// close_due_to_protocol_error(). Otherwise, if the session ended up in the
// Deactivated state, its deactivation is finished here.
void Connection::receive_unbound_message(session_ident_type session_ident)
{
    Session* const target = find_and_validate_session(session_ident, "UNBOUND");
    if (REALM_UNLIKELY(target == nullptr))
        return;

    if (auto outcome = target->receive_unbound_message(); !outcome.is_ok()) {
        close_due_to_protocol_error(std::move(outcome)); // Throws
        return;
    }

    if (target->m_state != Session::Deactivated)
        return;
    finish_session_deactivation(target);
}
44✔
1398

44✔
1399

×
1400
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
×
1401
                                               std::string_view body)
22✔
1402
{
44✔
1403
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
×
1404
    if (REALM_UNLIKELY(!sess)) {
×
1405
        return;
44✔
1406
    }
1407

1408
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
1409
        close_due_to_protocol_error(std::move(status));
1410
    }
6,646✔
1411
}
6,646✔
1412

6,648✔
1413

6,648✔
1414
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
6,648✔
1415
                                            std::string_view message)
2,147,483,647✔
1416
{
2,147,483,647✔
1417
    std::string prefix;
2,147,483,647✔
1418
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
2,822✔
1419
        prefix = util::format("Server[%1]", m_appservices_coid);
6,646✔
1420
    }
4,820✔
1421
    else {
4,818✔
1422
        prefix = "Server";
4,818✔
1423
    }
4,818✔
1424

1425
    if (session_ident != 0) {
2✔
1426
        if (auto sess = get_session(session_ident)) {
2✔
1427
            sess->logger.log(level, "%1 log: %2", prefix, message);
2✔
1428
            return;
902✔
1429
        }
1,826✔
1430

1,826✔
1431
        logger.log(level, "%1 log for unknown session %2: %3", prefix, session_ident, message);
1432
        return;
1433
    }
1434

4,962✔
1435
    logger.log(level, "%1 log: %2", prefix, message);
2,440✔
1436
}
4,962✔
1437

2,232✔
1438

2,232✔
1439
void Connection::receive_appservices_request_id(std::string_view coid)
2,232✔
1440
{
4,962✔
1441
    // Only set once per connection
1442
    if (!coid.empty() && m_appservices_coid.empty()) {
1443
        m_appservices_coid = coid;
1444
        logger.info("Connected to app services with request id: \"%1\"", m_appservices_coid);
×
1445
    }
×
1446
}
×
1447

1448

1449
// Thin forwarder: hands `status` over to close_due_to_protocol_error().
void Connection::handle_protocol_error(Status status)
{
    close_due_to_protocol_error(std::move(status));
}
1453

1454

1455
// Sessions are guaranteed to be granted the opportunity to send a message in
1456
// the order that they enlist. Note that this is important to ensure
1457
// nonoverlapping communication with the server for consecutive sessions
1458
// associated with the same Realm file.
157,984✔
1459
//
157,984✔
1460
// CAUTION: The specified session may get destroyed before this function
157,984✔
1461
// returns, but only if its Session::send_message() puts it into the Deactivated
157,984✔
1462
// state.
57,566✔
1463
void Connection::enlist_to_send(Session* sess)
157,984✔
1464
{
1465
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
1466
    m_sessions_enlisted_to_send.push_back(sess); // Throws
1467
    if (!m_sending)
72✔
1468
        send_next_message(); // Throws
72✔
1469
}
72✔
1470

1471

1472
std::string Connection::get_active_appservices_connection_id()
3,844✔
1473
{
3,844✔
1474
    return m_appservices_coid;
2,094✔
1475
}
3,844✔
1476

3,472✔
1477
void Session::cancel_resumption_delay()
198✔
1478
{
372✔
1479
    REALM_ASSERT_EX(m_state == Active, m_state);
198✔
1480

372✔
1481
    if (!m_suspended)
198✔
1482
        return;
372✔
1483

364✔
1484
    m_suspended = false;
198✔
1485

372✔
1486
    logger.debug("Resumed"); // Throws
198✔
1487

372✔
1488
    if (unbind_process_complete())
372✔
1489
        initiate_rebind(); // Throws
1490

1491
    m_conn.one_more_active_unsuspended_session(); // Throws
1492

1493
    on_resumed(); // Throws
21,246✔
1494
}
21,246✔
1495

21,208✔
1496

21,208✔
1497
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
18✔
1498
                                                 std::vector<ProtocolErrorInfo>* out)
38✔
1499
{
38✔
1500
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
38✔
1501
        return;
38✔
1502
    }
38✔
1503

38✔
1504
#ifdef REALM_DEBUG
38✔
1505
    REALM_ASSERT_DEBUG(
38✔
1506
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
38✔
1507
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
18✔
1508
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
78✔
1509
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
58✔
1510
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
40✔
1511
                       }));
40✔
1512
#endif
40✔
1513

40✔
1514
    while (!m_pending_compensating_write_errors.empty() &&
40✔
1515
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
40✔
1516
               changesets.back().version) {
38✔
1517
        auto& cur_error = m_pending_compensating_write_errors.front();
1518
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
1519
        out->push_back(std::move(cur_error));
1520
        m_pending_compensating_write_errors.pop_front();
1521
    }
1522
}
1523

43,212✔
1524

43,212✔
1525
void Session::integrate_changesets(ClientReplication& repl, const SyncProgress& progress,
43,212✔
1526
                                   std::uint_fast64_t downloadable_bytes,
21,938✔
1527
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
×
1528
                                   DownloadBatchState download_batch_state)
×
1529
{
×
1530
    auto& history = repl.get_history();
×
1531
    if (received_changesets.empty()) {
21,938✔
1532
        if (download_batch_state == DownloadBatchState::MoreToCome) {
21,938✔
1533
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
21,938✔
1534
                                       "received empty download message that was not the last in batch",
11,852✔
1535
                                       ProtocolError::bad_progress);
21,274✔
1536
        }
21,274✔
1537
        history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws
21,274✔
1538
        return;
21,274✔
1539
    }
21,260✔
1540

21,246✔
1541
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
21,246✔
1542
    auto transact = get_db()->start_read();
21,274✔
1543
    history.integrate_server_changesets(
14,590✔
1544
        progress, &downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
14,590✔
1545
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
14,590✔
1546
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
6,684✔
1547
        }); // Throws
6,684✔
1548
    if (received_changesets.size() == 1) {
6,684✔
1549
        logger.debug("1 remote changeset integrated, producing client version %1",
6,684✔
1550
                     version_info.sync_version.version); // Throws
11,852✔
1551
    }
11,872✔
1552
    else {
40✔
1553
        logger.debug("%2 remote changesets integrated, producing client version %1",
40✔
1554
                     version_info.sync_version.version, received_changesets.size()); // Throws
40✔
1555
    }
40✔
1556

40✔
1557
    for (const auto& pending_error : pending_compensating_write_errors) {
40✔
1558
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
40✔
1559
                    pending_error.compensating_write_rejected_client_version,
40✔
1560
                    *pending_error.compensating_write_server_version, pending_error.message);
40✔
1561
        try {
40✔
1562
            on_connection_state_changed(
20✔
1563
                m_conn.get_state(),
×
1564
                SessionErrorInfo{pending_error,
×
1565
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
40✔
1566
                                                          pending_error.message)});
21,274✔
1567
        }
1568
        catch (...) {
1569
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
1570
        }
32✔
1571
    }
32✔
1572
}
32✔
1573

32✔
1574

16✔
1575
void Session::on_integration_failure(const IntegrationException& error)
32✔
1576
{
32✔
1577
    REALM_ASSERT_EX(m_state == Active, m_state);
16✔
1578
    REALM_ASSERT(!m_client_error && !m_error_to_send);
16✔
1579
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
32✔
1580

16✔
1581
    m_client_error = util::make_optional<IntegrationException>(error);
16✔
1582
    m_error_to_send = true;
16✔
1583

32✔
1584
    // Surface the error to the user otherwise is lost.
32✔
1585
    on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{error.to_status(), IsFatal{false}});
32✔
1586

32✔
1587
    // Since the deactivation process has not been initiated, the UNBIND
32✔
1588
    // message cannot have been sent unless an ERROR message was received.
1589
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
1590
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
44,580✔
1591
        ensure_enlisted_to_send(); // Throws
44,580✔
1592
    }
44,580✔
1593
}
44,580✔
1594

44,580✔
1595
// Updates the session's progress bookkeeping after downloaded changesets have
// been integrated.
//
// `client_version` is passed on to do_recognize_sync_version(); `progress` is
// the new sync progress. Must be called in the Active state, and the download
// server version in `progress` must not be lower than the one already
// recorded. If the upload cursor advanced, upload completion is re-checked.
// For an FLX session that has a migration store, the store is asked to create
// subscriptions. Finally the session is enlisted to send when IDENT has been
// sent and it is neither suspended nor in receipt of an ERROR message.
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
{
    REALM_ASSERT_EX(m_state == Active, m_state);
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
    m_download_progress = progress.download;
    // Must be evaluated against the old m_progress, before it is overwritten below.
    const bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version);
    m_progress = progress;
    if (upload_progressed) {
        if (progress.upload.client_version > m_last_version_selected_for_upload) {
            if (progress.upload.client_version > m_upload_progress.client_version)
                m_upload_progress = progress.upload;
            m_last_version_selected_for_upload = progress.upload.client_version;
        }

        check_for_upload_completion();
    }

    do_recognize_sync_version(client_version); // Allows upload process to resume
    check_for_download_completion();           // Throws

    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
        auto& flx_subscription_store = *get_flx_subscription_store();
        // Use the store that was just null-checked instead of fetching it a second time.
        migration_store->create_subscriptions(flx_subscription_store);
    }

    // Since the deactivation process has not been initiated, the UNBIND
    // message cannot have been sent unless an ERROR message was received.
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
        ensure_enlisted_to_send(); // Throws
    }
}
9,612✔
1628

1629

1630
Session::~Session()
{
    // NOTE(review): the life-cycle check below is disabled in the source; the
    // reason is not visible from this file section.
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
}
9,614✔
1634

9,614✔
1635

9,614✔
1636
std::string Session::make_logger_prefix(session_ident_type ident)
9,614✔
1637
{
1638
    std::ostringstream out;
1639
    out.imbue(std::locale::classic());
1640
    out << "Session[" << ident << "]: "; // Throws
9,616✔
1641
    return out.str();                    // Throws
9,616✔
1642
}
4,630✔
1643

9,616✔
1644

4,630✔
1645
void Session::activate()
9,616✔
1646
{
4,630✔
1647
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
4,630✔
1648

4,630✔
1649
    logger.debug("Activating"); // Throws
4,630✔
1650

4,630✔
1651
    bool has_pending_client_reset = false;
4,630✔
1652
    if (REALM_LIKELY(!get_client().is_dry_run())) {
4,630✔
1653
        // The reason we need a mutable reference from get_client_reset_config() is because we
4,630✔
1654
        // don't want the session to keep a strong reference to the client_reset_config->fresh_copy
4,630✔
1655
        // DB. If it did, then the fresh DB would stay alive for the duration of this sync session
9,616✔
1656
        // and we want to clean it up once the reset is finished. Additionally, the fresh copy will
4,630✔
1657
        // be set to a new copy on every reset so there is no reason to keep a reference to it.
9,616✔
1658
        // The modification to the client reset config happens via std::move(client_reset_config->fresh_copy).
4,630✔
1659
        // If the client reset config were a `const &` then this std::move would create another strong
9,616✔
1660
        // reference which we don't want to happen.
9,616✔
1661
        util::Optional<ClientReset>& client_reset_config = get_client_reset_config();
9,610✔
1662

9,616✔
1663
        bool file_exists = util::File::exists(get_realm_path());
9,616✔
1664

336✔
1665
        logger.info("client_reset_config = %1, Realm exists = %2, "
336✔
1666
                    "client reset = %3",
336✔
1667
                    client_reset_config ? "true" : "false", file_exists ? "true" : "false",
336✔
1668
                    (client_reset_config && file_exists) ? "true" : "false"); // Throws
336✔
1669
        if (client_reset_config && !m_client_reset_operation) {
336✔
1670
            m_client_reset_operation = std::make_unique<_impl::ClientResetOperation>(
4,630✔
1671
                logger, get_db(), std::move(client_reset_config->fresh_copy), client_reset_config->mode,
9,616✔
1672
                std::move(client_reset_config->notify_before_client_reset),
9,278✔
1673
                std::move(client_reset_config->notify_after_client_reset),
9,278✔
1674
                client_reset_config->recovery_is_allowed); // Throws
9,278✔
1675
        }
9,278✔
1676

9,616✔
1677
        if (!m_client_reset_operation) {
9,616✔
1678
            const ClientReplication& repl = access_realm(); // Throws
9,616✔
1679
            repl.get_history().get_status(m_last_version_available, m_client_file_ident, m_progress,
9,616✔
1680
                                          &has_pending_client_reset); // Throws
9,616✔
1681
        }
9,616✔
1682
    }
4,630✔
1683
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
9,616✔
1684
                 m_client_file_ident.salt); // Throws
9,616✔
1685
    m_upload_progress = m_progress.upload;
9,616✔
1686
    m_last_version_selected_for_upload = m_upload_progress.client_version;
9,616✔
1687
    m_download_progress = m_progress.download;
9,616✔
1688
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
9,616✔
1689

4,630✔
1690
    logger.debug("last_version_available  = %1", m_last_version_available);           // Throws
9,616✔
1691
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
9,616✔
1692
    logger.debug("progress_download_client_version = %1",
4,630✔
1693
                 m_progress.download.last_integrated_client_version);                                      // Throws
9,616✔
1694
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
9,616✔
1695
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
4,630✔
1696

9,616✔
1697
    reset_protocol_state();
9,616✔
1698
    m_state = Active;
9,616✔
1699

4,632✔
1700
    REALM_ASSERT(!m_suspended);
4✔
1701
    m_conn.one_more_active_unsuspended_session(); // Throws
4✔
1702

4✔
1703
    try {
4✔
1704
        process_pending_flx_bootstrap();
4✔
1705
    }
4,630✔
1706
    catch (const IntegrationException& error) {
9,612✔
1707
        logger.error("Error integrating bootstrap changesets: %1", error.what());
24✔
1708
        m_suspended = true;
24✔
1709
        m_conn.one_less_active_unsuspended_session(); // Throws
9,612✔
1710
        on_suspended(SessionErrorInfo{Status{error.code(), error.what()}, IsFatal{true}});
1711
    }
1712

1713
    if (has_pending_client_reset) {
1714
        handle_pending_client_reset_acknowledgement();
1715
    }
9,616✔
1716
}
9,616✔
1717

4,630✔
1718

9,616✔
1719
// The caller (Connection) must discard the session if the session has become
4,630✔
1720
// deactivated upon return.
9,616✔
1721
void Session::initiate_deactivation()
4,630✔
1722
{
9,616✔
1723
    REALM_ASSERT_EX(m_state == Active, m_state);
9,092✔
1724

4,630✔
1725
    logger.debug("Initiating deactivation"); // Throws
9,616✔
1726

4,754✔
1727
    m_state = Deactivating;
4,754✔
1728

4,754✔
1729
    if (!m_suspended)
2,338✔
1730
        m_conn.one_less_active_unsuspended_session(); // Throws
2,338✔
1731

2,338✔
1732
    if (m_enlisted_to_send) {
2,338✔
1733
        REALM_ASSERT(!unbind_process_complete());
4,862✔
1734
        return;
978✔
1735
    }
434✔
1736

978✔
1737
    // Deactivate immediately if the BIND message has not yet been sent and the
978✔
1738
    // session is not enlisted to send, or if the unbinding process has already
1,904✔
1739
    // completed.
1,904✔
1740
    if (!m_bind_message_sent || unbind_process_complete()) {
3,884✔
1741
        complete_deactivation(); // Throws
3,716✔
1742
        // Life cycle state is now Deactivated
3,716✔
1743
        return;
3,716✔
1744
    }
3,884✔
1745

1746
    // Ready to send the UNBIND message, if it has not already been sent
1747
    if (!m_unbind_message_sent) {
1748
        enlist_to_send(); // Throws
9,616✔
1749
        return;
9,616✔
1750
    }
9,616✔
1751
}
4,630✔
1752

9,616✔
1753

9,616✔
1754
void Session::complete_deactivation()
1755
{
1756
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
1757
    m_state = Deactivated;
1758

1759
    logger.debug("Deactivation completed"); // Throws
1760
}
1761

1762

156,386✔
1763
// Called by the associated Connection object when this session is granted an
156,386✔
1764
// opportunity to send a message.
156,386✔
1765
//
156,386✔
1766
// The caller (Connection) must discard the session if the session has become
156,386✔
1767
// deactivated upon return.
4,302✔
1768
void Session::send_message()
4,302✔
1769
{
4,302✔
1770
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
8,920✔
1771
    REALM_ASSERT(m_enlisted_to_send);
2,008✔
1772
    m_enlisted_to_send = false;
1,216✔
1773
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
2,008✔
1774
        // Deactivation has been initiated. If the UNBIND message has not been
3,086✔
1775
        // sent yet, there is no point in sending it. Instead, we can let the
3,086✔
1776
        // deactivation process complete.
3,086✔
1777
        if (!m_bind_message_sent) {
6,912✔
1778
            return complete_deactivation(); // Throws
6,912✔
1779
            // Life cycle state is now Deactivated
6,912✔
1780
        }
6,912✔
1781

70,668✔
1782
        // Session life cycle state is Deactivating or the unbinding process has
70,668✔
1783
        // been initiated by a session specific ERROR message
70,668✔
1784
        if (!m_unbind_message_sent)
147,466✔
1785
            send_unbind_message(); // Throws
70,668✔
1786
        return;
147,466✔
1787
    }
9,166✔
1788

66,330✔
1789
    // Session life cycle state is Active and the unbinding process has
138,300✔
1790
    // not been initiated
7,526✔
1791
    REALM_ASSERT(!m_unbind_message_sent);
7,526✔
1792

7,526✔
1793
    if (!m_bind_message_sent)
7,526✔
1794
        return send_bind_message(); // Throws
63,074✔
1795

130,774✔
1796
    if (!m_ident_message_sent) {
63,146✔
1797
        if (have_client_file_ident())
144✔
1798
            send_ident_message(); // Throws
144✔
1799
        return;
130,774✔
1800
    }
44✔
1801

44✔
1802
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
63,052✔
1803
                                                      [](const PendingTestCommand& command) {
130,730✔
1804
                                                          return command.pending;
30✔
1805
                                                      });
63,038✔
1806
    if (has_pending_test_command) {
63,038✔
1807
        return send_test_command_message();
130,700✔
1808
    }
16✔
1809

16✔
1810
    if (m_error_to_send)
63,030✔
1811
        return send_json_error_message(); // Throws
130,684✔
1812

17,252✔
1813
    // Stop sending upload, mark and query messages when the client detects an error.
54,994✔
1814
    if (m_client_error) {
113,438✔
1815
        return;
113,438✔
1816
    }
102,444✔
1817

102,444✔
1818
    if (m_target_download_mark > m_last_download_mark_sent)
5,670✔
1819
        return send_mark_message(); // Throws
10,994✔
1820

10,994✔
1821
    auto is_upload_allowed = [&]() -> bool {
×
1822
        if (!m_is_flx_sync_session) {
×
1823
            return true;
5,670✔
1824
        }
10,994✔
1825

10,994✔
1826
        auto migration_store = get_migration_store();
10,964✔
1827
        if (!migration_store) {
10,964✔
1828
            return true;
16✔
1829
        }
16✔
1830

30✔
1831
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
30✔
1832
        if (!sentinel_query_version) {
54,994✔
1833
            return true;
113,432✔
1834
        }
16✔
1835

16✔
1836
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
54,986✔
1837
        return m_last_sent_flx_query_version != *sentinel_query_version;
113,424✔
1838
    };
113,424✔
1839

102,446✔
1840
    if (!is_upload_allowed()) {
102,446✔
1841
        return;
5,662✔
1842
    }
10,978✔
1843

2,184✔
1844
    auto check_pending_flx_version = [&]() -> bool {
2,184✔
1845
        if (!m_is_flx_sync_session) {
4,566✔
1846
            return false;
8,794✔
1847
        }
8,794✔
1848

4,566✔
1849
        if (!m_allow_upload) {
8,794✔
1850
            return false;
7,312✔
1851
        }
7,312✔
1852

742✔
1853
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(
1,482✔
1854
            m_last_sent_flx_query_version, m_upload_progress.client_version);
1,482✔
1855

54,986✔
1856
        if (!m_pending_flx_sub_set) {
113,416✔
1857
            return false;
836✔
1858
        }
836✔
1859

54,568✔
1860
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
112,580✔
1861
    };
55,144✔
1862

55,144✔
1863
    if (check_pending_flx_version()) {
112,580✔
1864
        return send_query_change_message(); // throws
1865
    }
1866

1867
    if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
9,166✔
1868
        return send_upload_message(); // Throws
9,166✔
1869
    }
4,338✔
1870
}
9,166✔
1871

9,166✔
1872

9,166✔
1873
void Session::send_bind_message()
4,338✔
1874
{
4,338✔
1875
    REALM_ASSERT_EX(m_state == Active, m_state);
9,166✔
1876

9,166✔
1877
    session_ident_type session_ident = m_ident;
9,166✔
1878
    bool need_client_file_ident = !have_client_file_ident();
4,338✔
1879
    const bool is_subserver = false;
9,166✔
1880

9,166✔
1881

1,346✔
1882
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,346✔
1883
    int protocol_version = m_conn.get_negotiated_protocol_version();
60✔
1884
    OutputBuffer& out = m_conn.get_output_buffer();
60✔
1885
    // Discard the token since it's ignored by the server.
1,346✔
1886
    std::string empty_access_token;
1,346✔
1887
    if (m_is_flx_sync_session) {
1,346✔
1888
        nlohmann::json bind_json_data;
1,346✔
1889
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,346✔
1890
            bind_json_data["migratedPartition"] = *migrated_partition;
1,346✔
1891
        }
1,346✔
1892
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,346✔
1893
        if (logger.would_log(util::Logger::Level::debug)) {
1,346✔
1894
            std::string json_data_dump;
1,346✔
1895
            if (!bind_json_data.empty()) {
1,346✔
1896
                json_data_dump = bind_json_data.dump();
1,346✔
1897
            }
1,346✔
1898
            logger.debug(
7,820✔
1899
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
7,820✔
1900
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
7,820✔
1901
        }
7,820✔
1902
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
7,820✔
1903
                                       need_client_file_ident, is_subserver); // Throws
7,820✔
1904
    }
7,820✔
1905
    else {
9,166✔
1906
        std::string server_path = get_virt_path();
4,338✔
1907
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
9,166✔
1908
                     session_ident, need_client_file_ident, is_subserver, server_path);
4,338✔
1909
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
4,338✔
1910
                                       need_client_file_ident, is_subserver); // Throws
4,338✔
1911
    }
9,166✔
1912
    m_conn.initiate_write_message(out, this); // Throws
4,442✔
1913

9,166✔
1914
    m_bind_message_sent = true;
1915

1916
    // Ready to send the IDENT message if the file identifier pair is already
1917
    // available.
7,526✔
1918
    if (!need_client_file_ident)
7,526✔
1919
        enlist_to_send(); // Throws
7,526✔
1920
}
7,526✔
1921

7,526✔
1922

3,256✔
1923
void Session::send_ident_message()
3,256✔
1924
{
7,526✔
1925
    REALM_ASSERT_EX(m_state == Active, m_state);
7,526✔
1926
    REALM_ASSERT(m_bind_message_sent);
7,526✔
1927
    REALM_ASSERT(!m_unbind_message_sent);
3,256✔
1928
    REALM_ASSERT(have_client_file_ident());
7,526✔
1929

964✔
1930

964✔
1931
    ClientProtocol& protocol = m_conn.get_client_protocol();
964✔
1932
    OutputBuffer& out = m_conn.get_output_buffer();
964✔
1933
    session_ident_type session_ident = m_ident;
964✔
1934

964✔
1935
    if (m_is_flx_sync_session) {
964✔
1936
        const auto active_query_set = get_flx_subscription_store()->get_active();
964✔
1937
        const auto active_query_body = active_query_set.to_ext_json();
964✔
1938
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
964✔
1939
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
964✔
1940
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
964✔
1941
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
964✔
1942
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
6,562✔
1943
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
6,562✔
1944
                     active_query_body); // Throws
6,562✔
1945
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
6,562✔
1946
                                        active_query_set.version(), active_query_body); // Throws
6,562✔
1947
        m_last_sent_flx_query_version = active_query_set.version();
6,562✔
1948
    }
6,562✔
1949
    else {
6,562✔
1950
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
6,562✔
1951
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
7,526✔
1952
                     "latest_server_version_salt=%6)",
3,256✔
1953
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
7,526✔
1954
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
3,256✔
1955
                     m_progress.latest_server_version.salt);                                  // Throws
3,256✔
1956
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
7,526✔
1957
    }
7,526✔
1958
    m_conn.initiate_write_message(out, this); // Throws
1959

1960
    m_ident_message_sent = true;
836✔
1961

836✔
1962
    // Other messages may be waiting to be sent
836✔
1963
    enlist_to_send(); // Throws
836✔
1964
}
836✔
1965

836✔
1966
void Session::send_query_change_message()
418✔
1967
{
836✔
1968
    REALM_ASSERT_EX(m_state == Active, m_state);
836✔
1969
    REALM_ASSERT(m_ident_message_sent);
836✔
1970
    REALM_ASSERT(!m_unbind_message_sent);
836✔
1971
    REALM_ASSERT(m_pending_flx_sub_set);
836✔
1972
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
418✔
1973

836✔
1974
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
836✔
1975
        return;
836✔
1976
    }
836✔
1977

836✔
1978
    auto sub_store = get_flx_subscription_store();
418✔
1979
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
836✔
1980
    auto latest_queries = latest_sub_set.to_ext_json();
418✔
1981
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
836✔
1982
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
836✔
1983

1984
    OutputBuffer& out = m_conn.get_output_buffer();
1985
    session_ident_type session_ident = get_ident();
55,144✔
1986
    ClientProtocol& protocol = m_conn.get_client_protocol();
55,144✔
1987
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
55,144✔
1988
    m_conn.initiate_write_message(out, this);
55,144✔
1989

27,574✔
1990
    m_last_sent_flx_query_version = latest_sub_set.version();
55,144✔
1991

55,144✔
1992
    request_download_completion_notification();
646✔
1993
}
646✔
1994

646✔
1995
void Session::send_upload_message()
55,144✔
1996
{
422✔
1997
    REALM_ASSERT_EX(m_state == Active, m_state);
422✔
1998
    REALM_ASSERT(m_ident_message_sent);
27,574✔
1999
    REALM_ASSERT(!m_unbind_message_sent);
55,144✔
2000

27,574✔
2001
    if (REALM_UNLIKELY(get_client().is_dry_run()))
55,144✔
2002
        return;
55,144✔
2003

55,144✔
2004
    version_type target_upload_version = get_db()->get_version_of_latest_snapshot();
55,144✔
2005
    if (m_pending_flx_sub_set) {
27,574✔
2006
        REALM_ASSERT(m_is_flx_sync_session);
55,144✔
2007
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
13,806✔
2008
    }
28,728✔
2009
    if (target_upload_version > m_last_version_available) {
13,806✔
2010
        m_last_version_available = target_upload_version;
13,806✔
2011
    }
28,728✔
2012

208✔
2013
    const ClientReplication& repl = access_realm(); // Throws
208✔
2014

104✔
2015
    std::vector<UploadChangeset> uploadable_changesets;
208✔
2016
    version_type locked_server_version = 0;
208✔
2017
    repl.get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
26,416✔
2018
                                                  locked_server_version); // Throws
26,416✔
2019

26,416✔
2020
    if (uploadable_changesets.empty()) {
26,416✔
2021
        // Nothing more to upload right now
27,574✔
2022
        check_for_upload_completion(); // Throws
55,040✔
2023
        // If we need to limit upload up to some version other than the last client version available and there are no
438✔
2024
        // changes to upload, then there is no need to send an empty message.
438✔
2025
        if (m_pending_flx_sub_set) {
438✔
2026
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
27,470✔
2027
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
54,936✔
2028
            // Other messages may be waiting to be sent
54,936✔
2029
            return enlist_to_send(); // Throws
27,470✔
2030
        }
54,936✔
2031
    }
54,936✔
2032
    else {
54,936✔
2033
        m_last_version_selected_for_upload = uploadable_changesets.back().progress.client_version;
54,936✔
2034
    }
27,470✔
2035

54,936✔
2036
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
54,936✔
2037
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
27,470✔
2038
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
49,130✔
2039
    }
41,784✔
2040

41,784✔
2041
    version_type progress_client_version = m_upload_progress.client_version;
41,784✔
2042
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
41,784✔
2043

41,784✔
2044
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
×
2045
                 "locked_server_version=%3, num_changesets=%4)",
×
2046
                 progress_client_version, progress_server_version, locked_server_version,
×
2047
                 uploadable_changesets.size()); // Throws
×
2048

×
2049
    ClientProtocol& protocol = m_conn.get_client_protocol();
×
2050
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
×
2051

×
2052
    for (const UploadChangeset& uc : uploadable_changesets) {
×
2053
        logger.debug("Fetching changeset for upload (client_version=%1, server_version=%2, "
2054
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
×
2055
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
×
2056
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
×
2057
        if (logger.would_log(util::Logger::Level::trace)) {
×
2058
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2059
            if (changeset_data.size() < 1024) {
×
2060
                logger.trace("Changeset: %1",
×
2061
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2062
            }
×
2063
            else {
×
2064
                logger.trace("Changeset(comp): %1 %2", changeset_data.size(),
×
2065
                             protocol.compressed_hex_dump(changeset_data));
×
2066
            }
×
2067

×
2068
#if REALM_DEBUG
20,124✔
2069
            ChunkedBinaryInputStream in{changeset_data};
2070
            Changeset log;
2071
            try {
2072
                parse_changeset(in, log);
2073
                std::stringstream ss;
2074
                log.print(ss);
2075
                logger.trace("Changeset (parsed):\n%1", ss.str());
2076
            }
2077
            catch (const BadChangesetError& err) {
2078
                logger.error("Unable to parse changeset: %1", err.what());
2079
            }
2080
#endif
2081
        }
2082

2083
#if 0 // Upload log compaction is currently not implemented
2084
        if (!get_client().m_disable_upload_compaction) {
2085
            ChangesetEncoder::Buffer encode_buffer;
2086

2087
            {
2088
                // Upload compaction only takes place within single changesets to
2089
                // avoid another client seeing inconsistent snapshots.
2090
                ChunkedBinaryInputStream stream{uc.changeset};
2091
                Changeset changeset;
2092
                parse_changeset(stream, changeset); // Throws
2093
                // FIXME: What is the point of setting these? How can compaction care about them?
2094
                changeset.version = uc.progress.client_version;
2095
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2096
                changeset.origin_timestamp = uc.origin_timestamp;
2097
                changeset.origin_file_ident = uc.origin_file_ident;
2098

41,784✔
2099
                compact_changesets(&changeset, 1);
41,784✔
2100
                encode_changeset(changeset, encode_buffer);
41,784✔
2101

41,784✔
2102
                logger.debug("Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
41,784✔
2103
                             encode_buffer.size()); // Throws
41,784✔
2104
            }
41,784✔
2105

27,470✔
2106
            upload_message_builder.add_changeset(
54,936✔
2107
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
54,936✔
2108
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
54,936✔
2109
        }
54,936✔
2110
        else
54,936✔
2111
#endif
54,936✔
2112
        {
54,936✔
2113
            upload_message_builder.add_changeset(uc.progress.client_version,
27,470✔
2114
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
27,470✔
2115
                                                 uc.origin_file_ident,
54,936✔
2116
                                                 uc.changeset); // Throws
54,936✔
2117
        }
2118
    }
2119

2120
    int protocol_version = m_conn.get_negotiated_protocol_version();
17,252✔
2121
    OutputBuffer& out = m_conn.get_output_buffer();
17,252✔
2122
    session_ident_type session_ident = get_ident();
17,252✔
2123
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
17,252✔
2124
                                               progress_server_version,
17,252✔
2125
                                               locked_server_version); // Throws
8,036✔
2126
    m_conn.initiate_write_message(out, this);                          // Throws
17,252✔
2127

17,252✔
2128
    // Other messages may be waiting to be sent
8,036✔
2129
    enlist_to_send(); // Throws
17,252✔
2130
}
17,252✔
2131

17,252✔
2132

17,252✔
2133
void Session::send_mark_message()
17,252✔
2134
{
8,036✔
2135
    REALM_ASSERT_EX(m_state == Active, m_state);
17,252✔
2136
    REALM_ASSERT(m_ident_message_sent);
8,036✔
2137
    REALM_ASSERT(!m_unbind_message_sent);
8,036✔
2138
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
17,252✔
2139

17,252✔
2140
    request_ident_type request_ident = m_target_download_mark;
2141
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
2142

2143
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,912✔
2144
    OutputBuffer& out = m_conn.get_output_buffer();
6,912✔
2145
    session_ident_type session_ident = get_ident();
6,912✔
2146
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
6,912✔
2147
    m_conn.initiate_write_message(out, this);                      // Throws
3,086✔
2148

6,912✔
2149
    m_last_download_mark_sent = request_ident;
3,086✔
2150

6,912✔
2151
    // Other messages may be waiting to be sent
6,912✔
2152
    enlist_to_send(); // Throws
6,912✔
2153
}
6,912✔
2154

6,912✔
2155

3,086✔
2156
void Session::send_unbind_message()
6,912✔
2157
{
6,912✔
2158
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
2159
    REALM_ASSERT(m_bind_message_sent);
2160
    REALM_ASSERT(!m_unbind_message_sent);
2161

30✔
2162
    logger.debug("Sending: UNBIND"); // Throws
30✔
2163

30✔
2164
    ClientProtocol& protocol = m_conn.get_client_protocol();
30✔
2165
    OutputBuffer& out = m_conn.get_output_buffer();
30✔
2166
    session_ident_type session_ident = get_ident();
30✔
2167
    protocol.make_unbind_message(out, session_ident); // Throws
14✔
2168
    m_conn.initiate_write_message(out, this);         // Throws
30✔
2169

30✔
2170
    m_unbind_message_sent = true;
30✔
2171
}
30✔
2172

14✔
2173

30✔
2174
void Session::send_json_error_message()
30✔
2175
{
30✔
2176
    REALM_ASSERT_EX(m_state == Active, m_state);
14✔
2177
    REALM_ASSERT(m_ident_message_sent);
30✔
2178
    REALM_ASSERT(!m_unbind_message_sent);
30✔
2179
    REALM_ASSERT(m_error_to_send);
30✔
2180
    REALM_ASSERT(m_client_error);
30✔
2181

30✔
2182
    ClientProtocol& protocol = m_conn.get_client_protocol();
14✔
2183
    OutputBuffer& out = m_conn.get_output_buffer();
30✔
2184
    session_ident_type session_ident = get_ident();
30✔
2185
    auto protocol_error = m_client_error->error_for_server;
30✔
2186

2187
    auto message = util::format("%1", m_client_error->to_status());
2188
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
2189
                session_ident); // Throws
44✔
2190

44✔
2191
    nlohmann::json error_body_json;
22✔
2192
    error_body_json["message"] = std::move(message);
44✔
2193
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
44✔
2194
                                     error_body_json.dump()); // Throws
44✔
2195
    m_conn.initiate_write_message(out, this);                 // Throws
44✔
2196

44✔
2197
    m_error_to_send = false;
22✔
2198
    enlist_to_send(); // Throws
44✔
2199
}
44✔
2200

44✔
2201

22✔
2202
void Session::send_test_command_message()
44✔
2203
{
44✔
2204
    REALM_ASSERT_EX(m_state == Active, m_state);
22✔
2205

44✔
2206
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
44✔
2207
                           [](const PendingTestCommand& command) {
22✔
2208
                               return command.pending;
44✔
2209
                           });
44✔
2210
    REALM_ASSERT(it != m_pending_test_commands.end());
2211

2212
    ClientProtocol& protocol = m_conn.get_client_protocol();
2213
    OutputBuffer& out = m_conn.get_output_buffer();
3,236✔
2214
    auto session_ident = get_ident();
3,236✔
2215

3,236✔
2216
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
1,514✔
2217
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
1,514✔
2218

1,514✔
2219
    m_conn.initiate_write_message(out, this); // Throws;
1,514✔
2220
    it->pending = false;
3,236✔
2221

62✔
2222
    enlist_to_send();
1,504✔
2223
}
3,174✔
2224

3,174✔
2225

3,174✔
2226
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
×
2227
{
×
2228
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
3,174✔
2229
                 client_file_ident.salt); // Throws
×
2230

×
2231
    // Ignore the message if the deactivation process has been initiated,
3,174✔
2232
    // because in that case, the associated Realm and SessionWrapper must
×
2233
    // not be accessed any longer.
×
2234
    if (m_state != Active)
1,504✔
2235
        return Status::OK(); // Success
3,174✔
2236

1,504✔
2237
    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
1,504✔
2238
                               !m_unbound_message_received);
1,504✔
2239
    if (REALM_UNLIKELY(!legal_at_this_time)) {
1,504✔
2240
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
3,174✔
2241
    }
1,504✔
2242
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
3,174✔
2243
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
3,174✔
2244
    }
2,838✔
2245
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
2,838✔
2246
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
168✔
2247
    }
168✔
2248

168✔
2249
    m_client_file_ident = client_file_ident;
168✔
2250

168✔
2251
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
168✔
2252
        // Ready to send the IDENT message
336✔
2253
        ensure_enlisted_to_send(); // Throws
220✔
2254
        return Status::OK();       // Success
104✔
2255
    }
104✔
2256

336✔
2257
    // access before the client reset (if applicable) because
168✔
2258
    // the reset can take a while and the sync session might have died
×
2259
    // by the time the reset finishes.
×
2260
    ClientReplication& repl = access_realm(); // Throws
168✔
2261

168✔
2262
    auto client_reset_if_needed = [&]() -> bool {
336✔
2263
        if (!m_client_reset_operation) {
168✔
2264
            return false;
336✔
2265
        }
336✔
2266

336✔
2267
        // ClientResetOperation::finalize() will return true only if the operation actually did
336✔
2268
        // a client reset. It may choose not to do a reset if the local Realm does not exist
336✔
2269
        // at this point (in that case there is nothing to reset). But in any case, we must
336✔
2270
        // clean up m_client_reset_operation at this point as sync should be able to continue from
336✔
2271
        // this point forward.
336✔
2272
        auto client_reset_operation = std::move(m_client_reset_operation);
336✔
2273
        util::UniqueFunction<void(int64_t)> on_flx_subscription_complete = [this](int64_t version) {
336✔
2274
            this->on_flx_sync_version_complete(version);
336✔
2275
        };
336✔
2276
        if (!client_reset_operation->finalize(client_file_ident, get_flx_subscription_store(),
168✔
2277
                                              std::move(on_flx_subscription_complete))) {
336✔
2278
            return false;
336✔
2279
        }
168✔
2280

168✔
2281
        // The fresh Realm has been used to reset the state
168✔
2282
        logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
336✔
2283

336✔
2284
        SaltedFileIdent client_file_ident;
168✔
2285
        bool has_pending_client_reset = false;
336✔
2286
        repl.get_history().get_status(m_last_version_available, client_file_ident, m_progress,
268✔
2287
                                      &has_pending_client_reset); // Throws
268✔
2288
        REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
168✔
2289
        REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
168✔
2290
        REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
336✔
2291
                        m_progress.download.last_integrated_client_version);
240✔
2292
        REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
240✔
2293
        REALM_ASSERT_EX(m_progress.upload.last_integrated_server_version == 0,
168✔
2294
                        m_progress.upload.last_integrated_server_version);
336✔
2295
        logger.trace("last_version_available  = %1", m_last_version_available); // Throws
336✔
2296

1,504✔
2297
        m_upload_progress = m_progress.upload;
1,504✔
2298
        m_download_progress = m_progress.download;
3,174✔
2299
        // In recovery mode, there may be new changesets to upload and nothing left to download.
3,174✔
2300
        // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
3,174✔
2301
        // For both, we want to allow uploads again without needing external changes to download first.
3,174✔
2302
        m_allow_upload = true;
1,538✔
2303
        REALM_ASSERT_EX(m_last_version_selected_for_upload == 0, m_last_version_selected_for_upload);
68✔
2304

68✔
2305
        if (has_pending_client_reset) {
68✔
2306
            handle_pending_client_reset_acknowledgement();
68✔
2307
        }
68✔
2308

68✔
2309
        // If a migration or rollback is in progress, mark it complete when client reset is completed.
3,106✔
2310
        if (auto migration_store = get_migration_store()) {
2,838✔
2311
            migration_store->complete_migration_or_rollback();
2,838✔
2312
        }
2,838✔
2313

2,838✔
2314
        return true;
2,838✔
2315
    };
2,838✔
2316
    // if a client reset happens, it will take care of setting the file ident
1,470✔
2317
    // and if not, we do it here
1,470✔
2318
    bool did_client_reset = false;
3,106✔
2319
    try {
3,106✔
2320
        did_client_reset = client_reset_if_needed();
3,106✔
2321
    }
2322
    catch (const std::exception& e) {
2323
        auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
2324
        logger.error(err_msg.c_str());
2325
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
45,310✔
2326
        suspend(err_info);
45,310✔
2327
        return Status::OK();
24,052✔
2328
    }
24,052✔
2329
    if (!did_client_reset) {
24,052✔
2330
        repl.get_history().set_client_file_ident(client_file_ident,
45,310✔
2331
                                                 m_fix_up_object_ids); // Throws
522✔
2332
        m_progress.download.last_integrated_client_version = 0;
23,768✔
2333
        m_progress.upload.client_version = 0;
44,788✔
2334
        m_last_version_selected_for_upload = 0;
43,214✔
2335
    }
43,214✔
2336

23,768✔
2337
    // Ready to send the IDENT message
44,788✔
2338
    ensure_enlisted_to_send(); // Throws
44,788✔
2339
    return Status::OK();       // Success
44,788✔
2340
}
44,788✔
2341

44,788✔
2342
Status Session::receive_download_message(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
44,788✔
2343
                                         DownloadBatchState batch_state, int64_t query_version,
44,788✔
2344
                                         const ReceivedChangesets& received_changesets)
44,788✔
2345
{
23,768✔
2346
    REALM_ASSERT_EX(query_version >= 0, query_version);
23,768✔
2347
    // Ignore the message if the deactivation process has been initiated,
23,768✔
2348
    // because in that case, the associated Realm and SessionWrapper must
44,788✔
2349
    // not be accessed any longer.
×
2350
    if (m_state != Active)
×
2351
        return Status::OK();
×
2352

23,768✔
2353
    if (is_steady_state_download_message(batch_state, query_version)) {
44,790✔
2354
        batch_state = DownloadBatchState::SteadyState;
44,788✔
2355
    }
×
2356

×
2357
    logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
44,788✔
2358
                 "latest_server_version=%3, latest_server_version_salt=%4, "
×
2359
                 "upload_client_version=%5, upload_server_version=%6, downloadable_bytes=%7, "
×
2360
                 "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
×
2361
                 progress.download.server_version, progress.download.last_integrated_client_version,
23,768✔
2362
                 progress.latest_server_version.version, progress.latest_server_version.salt,
44,788✔
2363
                 progress.upload.client_version, progress.upload.last_integrated_server_version, downloadable_bytes,
44,788✔
2364
                 batch_state != DownloadBatchState::MoreToCome, query_version, received_changesets.size()); // Throws
44,274✔
2365

22,110✔
2366
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
22,110✔
2367
    // changeset over and over again.
22,930✔
2368
    if (m_client_error) {
41,796✔
2369
        logger.debug("Ignoring download message because the client detected an integration error");
22,110✔
2370
        return Status::OK();
42,616✔
2371
    }
42,616✔
2372

×
2373
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
×
2374
    if (REALM_UNLIKELY(!legal_at_this_time)) {
×
2375
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
×
2376
    }
42,616✔
2377
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
22,110✔
2378
        logger.error("Bad sync progress received (%1)", status);
22,110✔
2379
        return status;
42,616✔
2380
    }
42,616✔
2381

42,616✔
2382
    version_type server_version = m_progress.download.server_version;
42,616✔
2383
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
×
2384
    for (const Transformer::RemoteChangeset& changeset : received_changesets) {
×
2385
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
×
2386
        // version must be increasing, but can stay the same during bootstraps.
×
2387
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
×
2388
                                                         : (changeset.remote_version > server_version);
×
2389
        // Each server version cannot be greater than the one in the header of the download message.
42,616✔
2390
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
22,110✔
2391
        if (!good_server_version) {
22,110✔
2392
            return {ErrorCodes::SyncProtocolInvariantFailed,
42,616✔
2393
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
42,616✔
2394
                                 changeset.remote_version, server_version, progress.download.server_version)};
42,616✔
2395
        }
×
2396
        server_version = changeset.remote_version;
×
2397
        // Check that per-changeset last integrated client version is "weakly"
×
2398
        // increasing.
×
2399
        bool good_client_version =
42,616✔
2400
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
23,768✔
2401
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
44,788✔
2402
        if (!good_client_version) {
44,788✔
2403
            return {ErrorCodes::SyncProtocolInvariantFailed,
44,788✔
2404
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
12✔
2405
                                 "(%1, %2, %3)",
12✔
2406
                                 changeset.last_integrated_local_version, last_integrated_client_version,
44,776✔
2407
                                 progress.download.last_integrated_client_version)};
23,762✔
2408
        }
44,776✔
2409
        last_integrated_client_version = changeset.last_integrated_local_version;
1,566✔
2410
        // Server shouldn't send our own changes, and zero is not a valid client
1,566✔
2411
        // file identifier.
1,566✔
2412
        bool good_file_ident =
22,978✔
2413
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
43,210✔
2414
        if (!good_file_ident) {
22,978✔
2415
            return {ErrorCodes::SyncProtocolInvariantFailed,
43,210✔
2416
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
43,210✔
2417
                                 changeset.origin_file_ident)};
43,210✔
2418
        }
×
2419
    }
×
2420

43,210✔
2421
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
22,978✔
2422
                                       batch_state, received_changesets.size());
22,978✔
2423
    if (hook_action == SyncClientHookAction::EarlyReturn) {
22,978✔
2424
        return Status::OK();
43,210✔
2425
    }
43,210✔
2426
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
43,210✔
2427

2428
    if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) {
2429
        clear_resumption_delay_state();
16,680✔
2430
        return Status::OK();
16,680✔
2431
    }
7,746✔
2432

7,746✔
2433
    initiate_integrate_changesets(downloadable_bytes, batch_state, progress, received_changesets); // Throws
7,746✔
2434

7,746✔
2435
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
16,680✔
2436
                                  batch_state, received_changesets.size());
264✔
2437
    if (hook_action == SyncClientHookAction::EarlyReturn) {
7,712✔
2438
        return Status::OK();
16,416✔
2439
    }
16,416✔
2440
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
×
2441

×
2442
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
16,416✔
2443
    // after a retryable session error.
16,416✔
2444
    clear_resumption_delay_state();
16,416✔
2445
    return Status::OK();
×
2446
}
×
2447

×
2448
// Handle a MARK message from the server.
//
// Returns Status::OK() when the message was processed or deliberately ignored
// (session not Active), and SyncProtocolInvariantFailed when the message is
// illegal at this time or its request identifier is not in the range
// (last mark received, last mark sent].
//
// On success the current download server version and the request identifier
// are recorded, and download completion is re-checked.
Status Session::receive_mark_message(request_ident_type request_ident)
{
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active)
        return Status::OK(); // Success

    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
    if (REALM_UNLIKELY(!legal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
    }
    bool good_request_ident =
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
    if (REALM_UNLIKELY(!good_request_ident)) {
        // Include the rejected identifier so the failure can be diagnosed
        // from the error message alone.
        return {
            ErrorCodes::SyncProtocolInvariantFailed,
            util::format("Received MARK message with invalid request identifier %1 (last mark sent: %2 last mark "
                         "received: %3)",
                         request_ident, m_last_download_mark_sent, m_last_download_mark_received)};
    }

    m_server_version_at_last_download_mark = m_progress.download.server_version;
    m_last_download_mark_received = request_ident;
    check_for_download_completion(); // Throws

    return Status::OK(); // Success
}
4,826✔
2478

2,030✔
2479

2,030✔
2480
// The caller (Connection) must discard the session if the session has become
4,826✔
2481
// deactivated upon return.
2,030✔
2482
Status Session::receive_unbound_message()
2,030✔
2483
{
4,826✔
2484
    logger.debug("Received: UNBOUND");
2,030✔
2485

4,826✔
2486
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
2,030✔
2487
    if (REALM_UNLIKELY(!legal_at_this_time)) {
4,826✔
2488
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
4,826✔
2489
    }
2490

2491
    // The fact that the UNBIND message has been sent, but an ERROR message has
2492
    // not been received, implies that the deactivation process must have been
16✔
2493
    // initiated, so this session must be in the Deactivating state or the session
16✔
2494
    // has been suspended because of a client side error.
8✔
2495
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
8✔
2496

8✔
2497
    m_unbound_message_received = true;
16✔
2498

16✔
2499
    // Detect completion of the unbinding process
16✔
2500
    if (m_unbind_message_send_complete && m_state == Deactivating) {
16✔
2501
        // The deactivation process completes when the unbinding process
16✔
2502
        // completes.
2503
        complete_deactivation(); // Throws
2504
        // Life cycle state is now Deactivated
2505
    }
2506

876✔
2507
    return Status::OK(); // Success
876✔
2508
}
876✔
2509

448✔
2510

876✔
2511
// Handle a QUERY_ERROR message from the server.
//
// The message is always logged. It is forwarded to on_flx_sync_error() only
// while the session is Active. Always returns Status::OK().
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
{
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);

    // Once deactivation has been initiated, the associated Realm and
    // SessionWrapper must not be accessed any longer, so the message is
    // dropped.
    if (m_state != Active) {
        return Status::OK();
    }

    on_flx_sync_error(query_version, message); // throws
    return Status::OK();
}
×
2522

448✔
2523
// The caller (Connection) must discard the session if the session has become
448✔
2524
// deactivated upon return.
448✔
2525
Status Session::receive_error_message(const ProtocolErrorInfo& info)
876✔
2526
{
872✔
2527
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
872✔
2528
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
8✔
2529

8✔
2530
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
868✔
2531
    if (REALM_UNLIKELY(!legal_at_this_time)) {
444✔
2532
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
444✔
2533
    }
444✔
2534

868✔
2535
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
20✔
2536
    auto status = protocol_error_to_status(protocol_error, info.message);
20✔
2537
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
40✔
2538
        return {ErrorCodes::SyncProtocolInvariantFailed,
40✔
2539
                util::format("Received ERROR message for session with non-session-level error code %1",
40✔
2540
                             info.raw_error_code)};
40✔
2541
    }
40✔
2542

40✔
2543
    // Can't process debug hook actions once the Session is undergoing deactivation, since
424✔
2544
    // the SessionWrapper may not be available
828✔
2545
    if (m_state == Active) {
828✔
2546
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
828✔
2547
        if (debug_action == SyncClientHookAction::EarlyReturn) {
828✔
2548
            return Status::OK();
2549
        }
2550
    }
896✔
2551

896✔
2552
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
896✔
2553
    // containing the compensating write has appeared in a download message.
896✔
2554
    if (status == ErrorCodes::SyncCompensatingWrite) {
458✔
2555
        // If the client is not active, the compensating writes will not be processed now, but will be
896✔
2556
        // sent again the next time the client connects
458✔
2557
        if (m_state == Active) {
458✔
2558
            REALM_ASSERT(info.compensating_write_server_version.has_value());
896!
2559
            m_pending_compensating_write_errors.push_back(info);
2560
        }
2561
        return Status::OK();
2562
    }
×
2563

2564
    m_error_message_received = true;
2565
    suspend(SessionErrorInfo{info, std::move(status)});
2566
    return Status::OK();
×
2567
}
2568

×
2569
void Session::suspend(const SessionErrorInfo& info)
458✔
2570
{
458✔
2571
    REALM_ASSERT(!m_suspended);
458✔
2572
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
896✔
2573
    logger.debug("Suspended"); // Throws
892✔
2574

892✔
2575
    m_suspended = true;
892✔
2576

458✔
2577
    // Detect completion of the unbinding process
896✔
2578
    if (m_unbind_message_send_complete && m_error_message_received) {
368✔
2579
        // The fact that the UNBIND message has been sent, but we are not being suspended because
368✔
2580
        // we received an ERROR message implies that the deactivation process must
458✔
2581
        // have been initiated, so this session must be in the Deactivating state.
458✔
2582
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
896✔
2583

892✔
2584
        // The deactivation process completes when the unbinding process
896✔
2585
        // completes.
2586
        complete_deactivation(); // Throws
2587
        // Life cycle state is now Deactivated
44✔
2588
    }
44✔
2589

44✔
2590
    // Notify the application of the suspension of the session if the session is
44✔
2591
    // still in the Active state
44✔
2592
    if (m_state == Active) {
44✔
2593
        m_conn.one_less_active_unsuspended_session(); // Throws
44✔
2594
        on_suspended(info);                           // Throws
×
2595
    }
×
2596

×
2597
    if (!info.is_fatal) {
22✔
2598
        begin_resumption_delay(info);
44✔
2599
    }
44✔
2600

22✔
2601
    // Ready to send the UNBIND message, if it has not been sent already
44✔
2602
    if (!m_unbind_message_sent)
44✔
2603
        ensure_enlisted_to_send(); // Throws
2604
}
2605

368✔
2606
// Handles the server's response to a previously issued test command.
//
// Looks up the pending test command whose id equals `ident`, fulfils its
// promise with a copy of `body`, and removes it from the pending list.
// Returns SyncProtocolInvariantFailed when no pending command has that id,
// Status::OK() otherwise.
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
{
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);

    auto has_requested_ident = [&](const PendingTestCommand& pending) {
        return pending.id == ident;
    };
    auto pending_end = m_pending_test_commands.end();
    auto match = std::find_if(m_pending_test_commands.begin(), pending_end, has_requested_ident);

    if (match == pending_end) {
        return {ErrorCodes::SyncProtocolInvariantFailed,
                util::format("Received test command response for a non-existent ident %1", ident)};
    }

    // Deliver the response before dropping the bookkeeping entry.
    match->promise.emplace_value(std::string{body});
    m_pending_test_commands.erase(match);

    return Status::OK();
}
×
2623

194✔
2624
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
364✔
2625
{
364✔
2626
    REALM_ASSERT(!m_try_again_activation_timer);
364✔
2627

368✔
2628
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
2629
                                  error_info.resumption_delay_interval);
2630
    auto try_again_interval = m_try_again_delay_info.delay_interval();
44,776✔
2631
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
44,776✔
2632
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
×
2633
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
×
2634
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
×
2635
        try_again_interval = std::chrono::milliseconds{1000};
44,776✔
2636
    }
2637
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
2638
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
44,790✔
2639
        if (status == ErrorCodes::OperationAborted)
44,790✔
2640
            return;
44,790✔
2641
        else if (!status.is_ok())
44,790✔
2642
            throw Exception(status);
44,790✔
2643

×
2644
        m_try_again_activation_timer.reset();
×
2645
        cancel_resumption_delay();
×
2646
    });
×
2647
}
44,790✔
2648

×
2649
void Session::clear_resumption_delay_state()
×
2650
{
×
2651
    if (m_try_again_activation_timer) {
×
2652
        logger.debug("Clearing resumption delay state after successful download");
44,790✔
2653
        m_try_again_delay_info.reset();
×
2654
    }
×
2655
}
×
2656

×
2657
// Validates the sync progress carried by a download message against the
// progress this session already holds (m_progress) and against
// m_last_version_available.
//
// Returns Status::OK() when every invariant holds, otherwise a
// SyncProtocolInvariantFailed status describing the violation. All invariants
// are always evaluated; if several are violated, the message of the last
// violated one (in the order below) is the one returned.
//
// NOTE(review): the function is declared noexcept but builds std::string
// messages via util::format; a failing allocation there would terminate the
// process rather than propagate.
Status ClientImpl::Session::check_received_sync_progress(const SyncProgress& progress) noexcept
{
    const SyncProgress& current = m_progress;
    const SyncProgress& received = progress;
    std::string violation;

    // Latest server version must not go backwards.
    if (received.latest_server_version.version < current.latest_server_version.version) {
        violation = util::format("Latest server version in download messages must be weakly increasing throughout a "
                                 "session (current: %1, received: %2)",
                                 current.latest_server_version.version, received.latest_server_version.version);
    }

    // Upload cursor (client version) must not go backwards ...
    if (received.upload.client_version < current.upload.client_version) {
        violation = util::format("Last integrated client version in download messages must be weakly increasing "
                                 "throughout a session (current: %1, received: %2)",
                                 current.upload.client_version, received.upload.client_version);
    }

    // ... and must not run ahead of what the client has produced.
    if (received.upload.client_version > m_last_version_available) {
        violation = util::format("Last integrated client version on server cannot be greater than the latest client "
                                 "version in existence (current: %1, received: %2)",
                                 m_last_version_available, received.upload.client_version);
    }

    // Download cursor (server version) must not go backwards ...
    if (received.download.server_version < current.download.server_version) {
        violation = util::format(
            "Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
            current.download.server_version, received.download.server_version);
    }

    // ... and must not run ahead of the latest server version.
    if (received.download.server_version > received.latest_server_version.version) {
        violation = util::format(
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
            received.download.server_version, received.latest_server_version.version);
    }

    // Client version integrated at the download cursor must not go backwards ...
    if (received.download.last_integrated_client_version < current.download.last_integrated_client_version) {
        violation = util::format(
            "Last integrated client version on the server at the position in the server's history of the download "
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
            current.download.last_integrated_client_version, received.download.last_integrated_client_version);
    }

    // ... and must not exceed the received upload cursor.
    if (received.download.last_integrated_client_version > received.upload.client_version) {
        violation =
            util::format("Last integrated client version on the server in the position at the server's history "
                         "of the download cursor cannot be greater than the latest client version integrated "
                         "on the server (download: %1, upload: %2)",
                         received.download.last_integrated_client_version, received.upload.client_version);
    }

    // Download cursor must not be behind the server version integrated by the
    // latest acknowledged client version.
    if (received.download.server_version < received.upload.last_integrated_server_version) {
        violation = util::format(
            "The server version of the download cursor cannot be less than the server version integrated in the "
            "latest client version acknowledged by the server (download: %1, upload: %2)",
            received.download.server_version, received.upload.last_integrated_server_version);
    }

    if (!violation.empty()) {
        return {ErrorCodes::SyncProtocolInvariantFailed, std::move(violation)};
    }
    return Status::OK();
}
12,508✔
2711

25,366✔
2712

25,366✔
2713
void Session::check_for_upload_completion()
25,366✔
2714
{
10,710✔
2715
    REALM_ASSERT_EX(m_state == Active, m_state);
7,242✔
2716
    if (!m_upload_completion_notification_requested) {
14,656✔
2717
        return;
14,656✔
2718
    }
14,656✔
2719

2720
    // during an ongoing client reset operation, we never upload anything
2721
    if (m_client_reset_operation)
2722
        return;
60,996✔
2723

60,996✔
2724
    // Upload process must have reached end of history
60,996✔
2725
    REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available);
60,996✔
2726
    bool scan_complete = (m_upload_progress.client_version == m_last_version_available);
44,378✔
2727
    if (!scan_complete)
16,618✔
2728
        return;
448✔
2729

16,170✔
2730
    // All uploaded changesets must have been acknowledged by the server
×
2731
    REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload);
16,170✔
2732
    bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload);
16,170✔
2733
    if (!all_uploads_accepted)
1,980✔
2734
        return;
1,980✔
2735

4,954✔
2736
    m_upload_completion_notification_requested = false;
4,954✔
2737
    on_upload_completion(); // Throws
4,954✔
2738
}
16,170✔
2739

16,170✔
2740

2741
void Session::check_for_download_completion()
2742
{
2743
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
2744
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
2745
    if (m_last_download_mark_received == m_last_triggering_download_mark)
2746
        return;
2747
    if (m_last_download_mark_received < m_target_download_mark)
2748
        return;
2749
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
2750
        return;
2751
    m_last_triggering_download_mark = m_target_download_mark;
2752
    if (REALM_UNLIKELY(!m_allow_upload)) {
2753
        // Activate the upload process now, and enable immediate reactivation
2754
        // after a subsequent fast reconnect.
2755
        m_allow_upload = true;
2756
        ensure_enlisted_to_send(); // Throws
2757
    }
2758
    on_download_completion(); // Throws
2759
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc