• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / jonathan.reams_2947

01 Dec 2023 08:08PM UTC coverage: 91.739% (+0.04%) from 91.695%
jonathan.reams_2947

Pull #7160

Evergreen

jbreams
allow handle_error to decide resumability
Pull Request #7160: Prevent resuming a session that has not been fully shut down

92428 of 169414 branches covered (0.0%)

315 of 349 new or added lines in 14 files covered. (90.26%)

80 existing lines in 14 files now uncovered.

232137 of 253041 relevant lines covered (91.74%)

6882826.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.74
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/compact_changesets.hpp>
10
#include <realm/sync/noinst/client_reset_operation.hpp>
11
#include <realm/sync/protocol.hpp>
12
#include <realm/util/assert.hpp>
13
#include <realm/util/basic_system_errors.hpp>
14
#include <realm/util/memory_stream.hpp>
15
#include <realm/util/platform_info.hpp>
16
#include <realm/util/random.hpp>
17
#include <realm/util/safe_int_ops.hpp>
18
#include <realm/util/scope_exit.hpp>
19
#include <realm/util/to_string.hpp>
20
#include <realm/util/uri.hpp>
21
#include <realm/version.hpp>
22

23
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
24

25
#include <system_error>
26
#include <sstream>
27

28
// NOTE: The protocol specification is in `/doc/protocol.md`
29

30
using namespace realm;
31
using namespace _impl;
32
using namespace realm::util;
33
using namespace realm::sync;
34
using namespace realm::sync::websocket;
35

36
// clang-format off
37
using Connection      = ClientImpl::Connection;
38
using Session         = ClientImpl::Session;
39
using UploadChangeset = ClientHistory::UploadChangeset;
40

41
// These are a work-around for a bug in MSVC. It cannot find in-class types
42
// mentioned in the signature of out-of-line member function definitions.
43
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
44
using OutputBuffer                = ClientImpl::OutputBuffer;
45
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
46
// clang-format on
47

48
void ClientImpl::ReconnectInfo::reset() noexcept
49
{
1,744✔
50
    m_backoff_state.reset();
1,744✔
51
    scheduled_reset = false;
1,744✔
52
}
1,744✔
53

54

55
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
56
                                       std::optional<ResumptionDelayInfo> new_delay_info)
57
{
3,362✔
58
    m_backoff_state.update(new_reason, new_delay_info);
3,362✔
59
}
3,362✔
60

61

62
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
63
{
5,400✔
64
    if (scheduled_reset) {
5,400✔
65
        reset();
4✔
66
    }
4✔
67

2,680✔
68
    if (!m_backoff_state.triggering_error) {
5,400✔
69
        return std::chrono::milliseconds::zero();
4,164✔
70
    }
4,164✔
71

632✔
72
    switch (*m_backoff_state.triggering_error) {
1,236✔
73
        case ConnectionTerminationReason::closed_voluntarily:
84✔
74
            return std::chrono::milliseconds::zero();
84✔
75
        case ConnectionTerminationReason::server_said_do_not_reconnect:
18✔
76
            return std::chrono::milliseconds::max();
18✔
77
        default:
1,128✔
78
            if (m_reconnect_mode == ReconnectMode::testing) {
1,128✔
79
                return std::chrono::milliseconds::max();
902✔
80
            }
902✔
81

112✔
82
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
226✔
83
            return m_backoff_state.delay_interval();
226✔
84
    }
1,236✔
85
}
1,236✔
86

87

88
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
89
                                      port_type& port, std::string& path) const
90
{
3,434✔
91
    util::Uri uri(url); // Throws
3,434✔
92
    uri.canonicalize(); // Throws
3,434✔
93
    std::string userinfo, address_2, port_2;
3,434✔
94
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
3,434✔
95
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
3,434✔
96
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
3,434✔
97
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
3,434✔
98
    if (REALM_UNLIKELY(!good))
3,434✔
99
        return false;
1,542✔
100
    ProtocolEnvelope protocol_2;
3,434✔
101
    port_type port_3;
3,434✔
102
    if (realm_scheme) {
3,434✔
103
        if (uri.get_scheme() == "realm:") {
×
104
            protocol_2 = ProtocolEnvelope::realm;
×
105
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
106
        }
×
107
        else {
×
108
            protocol_2 = ProtocolEnvelope::realms;
×
109
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
110
        }
×
111
    }
×
112
    else {
3,434✔
113
        REALM_ASSERT(ws_scheme);
3,434✔
114
        if (uri.get_scheme() == "ws:") {
3,434✔
115
            protocol_2 = ProtocolEnvelope::ws;
3,430✔
116
            port_3 = 80;
3,430✔
117
        }
3,430✔
118
        else {
4✔
119
            protocol_2 = ProtocolEnvelope::wss;
4✔
120
            port_3 = 443;
4✔
121
        }
4✔
122
    }
3,434✔
123
    if (!port_2.empty()) {
3,434✔
124
        std::istringstream in(port_2);    // Throws
3,434✔
125
        in.imbue(std::locale::classic()); // Throws
3,434✔
126
        in >> port_3;
3,434✔
127
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,434✔
128
            return false;
1,542✔
129
    }
3,434✔
130
    std::string path_2 = uri.get_path(); // Throws (copy)
3,434✔
131

1,542✔
132
    protocol = protocol_2;
3,434✔
133
    address = std::move(address_2);
3,434✔
134
    port = port_3;
3,434✔
135
    path = std::move(path_2);
3,434✔
136
    return true;
3,434✔
137
}
3,434✔
138

139

140
ClientImpl::ClientImpl(ClientConfig config)
141
    : logger_ptr{config.logger ? std::move(config.logger) : util::Logger::get_default_logger()}
142
    , logger{*logger_ptr}
143
    , m_reconnect_mode{config.reconnect_mode}
144
    , m_connect_timeout{config.connect_timeout}
145
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
146
    , m_ping_keepalive_period{config.ping_keepalive_period}
147
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
148
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
149
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
150
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
151
    , m_dry_run{config.dry_run}
152
    , m_enable_default_port_hack{config.enable_default_port_hack}
153
    , m_disable_upload_compaction{config.disable_upload_compaction}
154
    , m_fix_up_object_ids{config.fix_up_object_ids}
155
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
156
    , m_socket_provider{std::move(config.socket_provider)}
157
    , m_client_protocol{} // Throws
158
    , m_one_connection_per_session{config.one_connection_per_session}
159
    , m_random{}
160
{
9,474✔
161
    // FIXME: It would be better if seeding were left up to the application.
4,668✔
162
    util::seed_prng_nondeterministically(m_random); // Throws
9,474✔
163

4,668✔
164
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,474✔
165
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,474✔
166
                 get_current_protocol_version()); // Throws
9,474✔
167
    logger.info("Platform: %1", util::get_platform_info());
9,474✔
168
    const char* build_mode;
9,474✔
169
#if REALM_DEBUG
9,474✔
170
    build_mode = "Debug";
9,474✔
171
#else
172
    build_mode = "Release";
173
#endif
174
    logger.debug("Build mode: %1", build_mode);
9,474✔
175
    logger.debug("Config param: one_connection_per_session = %1",
9,474✔
176
                 config.one_connection_per_session); // Throws
9,474✔
177
    logger.debug("Config param: connect_timeout = %1 ms",
9,474✔
178
                 config.connect_timeout); // Throws
9,474✔
179
    logger.debug("Config param: connection_linger_time = %1 ms",
9,474✔
180
                 config.connection_linger_time); // Throws
9,474✔
181
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,474✔
182
                 config.ping_keepalive_period); // Throws
9,474✔
183
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,474✔
184
                 config.pong_keepalive_timeout); // Throws
9,474✔
185
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,474✔
186
                 config.fast_reconnect_limit); // Throws
9,474✔
187
    logger.debug("Config param: disable_upload_compaction = %1",
9,474✔
188
                 config.disable_upload_compaction); // Throws
9,474✔
189
    logger.debug("Config param: disable_sync_to_disk = %1",
9,474✔
190
                 config.disable_sync_to_disk); // Throws
9,474✔
191
    logger.debug("Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3",
9,474✔
192
                 m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,474✔
193
                 m_reconnect_backoff_info.resumption_delay_interval.count(),
9,474✔
194
                 m_reconnect_backoff_info.resumption_delay_backoff_multiplier);
9,474✔
195

4,668✔
196
    if (config.reconnect_mode != ReconnectMode::normal) {
9,474✔
197
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
764✔
198
                    "never do this in production!");
764✔
199
    }
764✔
200

4,668✔
201
    if (config.dry_run) {
9,474✔
202
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
203
                    "never do this in production!");
×
204
    }
×
205

4,668✔
206
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,474✔
207

4,668✔
208
    if (m_one_connection_per_session) {
9,474✔
209
        // FIXME: Re-enable this warning when the load balancer is able to handle
2✔
210
        // multiplexing.
2✔
211
        //        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
2✔
212
        //            "never do this in production");
2✔
213
    }
4✔
214

4,668✔
215
    if (config.disable_upload_activation_delay) {
9,474✔
216
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
217
                    "never do this in production");
×
218
    }
×
219

4,668✔
220
    if (config.disable_sync_to_disk) {
9,474✔
221
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
222
                    "never do this in production");
×
223
    }
×
224

4,668✔
225
    m_actualize_and_finalize = create_trigger([this](Status status) {
14,592✔
226
        if (status == ErrorCodes::OperationAborted)
14,592✔
227
            return;
×
228
        else if (!status.is_ok())
14,592✔
229
            throw Exception(status);
×
230
        actualize_and_finalize_session_wrappers(); // Throws
14,592✔
231
    });
14,592✔
232
}
9,474✔
233

234

235
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
236
{
162,944✔
237
    REALM_ASSERT(m_socket_provider);
162,944✔
238
    {
162,944✔
239
        std::lock_guard lock(m_drain_mutex);
162,944✔
240
        ++m_outstanding_posts;
162,944✔
241
        m_drained = false;
162,944✔
242
    }
162,944✔
243
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
162,942✔
244
        auto decr_guard = util::make_scope_exit([&]() noexcept {
162,950✔
245
            std::lock_guard lock(m_drain_mutex);
162,950✔
246
            REALM_ASSERT(m_outstanding_posts);
162,950✔
247
            --m_outstanding_posts;
162,950✔
248
            m_drain_cv.notify_all();
162,950✔
249
        });
162,950✔
250
        handler(status);
162,942✔
251
    });
162,942✔
252
}
162,944✔
253

254

255
void ClientImpl::drain_connections()
256
{
9,474✔
257
    logger.debug("Draining connections during sync client shutdown");
9,474✔
258
    for (auto& server_slot_pair : m_server_slots) {
5,938✔
259
        auto& server_slot = server_slot_pair.second;
2,402✔
260

1,132✔
261
        if (server_slot.connection) {
2,402✔
262
            auto& conn = server_slot.connection;
2,304✔
263
            conn->force_close();
2,304✔
264
        }
2,304✔
265
        else {
98✔
266
            for (auto& conn_pair : server_slot.alt_connections) {
48✔
267
                conn_pair.second->force_close();
6✔
268
            }
6✔
269
        }
98✔
270
    }
2,402✔
271
}
9,474✔
272

273

274
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
275
                                                       SyncSocketProvider::FunctionHandler&& handler)
276
{
16,404✔
277
    REALM_ASSERT(m_socket_provider);
16,404✔
278
    {
16,404✔
279
        std::lock_guard lock(m_drain_mutex);
16,404✔
280
        ++m_outstanding_posts;
16,404✔
281
        m_drained = false;
16,404✔
282
    }
16,404✔
283
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
16,410✔
284
        handler(status);
16,410✔
285

7,918✔
286
        std::lock_guard lock(m_drain_mutex);
16,410✔
287
        REALM_ASSERT(m_outstanding_posts);
16,410✔
288
        --m_outstanding_posts;
16,410✔
289
        m_drain_cv.notify_all();
16,410✔
290
    });
16,410✔
291
}
16,404✔
292

293

294
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
295
{
11,994✔
296
    REALM_ASSERT(m_socket_provider);
11,994✔
297
    return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
11,994✔
298
}
11,994✔
299

300
Connection::~Connection()
301
{
2,520✔
302
    if (m_websocket_sentinel) {
2,520✔
303
        m_websocket_sentinel->destroyed = true;
×
304
        m_websocket_sentinel.reset();
×
305
    }
×
306
}
2,520✔
307

308
void Connection::activate()
309
{
2,520✔
310
    REALM_ASSERT(m_on_idle);
2,520✔
311
    m_activated = true;
2,520✔
312
    if (m_num_active_sessions == 0)
2,520✔
313
        m_on_idle->trigger();
×
314
    // We cannot in general connect immediately, because a prior failure to
1,192✔
315
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
1,192✔
316
    initiate_reconnect_wait(); // Throws
2,520✔
317
}
2,520✔
318

319

320
void Connection::activate_session(std::unique_ptr<Session> sess)
321
{
9,694✔
322
    REALM_ASSERT(sess);
9,694✔
323
    REALM_ASSERT(&sess->m_conn == this);
9,694✔
324
    REALM_ASSERT(!m_force_closed);
9,694✔
325
    Session& sess_2 = *sess;
9,694✔
326
    session_ident_type ident = sess->m_ident;
9,694✔
327
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
9,694✔
328
    bool was_inserted = p.second;
9,694✔
329
    REALM_ASSERT(was_inserted);
9,694✔
330
    // Save the session ident to the historical list of session idents
4,668✔
331
    m_session_history.insert(ident);
9,694✔
332
    sess_2.activate(); // Throws
9,694✔
333
    if (m_state == ConnectionState::connected) {
9,694✔
334
        bool fast_reconnect = false;
6,684✔
335
        sess_2.connection_established(fast_reconnect); // Throws
6,684✔
336
    }
6,684✔
337
    ++m_num_active_sessions;
9,694✔
338
}
9,694✔
339

340

341
void Connection::initiate_session_deactivation(Session* sess)
342
{
9,692✔
343
    REALM_ASSERT(sess);
9,692✔
344
    REALM_ASSERT(&sess->m_conn == this);
9,692✔
345
    REALM_ASSERT(m_num_active_sessions);
9,692✔
346
    // Since the client may be waiting for m_num_active_sessions to reach 0
4,666✔
347
    // in stop_and_wait() (on a separate thread), deactivate Session before
4,666✔
348
    // decrementing the num active sessions value.
4,666✔
349
    sess->initiate_deactivation(); // Throws
9,692✔
350
    if (sess->m_state == Session::Deactivated) {
9,692✔
351
        finish_session_deactivation(sess);
1,034✔
352
    }
1,034✔
353
    if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
9,692✔
354
        if (m_activated && m_state == ConnectionState::disconnected)
4,042✔
355
            m_on_idle->trigger();
328✔
356
    }
4,042✔
357
}
9,692✔
358

359

360
void Connection::cancel_reconnect_delay()
361
{
1,940✔
362
    REALM_ASSERT(m_activated);
1,940✔
363

1,002✔
364
    if (m_reconnect_delay_in_progress) {
1,940✔
365
        if (m_nonzero_reconnect_delay)
1,736✔
366
            logger.detail("Canceling reconnect delay"); // Throws
872✔
367

900✔
368
        // Cancel the in-progress wait operation by destroying the timer
900✔
369
        // object. Destruction is needed in this case, because a new wait
900✔
370
        // operation might have to be initiated before the previous one
900✔
371
        // completes (its completion handler starts to execute), so the new wait
900✔
372
        // operation must be done on a new timer object.
900✔
373
        m_reconnect_disconnect_timer.reset();
1,736✔
374
        m_reconnect_delay_in_progress = false;
1,736✔
375
        m_reconnect_info.reset();
1,736✔
376
        initiate_reconnect_wait(); // Throws
1,736✔
377
        return;
1,736✔
378
    }
1,736✔
379

102✔
380
    // If we are not disconnected, then we need to make sure the next time we get disconnected
102✔
381
    // that we are allowed to re-connect as quickly as possible.
102✔
382
    //
102✔
383
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
102✔
384
    // backoff/delay state before calculating the next delay, unless a PONG message is received
102✔
385
    // for the urgent PING message we send below.
102✔
386
    //
102✔
387
    // If we get a PONG message for the urgent PING message sent below, then the connection is
102✔
388
    // healthy and we can calculate the next delay normally.
102✔
389
    if (m_state != ConnectionState::disconnected) {
204✔
390
        m_reconnect_info.scheduled_reset = true;
204✔
391
        m_ping_after_scheduled_reset_of_reconnect_info = false;
204✔
392

102✔
393
        schedule_urgent_ping(); // Throws
204✔
394
        return;
204✔
395
    }
204✔
396
    // Nothing to do in this case. The next reconnect attempt will be made as
102✔
397
    // soon as there are any sessions that are both active and unsuspended.
102✔
398
}
204✔
399

400
void ClientImpl::Connection::finish_session_deactivation(Session* sess)
401
{
8,058✔
402
    REALM_ASSERT(sess->m_state == Session::Deactivated);
8,058✔
403
    auto ident = sess->m_ident;
8,058✔
404
    m_sessions.erase(ident);
8,058✔
405
    m_session_history.erase(ident);
8,058✔
406
}
8,058✔
407

408
void Connection::force_close()
409
{
2,312✔
410
    if (m_force_closed) {
2,312✔
411
        return;
×
412
    }
×
413

1,092✔
414
    m_force_closed = true;
2,312✔
415

1,092✔
416
    if (m_state != ConnectionState::disconnected) {
2,312✔
417
        voluntary_disconnect();
2,218✔
418
    }
2,218✔
419

1,092✔
420
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
2,312✔
421
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
2,312✔
422
        m_reconnect_disconnect_timer.reset();
94✔
423
        m_reconnect_delay_in_progress = false;
94✔
424
        m_disconnect_delay_in_progress = false;
94✔
425
    }
94✔
426

1,092✔
427
    // We must copy any session pointers we want to close to a vector because force_closing
1,092✔
428
    // the session may remove it from m_sessions and invalidate the iterator used to loop
1,092✔
429
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
1,092✔
430
    std::vector<Session*> to_close;
2,312✔
431
    for (auto& session_pair : m_sessions) {
1,162✔
432
        if (session_pair.second->m_state == Session::State::Active) {
138✔
433
            to_close.push_back(session_pair.second.get());
138✔
434
        }
138✔
435
    }
136✔
436

1,092✔
437
    for (auto& sess : to_close) {
1,162✔
438
        sess->force_close();
138✔
439
    }
138✔
440

1,092✔
441
    logger.debug("Force closed idle connection");
2,312✔
442
}
2,312✔
443

444

445
void Connection::websocket_connected_handler(const std::string& protocol)
446
{
3,280✔
447
    if (!protocol.empty()) {
3,280✔
448
        std::string_view expected_prefix =
3,280✔
449
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
2,988✔
450
        // FIXME: Use std::string_view::starts_with() in C++20.
1,590✔
451
        auto prefix_matches = [&](std::string_view other) {
3,280✔
452
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,280✔
453
        };
3,280✔
454
        if (prefix_matches(expected_prefix)) {
3,280✔
455
            util::MemoryInputStream in;
3,280✔
456
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,280✔
457
            in.imbue(std::locale::classic());
3,280✔
458
            in.unsetf(std::ios_base::skipws);
3,280✔
459
            int value_2 = 0;
3,280✔
460
            in >> value_2;
3,280✔
461
            if (in && in.eof() && value_2 >= 0) {
3,280✔
462
                bool good_version =
3,280✔
463
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,280✔
464
                if (good_version) {
3,280✔
465
                    logger.detail("Negotiated protocol version: %1", value_2);
3,280✔
466
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
1,590✔
467
                    // will provide the appservices connection ID via a log message.
1,590✔
468
                    // TODO: Remove once the server starts sending the connection ID
1,590✔
469
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,280✔
470
                    m_negotiated_protocol_version = value_2;
3,280✔
471
                    handle_connection_established(); // Throws
3,280✔
472
                    return;
3,280✔
473
                }
3,280✔
474
            }
×
475
        }
3,280✔
476
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
477
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
NEW
478
                                       IsFatal{true}, ProtocolErrorInfo::Action::ProtocolViolation,
×
NEW
479
                                       ConnectionTerminationReason::bad_headers_in_http_response);
×
480
    }
×
481
    else {
×
482
        close_due_to_client_side_error(
×
483
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
NEW
484
            ProtocolErrorInfo::Action::ProtocolViolation, ConnectionTerminationReason::bad_headers_in_http_response);
×
485
    }
×
486
}
3,280✔
487

488

489
bool Connection::websocket_binary_message_received(util::Span<const char> data)
490
{
75,138✔
491
    if (m_force_closed) {
75,138✔
492
        logger.debug("Received binary message after connection was force closed");
×
493
        return false;
×
494
    }
×
495

38,704✔
496
    using sf = SimulatedFailure;
75,138✔
497
    if (sf::check_trigger(sf::sync_client__read_head)) {
75,138✔
498
        close_due_to_client_side_error(
432✔
499
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
432✔
500
            ProtocolErrorInfo::Action::Transient, ConnectionTerminationReason::read_or_write_error);
432✔
501
        return bool(m_websocket);
432✔
502
    }
432✔
503

38,478✔
504
    handle_message_received(data);
74,706✔
505
    return bool(m_websocket);
74,706✔
506
}
74,706✔
507

508

509
void Connection::websocket_error_handler()
510
{
522✔
511
    m_websocket_error_received = true;
522✔
512
}
522✔
513

514
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
515
{
646✔
516
    if (m_force_closed) {
646✔
517
        logger.debug("Received websocket close message after connection was force closed");
×
518
        return false;
×
519
    }
×
520
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
646✔
521

330✔
522
    switch (error_code) {
646✔
523
        case WebSocketError::websocket_ok:
72✔
524
            break;
72✔
525
        case WebSocketError::websocket_resolve_failed:
4✔
526
            [[fallthrough]];
4✔
527
        case WebSocketError::websocket_connection_failed: {
4✔
528
            close_due_to_client_side_error(
4✔
529
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false},
4✔
530
                ProtocolErrorInfo::Action::Warning, ConnectionTerminationReason::connect_operation_failed);
4✔
531
            break;
4✔
532
        }
4✔
533
        case WebSocketError::websocket_read_error:
508✔
534
            [[fallthrough]];
508✔
535
        case WebSocketError::websocket_write_error: {
508✔
536
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
508✔
537
                                         ConnectionTerminationReason::read_or_write_error);
508✔
538
            break;
508✔
539
        }
508✔
540
        case WebSocketError::websocket_going_away:
260✔
541
            [[fallthrough]];
×
542
        case WebSocketError::websocket_protocol_error:
✔
543
            [[fallthrough]];
×
544
        case WebSocketError::websocket_unsupported_data:
✔
545
            [[fallthrough]];
×
546
        case WebSocketError::websocket_invalid_payload_data:
✔
547
            [[fallthrough]];
×
548
        case WebSocketError::websocket_policy_violation:
✔
549
            [[fallthrough]];
×
550
        case WebSocketError::websocket_reserved:
✔
551
            [[fallthrough]];
×
552
        case WebSocketError::websocket_no_status_received:
✔
553
            [[fallthrough]];
×
554
        case WebSocketError::websocket_invalid_extension: {
✔
555
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
NEW
556
                                           ProtocolErrorInfo::Action::ProtocolViolation,
×
557
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
558
            break;
×
559
        }
×
560
        case WebSocketError::websocket_message_too_big: {
4✔
561
            auto message = util::format(
4✔
562
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
563
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false},
4✔
564
                                        ProtocolErrorInfo::Action::ClientReset);
4✔
565
            involuntary_disconnect(std::move(error_info),
4✔
566
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
4✔
567
            break;
4✔
568
        }
×
569
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
570
            close_due_to_client_side_error(
10✔
571
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
572
                ProtocolErrorInfo::Action::Warning,
10✔
573
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
10✔
574
            break;
10✔
575
        }
×
576
        case WebSocketError::websocket_client_too_old:
✔
577
            [[fallthrough]];
×
578
        case WebSocketError::websocket_client_too_new:
✔
579
            [[fallthrough]];
×
580
        case WebSocketError::websocket_protocol_mismatch: {
✔
581
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
NEW
582
                                           ProtocolErrorInfo::Action::ProtocolViolation,
×
583
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
584
            break;
×
585
        }
×
586
        case WebSocketError::websocket_fatal_error: {
✔
NEW
587
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{true},
×
NEW
588
                                                    ProtocolErrorInfo::Action::ApplicationBug),
×
589
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
590
            break;
×
591
        }
×
592
        case WebSocketError::websocket_forbidden: {
✔
NEW
593
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true},
×
NEW
594
                                        ProtocolErrorInfo::Action::LogOutUser);
×
595
            involuntary_disconnect(std::move(error_info),
×
596
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
597
            break;
×
598
        }
×
599
        case WebSocketError::websocket_unauthorized: {
36✔
600
            SessionErrorInfo error_info(
36✔
601
                {ErrorCodes::AuthError,
36✔
602
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
36✔
603
                IsFatal{false}, ProtocolErrorInfo::Action::RefreshUser);
36✔
604
            involuntary_disconnect(std::move(error_info),
36✔
605
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
36✔
606
            break;
36✔
607
        }
×
608
        case WebSocketError::websocket_moved_permanently: {
12✔
609
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false},
12✔
610
                                        ProtocolErrorInfo::Action::RefreshLocation);
12✔
611
            involuntary_disconnect(std::move(error_info),
12✔
612
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
12✔
613
            break;
12✔
614
        }
×
615
        case WebSocketError::websocket_abnormal_closure: {
✔
NEW
616
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false},
×
NEW
617
                                        ProtocolErrorInfo::Action::RefreshUser);
×
618
            involuntary_disconnect(std::move(error_info),
×
619
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
620
            break;
×
621
        }
×
622
        case WebSocketError::websocket_internal_server_error:
✔
623
            [[fallthrough]];
×
624
        case WebSocketError::websocket_retry_error: {
✔
NEW
625
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false},
×
NEW
626
                                                    ProtocolErrorInfo::Action::Transient),
×
627
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
628
            break;
×
629
        }
646✔
630
    }
646✔
631

330✔
632
    return bool(m_websocket);
646✔
633
}
646✔
634

635
// Guarantees that handle_reconnect_wait() is never called from within the
636
// execution of initiate_reconnect_wait() (no callback reentrance).
637
void Connection::initiate_reconnect_wait()
638
{
7,610✔
639
    REALM_ASSERT(m_activated);
7,610✔
640
    REALM_ASSERT(!m_reconnect_delay_in_progress);
7,610✔
641
    REALM_ASSERT(!m_disconnect_delay_in_progress);
7,610✔
642

3,720✔
643
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
3,720✔
644
    if (m_force_closed) {
7,610✔
645
        return;
2,216✔
646
    }
2,216✔
647

2,680✔
648
    m_reconnect_delay_in_progress = true;
5,394✔
649
    auto delay = m_reconnect_info.delay_interval();
5,394✔
650
    if (delay == std::chrono::milliseconds::max()) {
5,394✔
651
        logger.detail("Reconnection delayed indefinitely"); // Throws
920✔
652
        // Not actually starting a timer corresponds to an infinite wait
476✔
653
        m_nonzero_reconnect_delay = true;
920✔
654
        return;
920✔
655
    }
920✔
656

2,204✔
657
    if (delay == std::chrono::milliseconds::zero()) {
4,474✔
658
        m_nonzero_reconnect_delay = false;
4,248✔
659
    }
4,248✔
660
    else {
226✔
661
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
226✔
662
        m_nonzero_reconnect_delay = true;
226✔
663
    }
226✔
664

2,204✔
665
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
2,204✔
666
    // we need it to be cancelable in case the connection is terminated before the timer
2,204✔
667
    // callback is run.
2,204✔
668
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
4,480✔
669
        // If the operation is aborted, the connection object may have been
2,204✔
670
        // destroyed.
2,204✔
671
        if (status != ErrorCodes::OperationAborted)
4,480✔
672
            handle_reconnect_wait(status); // Throws
3,366✔
673
    });                                    // Throws
4,480✔
674
}
4,474✔
675

676

677
// Completion handler of the reconnect wait started by
// initiate_reconnect_wait(). Throws `Exception` if the timer reported a
// failure; aborted timers are filtered out by the caller.
void Connection::handle_reconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    // The wait is over.
    REALM_ASSERT(m_reconnect_delay_in_progress);
    m_reconnect_delay_in_progress = false;

    // Reconnect only while at least one active, unsuspended session remains.
    const bool has_active_sessions = (m_num_active_unsuspended_sessions > 0);
    if (has_active_sessions) {
        initiate_reconnect(); // Throws
    }
}
3,366✔
690

691
// Observer handed to the socket provider in initiate_reconnect(). Each event
// is forwarded to the owning Connection, unless the lifecycle sentinel
// captured at construction time has been marked destroyed, in which case
// `conn` is never dereferenced.
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
    explicit WebSocketObserverShim(Connection* conn)
        : conn(conn)
        , sentinel(conn->m_websocket_sentinel)
    {
    }

    // Connection that receives the forwarded events.
    Connection* conn;
    // Sentinel of the websocket this observer was created for.
    util::bind_ptr<LifecycleSentinel> sentinel;

    void websocket_connected_handler(const std::string& protocol) override
    {
        if (!sentinel->destroyed) {
            conn->websocket_connected_handler(protocol);
        }
    }

    void websocket_error_handler() override
    {
        if (!sentinel->destroyed) {
            conn->websocket_error_handler();
        }
    }

    bool websocket_binary_message_received(util::Span<const char> data) override
    {
        if (!sentinel->destroyed) {
            return conn->websocket_binary_message_received(data);
        }
        // Websocket already torn down: report `false` without forwarding.
        return false;
    }

    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
    {
        if (!sentinel->destroyed) {
            return conn->websocket_closed_handler(was_clean, error_code, msg);
        }
        // Websocket already torn down: report `true` without forwarding.
        return true;
    }
};
737

738
void Connection::initiate_reconnect()
739
{
3,362✔
740
    REALM_ASSERT(m_activated);
3,362✔
741

1,630✔
742
    m_state = ConnectionState::connecting;
3,362✔
743
    report_connection_state_change(ConnectionState::connecting); // Throws
3,362✔
744
    if (m_websocket_sentinel) {
3,362✔
745
        m_websocket_sentinel->destroyed = true;
×
746
    }
×
747
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
3,362✔
748
    m_websocket.reset();
3,362✔
749

1,630✔
750
    // Watchdog
1,630✔
751
    initiate_connect_wait(); // Throws
3,362✔
752

1,630✔
753
    std::vector<std::string> sec_websocket_protocol;
3,362✔
754
    {
3,362✔
755
        auto protocol_prefix =
3,362✔
756
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,064✔
757
        int min = get_oldest_supported_protocol_version();
3,362✔
758
        int max = get_current_protocol_version();
3,362✔
759
        REALM_ASSERT_3(min, <=, max);
3,362✔
760
        // List protocol version in descending order to ensure that the server
1,630✔
761
        // selects the highest possible version.
1,630✔
762
        for (int version = max; version >= min; --version) {
33,592✔
763
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
30,230✔
764
        }
30,230✔
765
    }
3,362✔
766

1,630✔
767
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
3,362✔
768
                m_server_endpoint.port, m_http_request_path_prefix);
3,362✔
769

1,630✔
770
    m_websocket_error_received = false;
3,362✔
771
    m_websocket =
3,362✔
772
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,362✔
773
                                            WebSocketEndpoint{
3,362✔
774
                                                m_server_endpoint.address,
3,362✔
775
                                                m_server_endpoint.port,
3,362✔
776
                                                get_http_request_path(),
3,362✔
777
                                                std::move(sec_websocket_protocol),
3,362✔
778
                                                is_ssl(m_server_endpoint.envelope),
3,362✔
779
                                                /// DEPRECATED - The following will be removed in a future release
1,630✔
780
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,362✔
781
                                                m_verify_servers_ssl_certificate,
3,362✔
782
                                                m_ssl_trust_certificate_path,
3,362✔
783
                                                m_ssl_verify_callback,
3,362✔
784
                                                m_proxy_config,
3,362✔
785
                                            });
3,362✔
786
}
3,362✔
787

788

789
void Connection::initiate_connect_wait()
790
{
3,362✔
791
    // Deploy a watchdog to enforce an upper bound on the time it can take to
1,630✔
792
    // fully establish the connection (including SSL and WebSocket
1,630✔
793
    // handshakes). Without such a watchdog, connect operations could take very
1,630✔
794
    // long, or even indefinite time.
1,630✔
795
    milliseconds_type time = m_client.m_connect_timeout;
3,362✔
796

1,630✔
797
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
3,362✔
798
        // If the operation is aborted, the connection object may have been
1,630✔
799
        // destroyed.
1,630✔
800
        if (status != ErrorCodes::OperationAborted)
3,362✔
801
            handle_connect_wait(status); // Throws
×
802
    });                                  // Throws
3,362✔
803
}
3,362✔
804

805

806
// Fires when the connect watchdog expires: the connection attempt is
// abandoned with a non-fatal, transient SyncConnectTimeout error. Throws
// `Exception` if the timer itself reported a failure.
void Connection::handle_connect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
    logger.info("Connect timeout"); // Throws

    SessionErrorInfo timeout_info{
        Status{ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"}, IsFatal{false},
        ProtocolErrorInfo::Action::Transient};
    involuntary_disconnect(std::move(timeout_info), ConnectionTerminationReason::sync_connect_timeout); // Throws
}
×
820

821

822
void Connection::handle_connection_established()
823
{
3,280✔
824
    // Cancel connect timeout watchdog
1,590✔
825
    m_connect_timer.reset();
3,280✔
826

1,590✔
827
    m_state = ConnectionState::connected;
3,280✔
828

1,590✔
829
    milliseconds_type now = monotonic_clock_now();
3,280✔
830
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
3,280✔
831
    initiate_ping_delay(now);     // Throws
3,280✔
832

1,590✔
833
    bool fast_reconnect = false;
3,280✔
834
    if (m_disconnect_has_occurred) {
3,280✔
835
        milliseconds_type time = now - m_disconnect_time;
944✔
836
        if (time <= m_client.m_fast_reconnect_limit)
944✔
837
            fast_reconnect = true;
944✔
838
    }
944✔
839

1,590✔
840
    for (auto& p : m_sessions) {
4,302✔
841
        Session& sess = *p.second;
4,302✔
842
        sess.connection_established(fast_reconnect); // Throws
4,302✔
843
    }
4,302✔
844

1,590✔
845
    report_connection_state_change(ConnectionState::connected); // Throws
3,280✔
846
}
3,280✔
847

848

849
void Connection::schedule_urgent_ping()
850
{
204✔
851
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
204✔
852
    if (m_ping_delay_in_progress) {
204✔
853
        m_heartbeat_timer.reset();
166✔
854
        m_ping_delay_in_progress = false;
166✔
855
        m_minimize_next_ping_delay = true;
166✔
856
        milliseconds_type now = monotonic_clock_now();
166✔
857
        initiate_ping_delay(now); // Throws
166✔
858
        return;
166✔
859
    }
166✔
860
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
38✔
861
    if (!m_send_ping)
38✔
862
        m_minimize_next_ping_delay = true;
38✔
863
}
38✔
864

865

866
// Starts the delay that precedes the next PING message. `now` is the
// caller's reading of the monotonic clock; it is used to deduct the time
// already spent waiting for the previous PONG.
void Connection::initiate_ping_delay(milliseconds_type now)
{
    REALM_ASSERT(!m_ping_delay_in_progress);
    REALM_ASSERT(!m_waiting_for_pong);
    REALM_ASSERT(!m_send_ping);

    milliseconds_type delay = 0;
    if (m_minimize_next_ping_delay) {
        // A minimized delay was requested: keep the delay at zero and
        // consume the request.
        m_minimize_next_ping_delay = false;
    }
    else {
        delay = m_client.m_ping_keepalive_period;

        // Make a randomized deduction of up to 10%, or up to 100% if this is
        // the first PING message to be sent since the connection was
        // established. The purpose of this randomized deduction is to reduce
        // the risk of many connections sending PING messages simultaneously
        // to the server.
        const milliseconds_type deduction_cap = (m_ping_sent ? delay / 10 : delay);
        std::uniform_int_distribution<milliseconds_type> jitter(0, deduction_cap);
        delay -= jitter(m_client.get_random());

        // Deduct the time spent waiting for PONG, clamping at zero.
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
        const milliseconds_type already_waited = now - m_pong_wait_started_at;
        delay = (already_waited < delay) ? delay - already_waited : 0;
    }

    m_ping_delay_in_progress = true;

    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
        // An aborted timer is ignored silently.
        if (status == ErrorCodes::OperationAborted)
            return;
        if (!status.is_ok())
            throw Exception(status);

        handle_ping_delay();                                    // Throws
    });                                                         // Throws
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
}
3,618✔
911

912

913
void Connection::handle_ping_delay()
914
{
186✔
915
    REALM_ASSERT(m_ping_delay_in_progress);
186✔
916
    m_ping_delay_in_progress = false;
186✔
917
    m_send_ping = true;
186✔
918

66✔
919
    initiate_pong_timeout(); // Throws
186✔
920

66✔
921
    if (m_state == ConnectionState::connected && !m_sending)
186✔
922
        send_next_message(); // Throws
168✔
923
}
186✔
924

925

926
void Connection::initiate_pong_timeout()
927
{
186✔
928
    REALM_ASSERT(!m_ping_delay_in_progress);
186✔
929
    REALM_ASSERT(!m_waiting_for_pong);
186✔
930
    REALM_ASSERT(m_send_ping);
186✔
931

66✔
932
    m_waiting_for_pong = true;
186✔
933
    m_pong_wait_started_at = monotonic_clock_now();
186✔
934

66✔
935
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
186✔
936
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
186✔
937
        if (status == ErrorCodes::OperationAborted)
186✔
938
            return;
174✔
939
        else if (!status.is_ok())
12✔
940
            throw Exception(status);
×
941

6✔
942
        handle_pong_timeout(); // Throws
12✔
943
    });                        // Throws
12✔
944
}
186✔
945

946

947
void Connection::handle_pong_timeout()
948
{
12✔
949
    REALM_ASSERT(m_waiting_for_pong);
12✔
950
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
951
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
952
                                 ConnectionTerminationReason::pong_timeout);
12✔
953
}
12✔
954

955

956
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
957
{
93,202✔
958
    // Stop sending messages if an websocket error was received.
46,378✔
959
    if (m_websocket_error_received)
93,202✔
960
        return;
×
961

46,378✔
962
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
93,202✔
963
        if (sentinel->destroyed) {
93,090✔
964
            return;
1,498✔
965
        }
1,498✔
966
        if (!status.is_ok()) {
91,592✔
967
            if (status != ErrorCodes::Error::OperationAborted) {
×
968
                // Write errors will be handled by the websocket_write_error_handler() callback
969
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
970
            }
×
971
            return;
×
972
        }
×
973
        handle_write_message(); // Throws
91,592✔
974
    });                         // Throws
91,592✔
975
    m_sending_session = sess;
93,202✔
976
    m_sending = true;
93,202✔
977
}
93,202✔
978

979

980
void Connection::handle_write_message()
981
{
91,586✔
982
    m_sending_session->message_sent(); // Throws
91,586✔
983
    if (m_sending_session->m_state == Session::Deactivated) {
91,586✔
984
        finish_session_deactivation(m_sending_session);
108✔
985
    }
108✔
986
    m_sending_session = nullptr;
91,586✔
987
    m_sending = false;
91,586✔
988
    send_next_message(); // Throws
91,586✔
989
}
91,586✔
990

991

992
void Connection::send_next_message()
993
{
149,014✔
994
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
149,014✔
995
    REALM_ASSERT(!m_sending_session);
149,014✔
996
    REALM_ASSERT(!m_sending);
149,014✔
997
    if (m_send_ping) {
149,014✔
998
        send_ping(); // Throws
174✔
999
        return;
174✔
1000
    }
174✔
1001
    while (!m_sessions_enlisted_to_send.empty()) {
208,596✔
1002
        // The state of being connected is not supposed to be able to change
75,114✔
1003
        // across this loop thanks to the "no callback reentrance" guarantee
75,114✔
1004
        // provided by Websocket::async_write_text(), and friends.
75,114✔
1005
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
153,150✔
1006

75,114✔
1007
        Session& sess = *m_sessions_enlisted_to_send.front();
153,150✔
1008
        m_sessions_enlisted_to_send.pop_front();
153,150✔
1009
        sess.send_message(); // Throws
153,150✔
1010

75,114✔
1011
        if (sess.m_state == Session::Deactivated) {
153,150✔
1012
            finish_session_deactivation(&sess);
2,978✔
1013
        }
2,978✔
1014

75,114✔
1015
        // An enlisted session may choose to not send a message. In that case,
75,114✔
1016
        // we should pass the opportunity to the next enlisted session.
75,114✔
1017
        if (m_sending)
153,150✔
1018
            break;
93,394✔
1019
    }
153,150✔
1020
}
148,840✔
1021

1022

1023
void Connection::send_ping()
1024
{
174✔
1025
    REALM_ASSERT(!m_ping_delay_in_progress);
174✔
1026
    REALM_ASSERT(m_waiting_for_pong);
174✔
1027
    REALM_ASSERT(m_send_ping);
174✔
1028

60✔
1029
    m_send_ping = false;
174✔
1030
    if (m_reconnect_info.scheduled_reset)
174✔
1031
        m_ping_after_scheduled_reset_of_reconnect_info = true;
142✔
1032

60✔
1033
    m_last_ping_sent_at = monotonic_clock_now();
174✔
1034
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
174✔
1035
                 m_previous_ping_rtt); // Throws
174✔
1036

60✔
1037
    ClientProtocol& protocol = get_client_protocol();
174✔
1038
    OutputBuffer& out = get_output_buffer();
174✔
1039
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
174✔
1040
    initiate_write_ping(out);                                          // Throws
174✔
1041
    m_ping_sent = true;
174✔
1042
}
174✔
1043

1044

1045
void Connection::initiate_write_ping(const OutputBuffer& out)
1046
{
174✔
1047
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
174✔
1048
        if (sentinel->destroyed) {
174✔
1049
            return;
×
1050
        }
×
1051
        if (!status.is_ok()) {
174✔
1052
            if (status != ErrorCodes::Error::OperationAborted) {
×
1053
                // Write errors will be handled by the websocket_write_error_handler() callback
1054
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1055
            }
×
1056
            return;
×
1057
        }
×
1058
        handle_write_ping(); // Throws
174✔
1059
    });                      // Throws
174✔
1060
    m_sending = true;
174✔
1061
}
174✔
1062

1063

1064
void Connection::handle_write_ping()
1065
{
174✔
1066
    REALM_ASSERT(m_sending);
174✔
1067
    REALM_ASSERT(!m_sending_session);
174✔
1068
    m_sending = false;
174✔
1069
    send_next_message(); // Throws
174✔
1070
}
174✔
1071

1072

1073
// Hands a received binary message to the client protocol parser.
void Connection::handle_message_received(util::Span<const char> data)
{
    // parse_message_received() parses the message and calls the proper
    // handler on the Connection object (this).
    ClientProtocol& client_protocol = get_client_protocol();
    const std::string_view raw_message(data.data(), data.size());
    client_protocol.parse_message_received<Connection>(*this, raw_message);
}
74,698✔
1079

1080

1081
void Connection::initiate_disconnect_wait()
1082
{
4,382✔
1083
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,382✔
1084

2,086✔
1085
    if (m_disconnect_delay_in_progress) {
4,382✔
1086
        m_reconnect_disconnect_timer.reset();
2,092✔
1087
        m_disconnect_delay_in_progress = false;
2,092✔
1088
    }
2,092✔
1089

2,086✔
1090
    milliseconds_type time = m_client.m_connection_linger_time;
4,382✔
1091

2,086✔
1092
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
4,382✔
1093
        // If the operation is aborted, the connection object may have been
2,086✔
1094
        // destroyed.
2,086✔
1095
        if (status != ErrorCodes::OperationAborted)
4,382✔
1096
            handle_disconnect_wait(status); // Throws
14✔
1097
    });                                     // Throws
4,382✔
1098
    m_disconnect_delay_in_progress = true;
4,382✔
1099
}
4,382✔
1100

1101

1102
// Runs when the linger timer expires: disconnects voluntarily if no active,
// unsuspended session is left. Throws `Exception` if the timer itself
// reported a failure.
void Connection::handle_disconnect_wait(Status status)
{
    const bool timer_failed = !status.is_ok();
    if (timer_failed) {
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
        throw Exception(status);
    }

    m_disconnect_delay_in_progress = false;

    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);

    // Keep the connection while any session still uses it.
    if (m_num_active_unsuspended_sessions != 0)
        return;

    if (m_client.m_connection_linger_time > 0) {
        logger.detail("Linger time expired"); // Throws
    }
    voluntary_disconnect();      // Throws
    logger.info("Disconnected"); // Throws
}
14✔
1119

1120

1121
// Closes the connection because of a protocol violation. The resulting
// session error is fatal and carries the ProtocolViolation action.
void Connection::close_due_to_protocol_error(Status status)
{
    involuntary_disconnect(
        SessionErrorInfo{std::move(status), IsFatal{true}, ProtocolErrorInfo::Action::ProtocolViolation},
        ConnectionTerminationReason::sync_protocol_violation); // Throws
}
6✔
1127

1128

1129
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal,
1130
                                                ProtocolErrorInfo::Action error_action,
1131
                                                ConnectionTerminationReason reason)
1132
{
446✔
1133
    logger.info("Connection closed due to error: %1", status); // Throws
446✔
1134
    SessionErrorInfo info{std::move(status), is_fatal, error_action};
446✔
1135
    involuntary_disconnect(std::move(info), reason); // Throw
446✔
1136
}
446✔
1137

1138

1139
// Closes the connection because of a transient error: the resulting session
// error is non-fatal and carries the Transient action.
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
{
    // Log before `status` is moved into the error info.
    logger.info("Connection closed due to transient error: %1", status); // Throws

    involuntary_disconnect(SessionErrorInfo{std::move(status), IsFatal{false}, ProtocolErrorInfo::Action::Transient},
                           reason); // Throws
}
520✔
1146

1147

1148
// Close connection due to error discovered on the server-side, and then
1149
// reported to the client by way of a connection-level ERROR message.
1150
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
1151
{
70✔
1152
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
70✔
1153
                int(error_code)); // Throws
70✔
1154

34✔
1155
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
44✔
1156
                                      : ConnectionTerminationReason::server_said_try_again_later;
60✔
1157
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
70✔
1158
                           reason); // Throws
70✔
1159
}
70✔
1160

1161

1162
void Connection::disconnect(const SessionErrorInfo& info)
1163
{
3,362✔
1164
    // Cancel connect timeout watchdog
1,630✔
1165
    m_connect_timer.reset();
3,362✔
1166

1,630✔
1167
    if (m_state == ConnectionState::connected) {
3,362✔
1168
        m_disconnect_time = monotonic_clock_now();
3,280✔
1169
        m_disconnect_has_occurred = true;
3,280✔
1170

1,590✔
1171
        // Sessions that are in the Deactivating state at this time can be
1,590✔
1172
        // immediately discarded, in part because they are no longer enlisted to
1,590✔
1173
        // send. Such sessions will be taken to the Deactivated state by
1,590✔
1174
        // Session::connection_lost(), and then they will be removed from
1,590✔
1175
        // `m_sessions`.
1,590✔
1176
        auto i = m_sessions.begin(), end = m_sessions.end();
3,280✔
1177
        while (i != end) {
6,920✔
1178
            // Prevent invalidation of the main iterator when erasing elements
1,864✔
1179
            auto j = i++;
3,640✔
1180
            Session& sess = *j->second;
3,640✔
1181
            sess.connection_lost(); // Throws
3,640✔
1182
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
3,640✔
1183
                m_sessions.erase(j);
1,636✔
1184
        }
3,640✔
1185
    }
3,280✔
1186

1,630✔
1187
    change_state_to_disconnected();
3,362✔
1188

1,630✔
1189
    m_ping_delay_in_progress = false;
3,362✔
1190
    m_waiting_for_pong = false;
3,362✔
1191
    m_send_ping = false;
3,362✔
1192
    m_minimize_next_ping_delay = false;
3,362✔
1193
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,362✔
1194
    m_ping_sent = false;
3,362✔
1195
    m_heartbeat_timer.reset();
3,362✔
1196
    m_previous_ping_rtt = 0;
3,362✔
1197

1,630✔
1198
    m_websocket_sentinel->destroyed = true;
3,362✔
1199
    m_websocket_sentinel.reset();
3,362✔
1200
    m_websocket.reset();
3,362✔
1201
    m_input_body_buffer.reset();
3,362✔
1202
    m_sending_session = nullptr;
3,362✔
1203
    m_sessions_enlisted_to_send.clear();
3,362✔
1204
    m_sending = false;
3,362✔
1205

1,630✔
1206
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,362✔
1207
    initiate_reconnect_wait();                                           // Throws
3,362✔
1208
}
3,362✔
1209

1210
bool Connection::is_flx_sync_connection() const noexcept
1211
{
105,936✔
1212
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
105,936✔
1213
}
105,936✔
1214

1215
// Handles a PONG message. The message must arrive while a PONG is awaited
// and must echo the timestamp of the last PING; anything else closes the
// connection as a protocol error. A valid PONG updates the round trip time
// and starts the next ping delay.
void Connection::receive_pong(milliseconds_type timestamp)
{
    logger.debug("Received: PONG(timestamp=%1)", timestamp);

    // A PONG is only legal after the PING has actually been sent.
    const bool unexpected = (!m_waiting_for_pong || m_send_ping);
    if (REALM_UNLIKELY(unexpected)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
        return;
    }

    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
                          m_last_ping_sent_at, timestamp)}); // Throws
        return;
    }

    const milliseconds_type now = monotonic_clock_now();
    const milliseconds_type rtt = now - timestamp;
    logger.debug("Round trip time was %1 milliseconds", rtt);
    m_previous_ping_rtt = rtt;

    // If this PONG message is a response to a PING message that was sent
    // after the last invocation of cancel_reconnect_delay(), then the
    // connection is still good, and we do not have to skip the next
    // reconnect delay.
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
        m_ping_after_scheduled_reset_of_reconnect_info = false;
        m_reconnect_info.scheduled_reset = false;
    }

    // Stop the PONG timeout and start the next heartbeat cycle.
    m_heartbeat_timer.reset();
    m_waiting_for_pong = false;

    initiate_ping_delay(now); // Throws

    if (m_client.m_roundtrip_time_handler) {
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
    }
}
172✔
1256

1257
// Looks up the session addressed by an incoming message. `message` is the
// message type name and is used for logging only.
//
// Returns the session, or nullptr when `session_ident` is zero or no live
// session has that identifier. An identifier that is also absent from the
// session history closes the connection as a protocol error; one that is in
// the history is only logged.
//
// NOTE(review): this function is declared noexcept, yet it calls logging and
// close_due_to_protocol_error(), which are marked as throwing elsewhere in
// this file - confirm that termination on throw is intended.
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
{
    if (session_ident == 0) {
        return nullptr;
    }

    Session* const live_session = get_session(session_ident);
    if (REALM_LIKELY(live_session)) {
        return live_session;
    }

    // Check the history to see if the message received was for a previous session
    const bool known_from_history = (m_session_history.find(session_ident) != m_session_history.end());
    if (known_from_history) {
        logger.error("Received %1 message for closed session, session_ident = %2", message,
                     session_ident); // Throws
    }
    else {
        logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
        close_due_to_protocol_error(
            {ErrorCodes::SyncProtocolInvariantFailed,
             util::format("Received message %1 for session iden %2 when that session never existed", message,
                          session_ident)});
    }
    return nullptr;
}
2,147,483,655✔
1281

1282
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
1283
{
976✔
1284
    Session* sess = nullptr;
976✔
1285
    if (session_ident != 0) {
976✔
1286
        sess = find_and_validate_session(session_ident, "ERROR");
902✔
1287
        if (REALM_UNLIKELY(!sess)) {
902✔
1288
            return;
×
1289
        }
×
1290
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
902✔
1291
            close_due_to_protocol_error(std::move(status)); // Throws
×
1292
            return;
×
1293
        }
×
1294

474✔
1295
        if (sess->m_state == Session::Deactivated) {
902✔
1296
            finish_session_deactivation(sess);
2✔
1297
        }
2✔
1298
        return;
902✔
1299
    }
902✔
1300

36✔
1301
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
74✔
1302
                info.message, info.raw_error_code, info.is_fatal, session_ident,
74✔
1303
                info.server_requests_action); // Throws
74✔
1304

36✔
1305
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
74✔
1306
    if (REALM_LIKELY(known_error_code)) {
74✔
1307
        ProtocolError error_code = ProtocolError(info.raw_error_code);
70✔
1308
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
70✔
1309
            close_due_to_server_side_error(error_code, info); // Throws
70✔
1310
            return;
70✔
1311
        }
70✔
1312
        close_due_to_protocol_error(
×
1313
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1314
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1315
                          info.raw_error_code)});
×
1316
    }
×
1317
    else {
4✔
1318
        close_due_to_protocol_error(
4✔
1319
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1320
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
4✔
1321
    }
4✔
1322
}
74✔
1323

1324

1325
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
1326
                                             session_ident_type session_ident)
1327
{
16✔
1328
    if (session_ident == 0) {
16✔
1329
        return close_due_to_protocol_error(
×
1330
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
×
1331
    }
×
1332

8✔
1333
    if (!is_flx_sync_connection()) {
16✔
1334
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1335
                                            "Received a FLX query error message on a non-FLX sync connection"});
×
1336
    }
×
1337

8✔
1338
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
16✔
1339
    if (REALM_UNLIKELY(!sess)) {
16✔
1340
        return;
×
1341
    }
×
1342

8✔
1343
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
16✔
1344
        close_due_to_protocol_error(std::move(status));
×
1345
    }
×
1346
}
16✔
1347

1348

1349
// Forwards a received IDENT message (the salted client file identifier) to the
// addressed session. A non-OK status from the session closes the connection
// with that status as a protocol error.
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
{
    Session* const target = find_and_validate_session(session_ident, "IDENT");
    if (REALM_UNLIKELY(!target))
        return; // No session to deliver the message to

    auto result = target->receive_ident_message(client_file_ident);
    if (!result.is_ok()) {
        close_due_to_protocol_error(std::move(result)); // Throws
    }
}
3,338✔
1359

1360
void Connection::receive_download_message(session_ident_type session_ident, const SyncProgress& progress,
1361
                                          std::uint_fast64_t downloadable_bytes, int64_t query_version,
1362
                                          DownloadBatchState batch_state,
1363
                                          const ReceivedChangesets& received_changesets)
1364
{
44,790✔
1365
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
44,790✔
1366
    if (REALM_UNLIKELY(!sess)) {
44,790✔
1367
        return;
2✔
1368
    }
2✔
1369

23,858✔
1370
    if (auto status = sess->receive_download_message(progress, downloadable_bytes, batch_state, query_version,
44,788✔
1371
                                                     received_changesets);
44,788✔
1372
        !status.is_ok()) {
44,788✔
1373
        close_due_to_protocol_error(std::move(status));
×
1374
    }
×
1375
}
44,788✔
1376

1377
// Forwards a received MARK message to the addressed session. A non-OK status
// from the session closes the connection with that status as a protocol error.
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
{
    Session* const target = find_and_validate_session(session_ident, "MARK");
    if (REALM_UNLIKELY(!target))
        return; // No session to deliver the message to

    auto result = target->receive_mark_message(request_ident);
    if (!result.is_ok()) {
        close_due_to_protocol_error(std::move(result)); // Throws
    }
}
15,678✔
1387

1388

1389
// Forwards a received UNBOUND message to the addressed session. If the session
// rejects the message, the connection is closed with the returned status.
// Otherwise, a session that ended up in the Deactivated state has its
// deactivation finished by the connection.
void Connection::receive_unbound_message(session_ident_type session_ident)
{
    Session* const target = find_and_validate_session(session_ident, "UNBOUND");
    if (REALM_UNLIKELY(!target))
        return; // No session to deliver the message to

    auto result = target->receive_unbound_message();
    if (!result.is_ok()) {
        close_due_to_protocol_error(std::move(result)); // Throws
        return;
    }

    const bool now_deactivated = (target->m_state == Session::Deactivated);
    if (now_deactivated)
        finish_session_deactivation(target);
}
3,936✔
1405

1406

1407
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1408
                                               std::string_view body)
1409
{
60✔
1410
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
60✔
1411
    if (REALM_UNLIKELY(!sess)) {
60✔
1412
        return;
×
1413
    }
×
1414

30✔
1415
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
60✔
1416
        close_due_to_protocol_error(std::move(status));
×
1417
    }
×
1418
}
60✔
1419

1420

1421
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1422
                                            std::string_view message)
1423
{
5,736✔
1424
    std::string prefix;
5,736✔
1425
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
5,736✔
1426
        prefix = util::format("Server[%1]", m_appservices_coid);
5,736✔
1427
    }
5,736✔
1428
    else {
×
1429
        prefix = "Server";
×
1430
    }
×
1431

2,800✔
1432
    if (session_ident != 0) {
5,736✔
1433
        if (auto sess = get_session(session_ident)) {
3,870✔
1434
            sess->logger.log(level, "%1 log: %2", prefix, message);
3,870✔
1435
            return;
3,870✔
1436
        }
3,870✔
1437

1438
        logger.log(level, "%1 log for unknown session %2: %3", prefix, session_ident, message);
×
1439
        return;
×
1440
    }
×
1441

898✔
1442
    logger.log(level, "%1 log: %2", prefix, message);
1,866✔
1443
}
1,866✔
1444

1445

1446
void Connection::receive_appservices_request_id(std::string_view coid)
1447
{
5,146✔
1448
    // Only set once per connection
2,488✔
1449
    if (!coid.empty() && m_appservices_coid.empty()) {
5,146✔
1450
        m_appservices_coid = coid;
2,314✔
1451
        logger.info("Connected to app services with request id: \"%1\"", m_appservices_coid);
2,314✔
1452
    }
2,314✔
1453
}
5,146✔
1454

1455

1456
// Reacts to a protocol error by closing the connection, handing the given
// status over to close_due_to_protocol_error().
void Connection::handle_protocol_error(Status status)
{
    return close_due_to_protocol_error(std::move(status));
}
×
1460

1461

1462
// Sessions are guaranteed to be granted the opportunity to send a message in
1463
// the order that they enlist. Note that this is important to ensure
1464
// nonoverlapping communication with the server for consecutive sessions
1465
// associated with the same Realm file.
1466
//
1467
// CAUTION: The specified session may get destroyed before this function
1468
// returns, but only if its Session::send_message() puts it into the Deactivated
1469
// state.
1470
void Connection::enlist_to_send(Session* sess)
1471
{
154,836✔
1472
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
154,836✔
1473
    m_sessions_enlisted_to_send.push_back(sess); // Throws
154,836✔
1474
    if (!m_sending)
154,836✔
1475
        send_next_message(); // Throws
57,078✔
1476
}
154,836✔
1477

1478

1479
std::string Connection::get_active_appservices_connection_id()
1480
{
72✔
1481
    return m_appservices_coid;
72✔
1482
}
72✔
1483

1484
void Session::cancel_resumption_delay()
1485
{
4,038✔
1486
    REALM_ASSERT_EX(m_state == Active, m_state);
4,038✔
1487

2,108✔
1488
    if (!m_suspended)
4,038✔
1489
        return;
3,660✔
1490

214✔
1491
    m_suspended = false;
378✔
1492

214✔
1493
    logger.debug("Resumed"); // Throws
378✔
1494

214✔
1495
    if (unbind_process_complete())
378✔
1496
        initiate_rebind(); // Throws
378✔
1497

214✔
1498
    m_conn.one_more_active_unsuspended_session(); // Throws
378✔
1499

214✔
1500
    on_resumed(); // Throws
378✔
1501
}
378✔
1502

1503

1504
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
1505
                                                 std::vector<ProtocolErrorInfo>* out)
1506
{
21,848✔
1507
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
21,848✔
1508
        return;
21,804✔
1509
    }
21,804✔
1510

22✔
1511
#ifdef REALM_DEBUG
44✔
1512
    REALM_ASSERT_DEBUG(
44✔
1513
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
44✔
1514
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
44✔
1515
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
44✔
1516
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
44✔
1517
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
44✔
1518
                       }));
44✔
1519
#endif
44✔
1520

22✔
1521
    while (!m_pending_compensating_write_errors.empty() &&
88✔
1522
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
66✔
1523
               changesets.back().version) {
44✔
1524
        auto& cur_error = m_pending_compensating_write_errors.front();
44✔
1525
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
44✔
1526
        out->push_back(std::move(cur_error));
44✔
1527
        m_pending_compensating_write_errors.pop_front();
44✔
1528
    }
44✔
1529
}
44✔
1530

1531

1532
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1533
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1534
                                   DownloadBatchState download_batch_state)
1535
{
42,626✔
1536
    auto& history = get_history();
42,626✔
1537
    if (received_changesets.empty()) {
42,626✔
1538
        if (download_batch_state == DownloadBatchState::MoreToCome) {
20,756✔
1539
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
1540
                                       "received empty download message that was not the last in batch",
×
1541
                                       ProtocolError::bad_progress);
×
1542
        }
×
1543
        history.set_sync_progress(progress, &downloadable_bytes, version_info); // Throws
20,756✔
1544
        return;
20,756✔
1545
    }
20,756✔
1546

11,958✔
1547
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
21,870✔
1548
    auto transact = get_db()->start_read();
21,870✔
1549
    history.integrate_server_changesets(
21,870✔
1550
        progress, &downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
21,870✔
1551
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
21,858✔
1552
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
21,848✔
1553
        }); // Throws
21,848✔
1554
    if (received_changesets.size() == 1) {
21,870✔
1555
        logger.debug("1 remote changeset integrated, producing client version %1",
15,004✔
1556
                     version_info.sync_version.version); // Throws
15,004✔
1557
    }
15,004✔
1558
    else {
6,866✔
1559
        logger.debug("%2 remote changesets integrated, producing client version %1",
6,866✔
1560
                     version_info.sync_version.version, received_changesets.size()); // Throws
6,866✔
1561
    }
6,866✔
1562

11,958✔
1563
    for (const auto& pending_error : pending_compensating_write_errors) {
11,980✔
1564
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
44✔
1565
                    pending_error.compensating_write_rejected_client_version,
44✔
1566
                    *pending_error.compensating_write_server_version, pending_error.message);
44✔
1567
        try {
44✔
1568
            on_connection_state_changed(
44✔
1569
                m_conn.get_state(),
44✔
1570
                SessionErrorInfo{pending_error,
44✔
1571
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
44✔
1572
                                                          pending_error.message)});
44✔
1573
        }
44✔
1574
        catch (...) {
22✔
1575
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
×
1576
        }
×
1577
    }
44✔
1578
}
21,870✔
1579

1580

1581
void Session::on_integration_failure(const IntegrationException& error)
1582
{
32✔
1583
    REALM_ASSERT_EX(m_state == Active, m_state);
32✔
1584
    REALM_ASSERT(!m_client_error && !m_error_to_send);
32✔
1585
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
32✔
1586

16✔
1587
    m_client_error = util::make_optional<IntegrationException>(error);
32✔
1588
    m_error_to_send = true;
32✔
1589

16✔
1590
    // Surface the error to the user otherwise is lost.
16✔
1591
    on_connection_state_changed(
32✔
1592
        m_conn.get_state(), SessionErrorInfo{error.to_status(), IsFatal{false}, ProtocolErrorInfo::Action::Warning});
32✔
1593

16✔
1594
    // Since the deactivation process has not been initiated, the UNBIND
16✔
1595
    // message cannot have been sent unless an ERROR message was received.
16✔
1596
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
32✔
1597
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
32✔
1598
        ensure_enlisted_to_send(); // Throws
32✔
1599
    }
32✔
1600
}
32✔
1601

1602
// Updates the session's progress bookkeeping after downloaded changesets have
// been integrated.
//
// `client_version` is passed on to do_recognize_sync_version(), and `progress`
// replaces the stored download/upload progress. The download server version
// in `progress` must not be lower than the one currently stored.
//
// When the upload progress advanced, the upload cursor and the last version
// selected for upload are moved forward if they lag behind, and upload
// completion is checked. Download completion is always checked. For an FLX
// session that has a migration store, the migration store is asked to create
// subscriptions. Finally the session is enlisted to send if the IDENT message
// has been sent, no ERROR message has been received and the session is not
// suspended.
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
{
    REALM_ASSERT_EX(m_state == Active, m_state);
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
    m_download_progress = progress.download;
    // Must be evaluated before m_progress is overwritten below
    bool upload_progressed = (progress.upload.client_version > m_progress.upload.client_version);
    m_progress = progress;
    if (upload_progressed) {
        if (progress.upload.client_version > m_last_version_selected_for_upload) {
            if (progress.upload.client_version > m_upload_progress.client_version)
                m_upload_progress = progress.upload;
            m_last_version_selected_for_upload = progress.upload.client_version;
        }

        check_for_upload_completion();
    }

    do_recognize_sync_version(client_version); // Allows upload process to resume
    check_for_download_completion();           // Throws

    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
        auto& flx_subscription_store = *get_flx_subscription_store();
        // Reuse the store fetched above instead of looking it up a second time
        migration_store->create_subscriptions(flx_subscription_store);
    }

    // Since the deactivation process has not been initiated, the UNBIND
    // message cannot have been sent unless an ERROR message was received.
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
        ensure_enlisted_to_send(); // Throws
    }
}
44,088✔
1635

1636

1637
// The destructor performs no work of its own. An assertion that the session
// is either Unactivated or Deactivated at this point used to live here and is
// currently disabled:
//
//    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
Session::~Session() = default;
9,694✔
1641

1642

1643
std::string Session::make_logger_prefix(session_ident_type ident)
1644
{
9,694✔
1645
    std::ostringstream out;
9,694✔
1646
    out.imbue(std::locale::classic());
9,694✔
1647
    out << "Session[" << ident << "]: "; // Throws
9,694✔
1648
    return out.str();                    // Throws
9,694✔
1649
}
9,694✔
1650

1651

1652
void Session::activate()
1653
{
9,692✔
1654
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
9,692✔
1655

4,668✔
1656
    logger.debug("Activating"); // Throws
9,692✔
1657

4,668✔
1658
    bool has_pending_client_reset = false;
9,692✔
1659
    if (REALM_LIKELY(!get_client().is_dry_run())) {
9,694✔
1660
        bool file_exists = util::File::exists(get_realm_path());
9,694✔
1661
        m_performing_client_reset = get_client_reset_config().has_value();
9,694✔
1662

4,668✔
1663
        logger.info("client_reset_config = %1, Realm exists = %2 ", m_performing_client_reset, file_exists);
9,694✔
1664
        if (!m_performing_client_reset) {
9,694✔
1665
            get_history().get_status(m_last_version_available, m_client_file_ident, m_progress,
9,350✔
1666
                                     &has_pending_client_reset); // Throws
9,350✔
1667
        }
9,350✔
1668
    }
9,694✔
1669
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
9,692✔
1670
                 m_client_file_ident.salt); // Throws
9,692✔
1671
    m_upload_progress = m_progress.upload;
9,692✔
1672
    m_last_version_selected_for_upload = m_upload_progress.client_version;
9,692✔
1673
    m_download_progress = m_progress.download;
9,692✔
1674
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
9,692✔
1675

4,668✔
1676
    logger.debug("last_version_available  = %1", m_last_version_available);                    // Throws
9,692✔
1677
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
9,692✔
1678
    logger.debug("progress_download_client_version = %1",
9,692✔
1679
                 m_progress.download.last_integrated_client_version);                                      // Throws
9,692✔
1680
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
9,692✔
1681
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
9,692✔
1682

4,668✔
1683
    reset_protocol_state();
9,692✔
1684
    m_state = Active;
9,692✔
1685

4,668✔
1686
    REALM_ASSERT(!m_suspended);
9,692✔
1687
    m_conn.one_more_active_unsuspended_session(); // Throws
9,692✔
1688

4,668✔
1689
    try {
9,692✔
1690
        process_pending_flx_bootstrap();
9,692✔
1691
    }
9,692✔
1692
    catch (const IntegrationException& error) {
4,670✔
1693
        logger.error("Error integrating bootstrap changesets: %1", error.what());
4✔
1694
        m_suspended = true;
4✔
1695
        m_conn.one_less_active_unsuspended_session(); // Throws
4✔
1696
        on_suspended(SessionErrorInfo{Status{error.code(), error.what()}, IsFatal{true},
4✔
1697
                                      ProtocolErrorInfo::Action::ApplicationBug});
4✔
1698
    }
4✔
1699

4,668✔
1700
    if (has_pending_client_reset) {
9,694✔
1701
        handle_pending_client_reset_acknowledgement();
18✔
1702
    }
18✔
1703
}
9,694✔
1704

1705

1706
// The caller (Connection) must discard the session if the session has become
1707
// deactivated upon return.
1708
void Session::initiate_deactivation()
1709
{
9,694✔
1710
    REALM_ASSERT_EX(m_state == Active, m_state);
9,694✔
1711

4,668✔
1712
    logger.debug("Initiating deactivation"); // Throws
9,694✔
1713

4,668✔
1714
    m_state = Deactivating;
9,694✔
1715

4,668✔
1716
    if (!m_suspended)
9,694✔
1717
        m_conn.one_less_active_unsuspended_session(); // Throws
9,148✔
1718

4,668✔
1719
    if (m_enlisted_to_send) {
9,694✔
1720
        REALM_ASSERT(!unbind_process_complete());
5,196✔
1721
        return;
5,196✔
1722
    }
5,196✔
1723

2,158✔
1724
    // Deactivate immediately if the BIND message has not yet been sent and the
2,158✔
1725
    // session is not enlisted to send, or if the unbinding process has already
2,158✔
1726
    // completed.
2,158✔
1727
    if (!m_bind_message_sent || unbind_process_complete()) {
4,498✔
1728
        complete_deactivation(); // Throws
1,034✔
1729
        // Life cycle state is now Deactivated
466✔
1730
        return;
1,034✔
1731
    }
1,034✔
1732

1,692✔
1733
    // Ready to send the UNBIND message, if it has not already been sent
1,692✔
1734
    if (!m_unbind_message_sent) {
3,464✔
1735
        enlist_to_send(); // Throws
3,288✔
1736
        return;
3,288✔
1737
    }
3,288✔
1738
}
3,464✔
1739

1740

1741
void Session::complete_deactivation()
1742
{
9,694✔
1743
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
9,694✔
1744
    m_state = Deactivated;
9,694✔
1745

4,668✔
1746
    logger.debug("Deactivation completed"); // Throws
9,694✔
1747
}
9,694✔
1748

1749

1750
// Called by the associated Connection object when this session is granted an
1751
// opportunity to send a message.
1752
//
1753
// The caller (Connection) must discard the session if the session has become
1754
// deactivated upon return.
1755
void Session::send_message()
1756
{
153,144✔
1757
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
153,144✔
1758
    REALM_ASSERT(m_enlisted_to_send);
153,144✔
1759
    m_enlisted_to_send = false;
153,144✔
1760
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
153,144✔
1761
        // Deactivation has been initiated. If the UNBIND message has not been
4,336✔
1762
        // sent yet, there is no point in sending it. Instead, we can let the
4,336✔
1763
        // deactivation process complete.
4,336✔
1764
        if (!m_bind_message_sent) {
8,968✔
1765
            return complete_deactivation(); // Throws
2,978✔
1766
            // Life cycle state is now Deactivated
1,424✔
1767
        }
2,978✔
1768

2,912✔
1769
        // Session life cycle state is Deactivating or the unbinding process has
2,912✔
1770
        // been initiated by a session specific ERROR message
2,912✔
1771
        if (!m_unbind_message_sent)
5,990✔
1772
            send_unbind_message(); // Throws
5,990✔
1773
        return;
5,990✔
1774
    }
5,990✔
1775

70,778✔
1776
    // Session life cycle state is Active and the unbinding process has
70,778✔
1777
    // not been initiated
70,778✔
1778
    REALM_ASSERT(!m_unbind_message_sent);
144,176✔
1779

70,778✔
1780
    if (!m_bind_message_sent)
144,176✔
1781
        return send_bind_message(); // Throws
8,338✔
1782

66,630✔
1783
    if (!m_ident_message_sent) {
135,838✔
1784
        if (have_client_file_ident())
6,672✔
1785
            send_ident_message(); // Throws
6,672✔
1786
        return;
6,672✔
1787
    }
6,672✔
1788

63,342✔
1789
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
129,166✔
1790
                                                      [](const PendingTestCommand& command) {
63,402✔
1791
                                                          return command.pending;
124✔
1792
                                                      });
124✔
1793
    if (has_pending_test_command) {
129,166✔
1794
        return send_test_command_message();
60✔
1795
    }
60✔
1796

63,312✔
1797
    if (m_error_to_send)
129,106✔
1798
        return send_json_error_message(); // Throws
28✔
1799

63,300✔
1800
    // Stop sending upload, mark and query messages when the client detects an error.
63,300✔
1801
    if (m_client_error) {
129,078✔
1802
        return;
14✔
1803
    }
14✔
1804

63,292✔
1805
    if (m_target_download_mark > m_last_download_mark_sent)
129,064✔
1806
        return send_mark_message(); // Throws
16,364✔
1807

55,190✔
1808
    auto is_upload_allowed = [&]() -> bool {
112,712✔
1809
        if (!m_is_flx_sync_session) {
112,710✔
1810
            return true;
101,510✔
1811
        }
101,510✔
1812

5,876✔
1813
        auto migration_store = get_migration_store();
11,200✔
1814
        if (!migration_store) {
11,200✔
1815
            return true;
×
1816
        }
×
1817

5,876✔
1818
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
11,200✔
1819
        if (!sentinel_query_version) {
11,200✔
1820
            return true;
11,170✔
1821
        }
11,170✔
1822

16✔
1823
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
16✔
1824
        return m_last_sent_flx_query_version != *sentinel_query_version;
30✔
1825
    };
30✔
1826

55,190✔
1827
    if (!is_upload_allowed()) {
112,700✔
1828
        return;
16✔
1829
    }
16✔
1830

55,182✔
1831
    auto check_pending_flx_version = [&]() -> bool {
112,698✔
1832
        if (!m_is_flx_sync_session) {
112,696✔
1833
            return false;
101,510✔
1834
        }
101,510✔
1835

5,868✔
1836
        if (!m_allow_upload) {
11,186✔
1837
            return false;
2,242✔
1838
        }
2,242✔
1839

4,614✔
1840
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
8,944✔
1841

4,614✔
1842
        if (!m_pending_flx_sub_set) {
8,944✔
1843
            return false;
7,390✔
1844
        }
7,390✔
1845

778✔
1846
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
1,554✔
1847
    };
1,554✔
1848

55,182✔
1849
    if (check_pending_flx_version()) {
112,684✔
1850
        return send_query_change_message(); // throws
890✔
1851
    }
890✔
1852

54,736✔
1853
    if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
111,794✔
1854
        return send_upload_message(); // Throws
55,050✔
1855
    }
55,050✔
1856
}
111,794✔
1857

1858

1859
void Session::send_bind_message()
1860
{
8,338✔
1861
    REALM_ASSERT_EX(m_state == Active, m_state);
8,338✔
1862

4,148✔
1863
    session_ident_type session_ident = m_ident;
8,338✔
1864
    bool need_client_file_ident = !have_client_file_ident();
8,338✔
1865
    const bool is_subserver = false;
8,338✔
1866

4,148✔
1867

4,148✔
1868
    ClientProtocol& protocol = m_conn.get_client_protocol();
8,338✔
1869
    int protocol_version = m_conn.get_negotiated_protocol_version();
8,338✔
1870
    OutputBuffer& out = m_conn.get_output_buffer();
8,338✔
1871
    // Discard the token since it's ignored by the server.
4,148✔
1872
    std::string empty_access_token;
8,338✔
1873
    if (m_is_flx_sync_session) {
8,338✔
1874
        nlohmann::json bind_json_data;
1,418✔
1875
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,418✔
1876
            bind_json_data["migratedPartition"] = *migrated_partition;
60✔
1877
        }
60✔
1878
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,418✔
1879
        if (logger.would_log(util::Logger::Level::debug)) {
1,418✔
1880
            std::string json_data_dump;
1,418✔
1881
            if (!bind_json_data.empty()) {
1,418✔
1882
                json_data_dump = bind_json_data.dump();
1,418✔
1883
            }
1,418✔
1884
            logger.debug(
1,418✔
1885
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
1,418✔
1886
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
1,418✔
1887
        }
1,418✔
1888
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
1,418✔
1889
                                       need_client_file_ident, is_subserver); // Throws
1,418✔
1890
    }
1,418✔
1891
    else {
6,920✔
1892
        std::string server_path = get_virt_path();
6,920✔
1893
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
6,920✔
1894
                     session_ident, need_client_file_ident, is_subserver, server_path);
6,920✔
1895
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
6,920✔
1896
                                       need_client_file_ident, is_subserver); // Throws
6,920✔
1897
    }
6,920✔
1898
    m_conn.initiate_write_message(out, this); // Throws
8,338✔
1899

4,148✔
1900
    m_bind_message_sent = true;
8,338✔
1901

4,148✔
1902
    // Ready to send the IDENT message if the file identifier pair is already
4,148✔
1903
    // available.
4,148✔
1904
    if (!need_client_file_ident)
8,338✔
1905
        enlist_to_send(); // Throws
3,508✔
1906
}
8,338✔
1907

1908

1909
void Session::send_ident_message()
1910
{
6,672✔
1911
    REALM_ASSERT_EX(m_state == Active, m_state);
6,672✔
1912
    REALM_ASSERT(m_bind_message_sent);
6,672✔
1913
    REALM_ASSERT(!m_unbind_message_sent);
6,672✔
1914
    REALM_ASSERT(have_client_file_ident());
6,672✔
1915

3,288✔
1916

3,288✔
1917
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,672✔
1918
    OutputBuffer& out = m_conn.get_output_buffer();
6,672✔
1919
    session_ident_type session_ident = m_ident;
6,672✔
1920

3,288✔
1921
    if (m_is_flx_sync_session) {
6,672✔
1922
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,026✔
1923
        const auto active_query_body = active_query_set.to_ext_json();
1,026✔
1924
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,026✔
1925
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,026✔
1926
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,026✔
1927
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,026✔
1928
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
1,026✔
1929
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
1,026✔
1930
                     active_query_body); // Throws
1,026✔
1931
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
1,026✔
1932
                                        active_query_set.version(), active_query_body); // Throws
1,026✔
1933
        m_last_sent_flx_query_version = active_query_set.version();
1,026✔
1934
    }
1,026✔
1935
    else {
5,646✔
1936
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
5,646✔
1937
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
5,646✔
1938
                     "latest_server_version_salt=%6)",
5,646✔
1939
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
5,646✔
1940
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
5,646✔
1941
                     m_progress.latest_server_version.salt);                                  // Throws
5,646✔
1942
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
5,646✔
1943
    }
5,646✔
1944
    m_conn.initiate_write_message(out, this); // Throws
6,672✔
1945

3,288✔
1946
    m_ident_message_sent = true;
6,672✔
1947

3,288✔
1948
    // Other messages may be waiting to be sent
3,288✔
1949
    enlist_to_send(); // Throws
6,672✔
1950
}
6,672✔
1951

1952
void Session::send_query_change_message()
1953
{
890✔
1954
    REALM_ASSERT_EX(m_state == Active, m_state);
890✔
1955
    REALM_ASSERT(m_ident_message_sent);
890✔
1956
    REALM_ASSERT(!m_unbind_message_sent);
890✔
1957
    REALM_ASSERT(m_pending_flx_sub_set);
890✔
1958
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
890✔
1959

446✔
1960
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
890✔
1961
        return;
×
1962
    }
×
1963

446✔
1964
    auto sub_store = get_flx_subscription_store();
890✔
1965
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
890✔
1966
    auto latest_queries = latest_sub_set.to_ext_json();
890✔
1967
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
890✔
1968
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
890✔
1969

446✔
1970
    OutputBuffer& out = m_conn.get_output_buffer();
890✔
1971
    session_ident_type session_ident = get_ident();
890✔
1972
    ClientProtocol& protocol = m_conn.get_client_protocol();
890✔
1973
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
890✔
1974
    m_conn.initiate_write_message(out, this);
890✔
1975

446✔
1976
    m_last_sent_flx_query_version = latest_sub_set.version();
890✔
1977

446✔
1978
    request_download_completion_notification();
890✔
1979
}
890✔
1980

1981
void Session::send_upload_message()
1982
{
55,054✔
1983
    REALM_ASSERT_EX(m_state == Active, m_state);
55,054✔
1984
    REALM_ASSERT(m_ident_message_sent);
55,054✔
1985
    REALM_ASSERT(!m_unbind_message_sent);
55,054✔
1986

27,534✔
1987
    if (REALM_UNLIKELY(get_client().is_dry_run()))
55,054✔
1988
        return;
27,534✔
1989

27,534✔
1990
    version_type target_upload_version = m_last_version_available;
55,054✔
1991
    if (m_pending_flx_sub_set) {
55,054✔
1992
        REALM_ASSERT(m_is_flx_sync_session);
662✔
1993
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
662✔
1994
    }
662✔
1995

27,534✔
1996
    std::vector<UploadChangeset> uploadable_changesets;
55,054✔
1997
    version_type locked_server_version = 0;
55,054✔
1998
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
55,054✔
1999
                                             locked_server_version); // Throws
55,054✔
2000

27,534✔
2001
    if (uploadable_changesets.empty()) {
55,054✔
2002
        // Nothing more to upload right now
13,754✔
2003
        check_for_upload_completion(); // Throws
28,204✔
2004
        // If we need to limit upload up to some version other than the last client version available and there are no
13,754✔
2005
        // changes to upload, then there is no need to send an empty message.
13,754✔
2006
        if (m_pending_flx_sub_set) {
28,204✔
2007
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
188✔
2008
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
188✔
2009
            // Other messages may be waiting to be sent
94✔
2010
            return enlist_to_send(); // Throws
188✔
2011
        }
188✔
2012
    }
26,850✔
2013
    else {
26,850✔
2014
        m_last_version_selected_for_upload = uploadable_changesets.back().progress.client_version;
26,850✔
2015
    }
26,850✔
2016

27,534✔
2017
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
54,960✔
2018
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
474✔
2019
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
474✔
2020
    }
474✔
2021

27,440✔
2022
    version_type progress_client_version = m_upload_progress.client_version;
54,866✔
2023
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
54,866✔
2024

27,440✔
2025
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
54,866✔
2026
                 "locked_server_version=%3, num_changesets=%4)",
54,866✔
2027
                 progress_client_version, progress_server_version, locked_server_version,
54,866✔
2028
                 uploadable_changesets.size()); // Throws
54,866✔
2029

27,440✔
2030
    ClientProtocol& protocol = m_conn.get_client_protocol();
54,866✔
2031
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
54,866✔
2032

27,440✔
2033
    for (const UploadChangeset& uc : uploadable_changesets) {
49,354✔
2034
        logger.debug("Fetching changeset for upload (client_version=%1, server_version=%2, "
42,168✔
2035
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
42,168✔
2036
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
42,168✔
2037
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
42,168✔
2038
        if (logger.would_log(util::Logger::Level::trace)) {
42,168✔
2039
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2040
            if (changeset_data.size() < 1024) {
×
2041
                logger.trace("Changeset: %1",
×
2042
                             _impl::clamped_hex_dump(changeset_data)); // Throws
×
2043
            }
×
2044
            else {
×
2045
                logger.trace("Changeset(comp): %1 %2", changeset_data.size(),
×
2046
                             protocol.compressed_hex_dump(changeset_data));
×
2047
            }
×
2048

2049
#if REALM_DEBUG
×
2050
            ChunkedBinaryInputStream in{changeset_data};
×
2051
            Changeset log;
×
2052
            try {
×
2053
                parse_changeset(in, log);
×
2054
                std::stringstream ss;
×
2055
                log.print(ss);
×
2056
                logger.trace("Changeset (parsed):\n%1", ss.str());
×
2057
            }
×
2058
            catch (const BadChangesetError& err) {
×
2059
                logger.error("Unable to parse changeset: %1", err.what());
×
2060
            }
×
2061
#endif
×
2062
        }
×
2063

20,254✔
2064
#if 0 // Upload log compaction is currently not implemented
2065
        if (!get_client().m_disable_upload_compaction) {
2066
            ChangesetEncoder::Buffer encode_buffer;
2067

2068
            {
2069
                // Upload compaction only takes place within single changesets to
2070
                // avoid another client seeing inconsistent snapshots.
2071
                ChunkedBinaryInputStream stream{uc.changeset};
2072
                Changeset changeset;
2073
                parse_changeset(stream, changeset); // Throws
2074
                // FIXME: What is the point of setting these? How can compaction care about them?
2075
                changeset.version = uc.progress.client_version;
2076
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2077
                changeset.origin_timestamp = uc.origin_timestamp;
2078
                changeset.origin_file_ident = uc.origin_file_ident;
2079

2080
                compact_changesets(&changeset, 1);
2081
                encode_changeset(changeset, encode_buffer);
2082

2083
                logger.debug("Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2084
                             encode_buffer.size()); // Throws
2085
            }
2086

2087
            upload_message_builder.add_changeset(
2088
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
2089
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
2090
        }
2091
        else
2092
#endif
2093
        {
42,168✔
2094
            upload_message_builder.add_changeset(uc.progress.client_version,
42,168✔
2095
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
42,168✔
2096
                                                 uc.origin_file_ident,
42,168✔
2097
                                                 uc.changeset); // Throws
42,168✔
2098
        }
42,168✔
2099
    }
42,168✔
2100

27,440✔
2101
    int protocol_version = m_conn.get_negotiated_protocol_version();
54,866✔
2102
    OutputBuffer& out = m_conn.get_output_buffer();
54,866✔
2103
    session_ident_type session_ident = get_ident();
54,866✔
2104
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
54,866✔
2105
                                               progress_server_version,
54,866✔
2106
                                               locked_server_version); // Throws
54,866✔
2107
    m_conn.initiate_write_message(out, this);                          // Throws
54,866✔
2108

27,440✔
2109
    // Other messages may be waiting to be sent
27,440✔
2110
    enlist_to_send(); // Throws
54,866✔
2111
}
54,866✔
2112

2113

2114
void Session::send_mark_message()
2115
{
16,364✔
2116
    REALM_ASSERT_EX(m_state == Active, m_state);
16,364✔
2117
    REALM_ASSERT(m_ident_message_sent);
16,364✔
2118
    REALM_ASSERT(!m_unbind_message_sent);
16,364✔
2119
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
16,364✔
2120

8,102✔
2121
    request_ident_type request_ident = m_target_download_mark;
16,364✔
2122
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
16,364✔
2123

8,102✔
2124
    ClientProtocol& protocol = m_conn.get_client_protocol();
16,364✔
2125
    OutputBuffer& out = m_conn.get_output_buffer();
16,364✔
2126
    session_ident_type session_ident = get_ident();
16,364✔
2127
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
16,364✔
2128
    m_conn.initiate_write_message(out, this);                      // Throws
16,364✔
2129

8,102✔
2130
    m_last_download_mark_sent = request_ident;
16,364✔
2131

8,102✔
2132
    // Other messages may be waiting to be sent
8,102✔
2133
    enlist_to_send(); // Throws
16,364✔
2134
}
16,364✔
2135

2136

2137
void Session::send_unbind_message()
2138
{
5,990✔
2139
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
5,990✔
2140
    REALM_ASSERT(m_bind_message_sent);
5,990✔
2141
    REALM_ASSERT(!m_unbind_message_sent);
5,990✔
2142

2,912✔
2143
    logger.debug("Sending: UNBIND"); // Throws
5,990✔
2144

2,912✔
2145
    ClientProtocol& protocol = m_conn.get_client_protocol();
5,990✔
2146
    OutputBuffer& out = m_conn.get_output_buffer();
5,990✔
2147
    session_ident_type session_ident = get_ident();
5,990✔
2148
    protocol.make_unbind_message(out, session_ident); // Throws
5,990✔
2149
    m_conn.initiate_write_message(out, this);         // Throws
5,990✔
2150

2,912✔
2151
    m_unbind_message_sent = true;
5,990✔
2152
}
5,990✔
2153

2154

2155
void Session::send_json_error_message()
2156
{
28✔
2157
    REALM_ASSERT_EX(m_state == Active, m_state);
28✔
2158
    REALM_ASSERT(m_ident_message_sent);
28✔
2159
    REALM_ASSERT(!m_unbind_message_sent);
28✔
2160
    REALM_ASSERT(m_error_to_send);
28✔
2161
    REALM_ASSERT(m_client_error);
28✔
2162

12✔
2163
    ClientProtocol& protocol = m_conn.get_client_protocol();
28✔
2164
    OutputBuffer& out = m_conn.get_output_buffer();
28✔
2165
    session_ident_type session_ident = get_ident();
28✔
2166
    auto protocol_error = m_client_error->error_for_server;
28✔
2167

12✔
2168
    auto message = util::format("%1", m_client_error->to_status());
28✔
2169
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
28✔
2170
                session_ident); // Throws
28✔
2171

12✔
2172
    nlohmann::json error_body_json;
28✔
2173
    error_body_json["message"] = std::move(message);
28✔
2174
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
28✔
2175
                                     error_body_json.dump()); // Throws
28✔
2176
    m_conn.initiate_write_message(out, this);                 // Throws
28✔
2177

12✔
2178
    m_error_to_send = false;
28✔
2179
    enlist_to_send(); // Throws
28✔
2180
}
28✔
2181

2182

2183
void Session::send_test_command_message()
2184
{
60✔
2185
    REALM_ASSERT_EX(m_state == Active, m_state);
60✔
2186

30✔
2187
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
60✔
2188
                           [](const PendingTestCommand& command) {
60✔
2189
                               return command.pending;
60✔
2190
                           });
60✔
2191
    REALM_ASSERT(it != m_pending_test_commands.end());
60✔
2192

30✔
2193
    ClientProtocol& protocol = m_conn.get_client_protocol();
60✔
2194
    OutputBuffer& out = m_conn.get_output_buffer();
60✔
2195
    auto session_ident = get_ident();
60✔
2196

30✔
2197
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
60✔
2198
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
60✔
2199

30✔
2200
    m_conn.initiate_write_message(out, this); // Throws;
60✔
2201
    it->pending = false;
60✔
2202

30✔
2203
    enlist_to_send();
60✔
2204
}
60✔
2205

2206
bool Session::client_reset_if_needed()
2207
{
3,260✔
2208
    // Regardless of what happens, once we return from this function we will
1,548✔
2209
    // no longer be in the middle of a client reset
1,548✔
2210
    m_performing_client_reset = false;
3,260✔
2211

1,548✔
2212
    // Even if we end up not actually performing a client reset, consume the
1,548✔
2213
    // config to ensure that the resources it holds are released
1,548✔
2214
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
3,260✔
2215
    if (!client_reset_config) {
3,260✔
2216
        return false;
2,916✔
2217
    }
2,916✔
2218

172✔
2219
    auto on_flx_version_complete = [this](int64_t version) {
344✔
2220
        this->on_flx_sync_version_complete(version);
232✔
2221
    };
232✔
2222
    bool did_reset = client_reset::perform_client_reset(
344✔
2223
        logger, *get_db(), *client_reset_config->fresh_copy, client_reset_config->mode,
344✔
2224
        std::move(client_reset_config->notify_before_client_reset),
344✔
2225
        std::move(client_reset_config->notify_after_client_reset), m_client_file_ident, get_flx_subscription_store(),
344✔
2226
        on_flx_version_complete, client_reset_config->recovery_is_allowed);
344✔
2227
    if (!did_reset) {
344✔
2228
        return false;
×
2229
    }
×
2230

172✔
2231
    // The fresh Realm has been used to reset the state
172✔
2232
    logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
344✔
2233

172✔
2234
    SaltedFileIdent client_file_ident;
344✔
2235
    bool has_pending_client_reset = false;
344✔
2236
    get_history().get_status(m_last_version_available, client_file_ident, m_progress,
344✔
2237
                             &has_pending_client_reset); // Throws
344✔
2238
    REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
344✔
2239
    REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
344✔
2240
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
344✔
2241
                    m_progress.download.last_integrated_client_version);
344✔
2242
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
344✔
2243
    REALM_ASSERT_EX(m_progress.upload.last_integrated_server_version == 0,
344✔
2244
                    m_progress.upload.last_integrated_server_version);
344✔
2245
    logger.trace("last_version_available  = %1", m_last_version_available); // Throws
344✔
2246

172✔
2247
    m_upload_progress = m_progress.upload;
344✔
2248
    m_download_progress = m_progress.download;
344✔
2249
    // In recovery mode, there may be new changesets to upload and nothing left to download.
172✔
2250
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
172✔
2251
    // For both, we want to allow uploads again without needing external changes to download first.
172✔
2252
    m_allow_upload = true;
344✔
2253
    REALM_ASSERT_EX(m_last_version_selected_for_upload == 0, m_last_version_selected_for_upload);
344✔
2254

172✔
2255
    if (has_pending_client_reset) {
344✔
2256
        handle_pending_client_reset_acknowledgement();
272✔
2257
    }
272✔
2258

172✔
2259
    update_subscription_version_info();
344✔
2260

172✔
2261
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
172✔
2262
    if (auto migration_store = get_migration_store()) {
344✔
2263
        migration_store->complete_migration_or_rollback();
244✔
2264
    }
244✔
2265

172✔
2266
    return true;
344✔
2267
}
344✔
2268

2269
// Handles an IDENT message from the server, which assigns this session its
// client file identifier.
//
// Parameters:
//   client_file_ident - the identifier and salt received in the message.
//
// Returns Status::OK() when the message was accepted, when it was ignored
// because the session is no longer Active, or when a client reset failure was
// handled by suspending the session. Returns SyncProtocolInvariantFailed when
// the message is not legal at this point or carries an invalid ident or salt.
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
{
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
                 client_file_ident.salt); // Throws

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active)
        return Status::OK(); // Success

    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
                               !m_unbound_message_received);
    if (REALM_UNLIKELY(!legal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
    }
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
    }
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
    }

    m_client_file_ident = client_file_ident;

    if (REALM_UNLIKELY(get_client().is_dry_run())) {
        // Ready to send the IDENT message
        ensure_enlisted_to_send(); // Throws
        return Status::OK();       // Success
    }

    // if a client reset happens, it will take care of setting the file ident
    // and if not, we do it here
    bool did_client_reset = false;
    try {
        did_client_reset = client_reset_if_needed();
    }
    catch (const std::exception& e) {
        auto err_msg = util::format("A fatal error occurred during client reset: '%1'", e.what());
        // The message embeds arbitrary exception text, so it is passed as an
        // argument rather than as the logger's format string; otherwise any
        // '%' in the exception text would be read as a placeholder.
        logger.error("%1", err_msg);
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true},
                                  ProtocolErrorInfo::Action::BackupThenDeleteRealm);
        suspend(err_info);
        return Status::OK();
    }
    if (!did_client_reset) {
        get_history().set_client_file_ident(client_file_ident,
                                            m_fix_up_object_ids); // Throws
        m_progress.download.last_integrated_client_version = 0;
        m_progress.upload.client_version = 0;
        m_last_version_selected_for_upload = 0;
    }

    // Ready to send the IDENT message
    ensure_enlisted_to_send(); // Throws
    return Status::OK();       // Success
}
3,188✔
2326

2327
Status Session::receive_download_message(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
2328
                                         DownloadBatchState batch_state, int64_t query_version,
2329
                                         const ReceivedChangesets& received_changesets)
2330
{
44,788✔
2331
    REALM_ASSERT_EX(query_version >= 0, query_version);
44,788✔
2332
    // Ignore the message if the deactivation process has been initiated,
23,858✔
2333
    // because in that case, the associated Realm and SessionWrapper must
23,858✔
2334
    // not be accessed any longer.
23,858✔
2335
    if (m_state != Active)
44,788✔
2336
        return Status::OK();
496✔
2337

23,570✔
2338
    if (is_steady_state_download_message(batch_state, query_version)) {
44,292✔
2339
        batch_state = DownloadBatchState::SteadyState;
42,630✔
2340
    }
42,630✔
2341

23,570✔
2342
    logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
44,292✔
2343
                 "latest_server_version=%3, latest_server_version_salt=%4, "
44,292✔
2344
                 "upload_client_version=%5, upload_server_version=%6, downloadable_bytes=%7, "
44,292✔
2345
                 "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
44,292✔
2346
                 progress.download.server_version, progress.download.last_integrated_client_version,
44,292✔
2347
                 progress.latest_server_version.version, progress.latest_server_version.salt,
44,292✔
2348
                 progress.upload.client_version, progress.upload.last_integrated_server_version, downloadable_bytes,
44,292✔
2349
                 batch_state != DownloadBatchState::MoreToCome, query_version, received_changesets.size()); // Throws
44,292✔
2350

23,570✔
2351
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
23,570✔
2352
    // changeset over and over again.
23,570✔
2353
    if (m_client_error) {
44,292✔
2354
        logger.debug("Ignoring download message because the client detected an integration error");
×
2355
        return Status::OK();
×
2356
    }
×
2357

23,570✔
2358
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
44,296✔
2359
    if (REALM_UNLIKELY(!legal_at_this_time)) {
44,292✔
2360
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
×
2361
    }
×
2362
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
44,292✔
2363
        logger.error("Bad sync progress received (%1)", status);
×
2364
        return status;
×
2365
    }
×
2366

23,570✔
2367
    version_type server_version = m_progress.download.server_version;
44,292✔
2368
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
44,292✔
2369
    for (const RemoteChangeset& changeset : received_changesets) {
44,608✔
2370
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
22,520✔
2371
        // version must be increasing, but can stay the same during bootstraps.
22,520✔
2372
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
23,388✔
2373
                                                         : (changeset.remote_version > server_version);
42,690✔
2374
        // Each server version cannot be greater than the one in the header of the download message.
22,520✔
2375
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
43,558✔
2376
        if (!good_server_version) {
43,558✔
2377
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2378
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
×
2379
                                 changeset.remote_version, server_version, progress.download.server_version)};
×
2380
        }
×
2381
        server_version = changeset.remote_version;
43,558✔
2382
        // Check that per-changeset last integrated client version is "weakly"
22,520✔
2383
        // increasing.
22,520✔
2384
        bool good_client_version =
43,558✔
2385
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
43,558✔
2386
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
43,554✔
2387
        if (!good_client_version) {
43,558✔
2388
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2389
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
×
2390
                                 "(%1, %2, %3)",
×
2391
                                 changeset.last_integrated_local_version, last_integrated_client_version,
×
2392
                                 progress.download.last_integrated_client_version)};
×
2393
        }
×
2394
        last_integrated_client_version = changeset.last_integrated_local_version;
43,558✔
2395
        // Server shouldn't send our own changes, and zero is not a valid client
22,520✔
2396
        // file identifier.
22,520✔
2397
        bool good_file_ident =
43,558✔
2398
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
43,566✔
2399
        if (!good_file_ident) {
43,558✔
2400
            return {ErrorCodes::SyncProtocolInvariantFailed,
×
2401
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
×
2402
                                 changeset.origin_file_ident)};
×
2403
        }
×
2404
    }
43,558✔
2405

23,570✔
2406
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
44,292✔
2407
                                       batch_state, received_changesets.size());
44,292✔
2408
    if (hook_action == SyncClientHookAction::EarlyReturn) {
44,292✔
2409
        return Status::OK();
12✔
2410
    }
12✔
2411
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
44,280✔
2412

23,564✔
2413
    if (process_flx_bootstrap_message(progress, batch_state, query_version, received_changesets)) {
44,280✔
2414
        clear_resumption_delay_state();
1,656✔
2415
        return Status::OK();
1,656✔
2416
    }
1,656✔
2417

22,736✔
2418
    initiate_integrate_changesets(downloadable_bytes, batch_state, progress, received_changesets); // Throws
42,624✔
2419

22,736✔
2420
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
42,624✔
2421
                                  batch_state, received_changesets.size());
42,624✔
2422
    if (hook_action == SyncClientHookAction::EarlyReturn) {
42,624✔
2423
        return Status::OK();
×
2424
    }
×
2425
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
42,624✔
2426

22,736✔
2427
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
22,736✔
2428
    // after a retryable session error.
22,736✔
2429
    clear_resumption_delay_state();
42,624✔
2430
    return Status::OK();
42,624✔
2431
}
42,624✔
2432

2433
// Handles a MARK message from the server.
//
// Parameters:
//   request_ident - the request identifier carried by the message; it must be
//                   greater than the last mark received and no greater than
//                   the last mark sent.
//
// On success the current download server version and the request identifier
// are recorded, and download completion is checked.
//
// Returns Status::OK() when the message was accepted or ignored because the
// session is no longer Active, and SyncProtocolInvariantFailed when the
// message is not legal at this point or the request identifier is out of
// range.
Status Session::receive_mark_message(request_ident_type request_ident)
{
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws

    // Ignore the message if the deactivation process has been initiated,
    // because in that case, the associated Realm and SessionWrapper must
    // not be accessed any longer.
    if (m_state != Active)
        return Status::OK(); // Success

    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
    if (REALM_UNLIKELY(!legal_at_this_time)) {
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
    }
    bool good_request_ident =
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
    if (REALM_UNLIKELY(!good_request_ident)) {
        return {
            ErrorCodes::SyncProtocolInvariantFailed,
            util::format(
                "Received MARK message with invalid request identifier (last mark sent: %1 last mark received: %2)",
                m_last_download_mark_sent, m_last_download_mark_received)};
    }

    m_server_version_at_last_download_mark = m_progress.download.server_version;
    m_last_download_mark_received = request_ident;
    check_for_download_completion(); // Throws

    return Status::OK(); // Success
}
15,632✔
2463

2464

2465
// The caller (Connection) must discard the session if the session has become
2466
// deactivated upon return.
2467
Status Session::receive_unbound_message()
2468
{
3,936✔
2469
    logger.debug("Received: UNBOUND");
3,936✔
2470

1,892✔
2471
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
3,936✔
2472
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,936✔
2473
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
×
2474
    }
×
2475

1,892✔
2476
    // The fact that the UNBIND message has been sent, but an ERROR message has
1,892✔
2477
    // not been received, implies that the deactivation process must have been
1,892✔
2478
    // initiated, so this session must be in the Deactivating state or the session
1,892✔
2479
    // has been suspended because of a client side error.
1,892✔
2480
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
3,936!
2481

1,892✔
2482
    m_unbound_message_received = true;
3,936✔
2483

1,892✔
2484
    // Detect completion of the unbinding process
1,892✔
2485
    if (m_unbind_message_send_complete && m_state == Deactivating) {
3,936✔
2486
        // The deactivation process completes when the unbinding process
1,892✔
2487
        // completes.
1,892✔
2488
        complete_deactivation(); // Throws
3,936✔
2489
        // Life cycle state is now Deactivated
1,892✔
2490
    }
3,936✔
2491

1,892✔
2492
    return Status::OK(); // Success
3,936✔
2493
}
3,936✔
2494

2495

2496
// Handles a QUERY_ERROR message from the server by forwarding the query
// version and message text to on_flx_sync_error(). Always returns
// Status::OK().
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
{
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);

    // Once the deactivation process has been initiated, the associated Realm
    // and SessionWrapper must not be accessed any longer, so the message is
    // dropped.
    if (m_state != Active) {
        return Status::OK();
    }

    on_flx_sync_error(query_version, message); // throws
    return Status::OK();
}
16✔
2507

2508
// The caller (Connection) must discard the session if the session has become
2509
// deactivated upon return.
2510
Status Session::receive_error_message(const ProtocolErrorInfo& info)
2511
{
902✔
2512
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
902✔
2513
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
902✔
2514

474✔
2515
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
902✔
2516
    if (REALM_UNLIKELY(!legal_at_this_time)) {
902✔
2517
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
×
2518
    }
×
2519

474✔
2520
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
902✔
2521
    auto status = protocol_error_to_status(protocol_error, info.message);
902✔
2522
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
902✔
2523
        return {ErrorCodes::SyncProtocolInvariantFailed,
×
2524
                util::format("Received ERROR message for session with non-session-level error code %1",
×
2525
                             info.raw_error_code)};
×
2526
    }
×
2527

474✔
2528
    // Can't process debug hook actions once the Session is undergoing deactivation, since
474✔
2529
    // the SessionWrapper may not be available
474✔
2530
    if (m_state == Active) {
902✔
2531
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
900✔
2532
        if (debug_action == SyncClientHookAction::EarlyReturn) {
900✔
2533
            return Status::OK();
8✔
2534
        }
8✔
2535
    }
894✔
2536

470✔
2537
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
470✔
2538
    // containing the compensating write has appeared in a download message.
470✔
2539
    if (status == ErrorCodes::SyncCompensatingWrite) {
894✔
2540
        // If the client is not active, the compensating writes will not be processed now, but will be
22✔
2541
        // sent again the next time the client connects
22✔
2542
        if (m_state == Active) {
44✔
2543
            REALM_ASSERT(info.compensating_write_server_version.has_value());
44✔
2544
            m_pending_compensating_write_errors.push_back(info);
44✔
2545
        }
44✔
2546
        return Status::OK();
44✔
2547
    }
44✔
2548

448✔
2549
    m_error_message_received = true;
850✔
2550
    suspend(SessionErrorInfo{info, std::move(status)});
850✔
2551
    return Status::OK();
850✔
2552
}
850✔
2553

2554
void Session::suspend(const SessionErrorInfo& info)
2555
{
922✔
2556
    REALM_ASSERT(!m_suspended);
922✔
2557
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
922!
2558
    logger.debug("Suspended"); // Throws
922✔
2559

484✔
2560
    m_suspended = true;
922✔
2561

484✔
2562
    // Detect completion of the unbinding process
484✔
2563
    if (m_unbind_message_send_complete && m_error_message_received) {
922!
2564
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2✔
2565
        // we received an ERROR message implies that the deactivation process must
2✔
2566
        // have been initiated, so this session must be in the Deactivating state.
2✔
2567
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
2!
2568

2✔
2569
        // The deactivation process completes when the unbinding process
2✔
2570
        // completes.
2✔
2571
        complete_deactivation(); // Throws
2✔
2572
        // Life cycle state is now Deactivated
2✔
2573
    }
2✔
2574

484✔
2575
    // Notify the application of the suspension of the session if the session is
484✔
2576
    // still in the Active state
484✔
2577
    if (m_state == Active) {
922✔
2578
        call_debug_hook(SyncClientHookEvent::SessionSuspended, info);
920✔
2579
        m_conn.one_less_active_unsuspended_session(); // Throws
920✔
2580
        on_suspended(info);                           // Throws
920✔
2581
    }
920✔
2582

484✔
2583
    if (!info.is_fatal) {
922✔
2584
        begin_resumption_delay(info);
382✔
2585
    }
382✔
2586

484✔
2587
    // Ready to send the UNBIND message, if it has not been sent already
484✔
2588
    if (!m_unbind_message_sent)
922✔
2589
        ensure_enlisted_to_send(); // Throws
920✔
2590
}
922✔
2591

2592
/// Handles a TEST_COMMAND response received for this session.
///
/// Finds the pending test command whose `id` equals `ident`, fulfils its
/// promise with a copy of `body`, and removes it from the pending list.
///
/// @param ident  Request identifier carried by the response.
/// @param body   Response payload.
/// @return `Status::OK()` on success, or `SyncProtocolInvariantFailed` if no
///         pending test command has the given identifier.
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
{
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);

    const auto has_requested_ident = [&](const PendingTestCommand& candidate) {
        return candidate.id == ident;
    };
    auto match =
        std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(), has_requested_ident);

    const bool found = (match != m_pending_test_commands.end());
    if (!found) {
        return {ErrorCodes::SyncProtocolInvariantFailed,
                util::format("Received test command response for a non-existent ident %1", ident)};
    }

    // Deliver the response first, then drop the bookkeeping entry.
    match->promise.emplace_value(std::string{body});
    m_pending_test_commands.erase(match);

    return Status::OK();
}
60✔
2609

2610
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
2611
{
382✔
2612
    REALM_ASSERT(!m_try_again_activation_timer);
382✔
2613

216✔
2614
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
382✔
2615
                                  error_info.resumption_delay_interval);
382✔
2616
    auto try_again_interval = m_try_again_delay_info.delay_interval();
382✔
2617
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
382✔
2618
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
10✔
2619
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
10✔
2620
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
10✔
2621
        try_again_interval = std::chrono::milliseconds{1000};
20✔
2622
    }
20✔
2623
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
382✔
2624
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
382✔
2625
        if (status == ErrorCodes::OperationAborted)
382✔
2626
            return;
4✔
2627
        else if (!status.is_ok())
378✔
2628
            throw Exception(status);
×
2629

214✔
2630
        m_try_again_activation_timer.reset();
378✔
2631
        cancel_resumption_delay();
378✔
2632
    });
378✔
2633
}
382✔
2634

2635
void Session::clear_resumption_delay_state()
2636
{
44,278✔
2637
    if (m_try_again_activation_timer) {
44,278✔
2638
        logger.debug("Clearing resumption delay state after successful download");
×
2639
        m_try_again_delay_info.reset();
×
2640
    }
×
2641
}
44,278✔
2642

2643
/// Validates sync progress received from the server against the progress
/// currently stored in `m_progress` and against `m_last_version_available`.
///
/// Every invariant is evaluated, in order; if more than one is violated, the
/// message of the last violated invariant is the one reported.
///
/// @param progress  Progress information received in a download message.
/// @return `Status::OK()` if all invariants hold, otherwise
///         `SyncProtocolInvariantFailed` with a description of the violation.
Status ClientImpl::Session::check_received_sync_progress(const SyncProgress& progress) noexcept
{
    const SyncProgress& current = m_progress;
    const SyncProgress& received = progress;

    // Holds the description of the most recently detected violation, if any.
    std::string violation;
    const auto record = [&violation](const char* fmt, const auto& first, const auto& second) {
        violation = util::format(fmt, first, second);
    };

    if (received.latest_server_version.version < current.latest_server_version.version) {
        record("Latest server version in download messages must be weakly increasing throughout a "
               "session (current: %1, received: %2)",
               current.latest_server_version.version, received.latest_server_version.version);
    }
    if (received.upload.client_version < current.upload.client_version) {
        record("Last integrated client version in download messages must be weakly increasing "
               "throughout a session (current: %1, received: %2)",
               current.upload.client_version, received.upload.client_version);
    }
    if (received.upload.client_version > m_last_version_available) {
        record("Last integrated client version on server cannot be greater than the latest client "
               "version in existence (current: %1, received: %2)",
               m_last_version_available, received.upload.client_version);
    }
    if (received.download.server_version < current.download.server_version) {
        record("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
               current.download.server_version, received.download.server_version);
    }
    if (received.download.server_version > received.latest_server_version.version) {
        record(
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
            received.download.server_version, received.latest_server_version.version);
    }
    if (received.download.last_integrated_client_version < current.download.last_integrated_client_version) {
        record("Last integrated client version on the server at the position in the server's history of the download "
               "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
               current.download.last_integrated_client_version, received.download.last_integrated_client_version);
    }
    if (received.download.last_integrated_client_version > received.upload.client_version) {
        record("Last integrated client version on the server in the position at the server's history "
               "of the download cursor cannot be greater than the latest client version integrated "
               "on the server (download: %1, upload: %2)",
               received.download.last_integrated_client_version, received.upload.client_version);
    }
    if (received.download.server_version < received.upload.last_integrated_server_version) {
        record("The server version of the download cursor cannot be less than the server version integrated in the "
               "latest client version acknowledged by the server (download: %1, upload: %2)",
               received.download.server_version, received.upload.last_integrated_server_version);
    }

    if (!violation.empty()) {
        return {ErrorCodes::SyncProtocolInvariantFailed, std::move(violation)};
    }
    return Status::OK();
}
2✔
2697

2698

2699
void Session::check_for_upload_completion()
2700
{
75,708✔
2701
    REALM_ASSERT_EX(m_state == Active, m_state);
75,708✔
2702
    if (!m_upload_completion_notification_requested) {
75,708✔
2703
        return;
45,166✔
2704
    }
45,166✔
2705

14,944✔
2706
    // during an ongoing client reset operation, we never upload anything
14,944✔
2707
    if (m_performing_client_reset)
30,542✔
2708
        return;
244✔
2709

14,822✔
2710
    // Upload process must have reached end of history
14,822✔
2711
    REALM_ASSERT_3(m_upload_progress.client_version, <=, m_last_version_available);
30,298✔
2712
    bool scan_complete = (m_upload_progress.client_version == m_last_version_available);
30,298✔
2713
    if (!scan_complete)
30,298✔
2714
        return;
5,054✔
2715

12,296✔
2716
    // All uploaded changesets must have been acknowledged by the server
12,296✔
2717
    REALM_ASSERT_3(m_progress.upload.client_version, <=, m_last_version_selected_for_upload);
25,244✔
2718
    bool all_uploads_accepted = (m_progress.upload.client_version == m_last_version_selected_for_upload);
25,244✔
2719
    if (!all_uploads_accepted)
25,244✔
2720
        return;
10,572✔
2721

7,248✔
2722
    m_upload_completion_notification_requested = false;
14,672✔
2723
    on_upload_completion(); // Throws
14,672✔
2724
}
14,672✔
2725

2726

2727
void Session::check_for_download_completion()
2728
{
59,718✔
2729
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
59,718✔
2730
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
59,718✔
2731
    if (m_last_download_mark_received == m_last_triggering_download_mark)
59,718✔
2732
        return;
43,916✔
2733
    if (m_last_download_mark_received < m_target_download_mark)
15,802✔
2734
        return;
314✔
2735
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
15,488✔
2736
        return;
×
2737
    m_last_triggering_download_mark = m_target_download_mark;
15,488✔
2738
    if (REALM_UNLIKELY(!m_allow_upload)) {
15,488✔
2739
        // Activate the upload process now, and enable immediate reactivation
2,024✔
2740
        // after a subsequent fast reconnect.
2,024✔
2741
        m_allow_upload = true;
4,242✔
2742
        ensure_enlisted_to_send(); // Throws
4,242✔
2743
    }
4,242✔
2744
    on_download_completion(); // Throws
15,488✔
2745
}
15,488✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc