• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_439

02 Jul 2024 07:51PM UTC coverage: 91.316% (+0.3%) from 90.974%
thomas.goyne_439

push

Evergreen

web-flow
[RCORE-2146] CAPI Remove `is_fatal` flag flip (#7751)

104592 of 183414 branches covered (57.03%)

0 of 1 new or added line in 1 file covered. (0.0%)

11 existing lines in 5 files now uncovered.

218041 of 238776 relevant lines covered (91.32%)

5606911.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.74
/src/realm/sync/noinst/client_impl_base.cpp
1
#include <realm/sync/noinst/client_impl_base.hpp>
2

3
#include <realm/impl/simulated_failure.hpp>
4
#include <realm/sync/changeset_parser.hpp>
5
#include <realm/sync/impl/clock.hpp>
6
#include <realm/sync/network/http.hpp>
7
#include <realm/sync/network/websocket.hpp>
8
#include <realm/sync/noinst/client_history_impl.hpp>
9
#include <realm/sync/noinst/compact_changesets.hpp>
10
#include <realm/sync/noinst/client_reset_operation.hpp>
11
#include <realm/sync/noinst/sync_schema_migration.hpp>
12
#include <realm/sync/protocol.hpp>
13
#include <realm/util/assert.hpp>
14
#include <realm/util/basic_system_errors.hpp>
15
#include <realm/util/memory_stream.hpp>
16
#include <realm/util/platform_info.hpp>
17
#include <realm/util/random.hpp>
18
#include <realm/util/safe_int_ops.hpp>
19
#include <realm/util/scope_exit.hpp>
20
#include <realm/util/to_string.hpp>
21
#include <realm/util/uri.hpp>
22
#include <realm/version.hpp>
23

24
#include <realm/sync/network/websocket.hpp> // Only for websocket::Error TODO remove
25

26
#include <system_error>
27
#include <sstream>
28

29
// NOTE: The protocol specification is in `/doc/protocol.md`
30

31
using namespace realm;
32
using namespace _impl;
33
using namespace realm::util;
34
using namespace realm::sync;
35
using namespace realm::sync::websocket;
36

37
// clang-format off
38
using Connection      = ClientImpl::Connection;
39
using Session         = ClientImpl::Session;
40
using UploadChangeset = ClientHistory::UploadChangeset;
41

42
// These are a work-around for a bug in MSVC. It cannot find in-class types
43
// mentioned in signature of out-of-line member function definitions.
44
using ConnectionTerminationReason = ClientImpl::ConnectionTerminationReason;
45
using OutputBuffer                = ClientImpl::OutputBuffer;
46
using ReceivedChangesets          = ClientProtocol::ReceivedChangesets;
47
// clang-format on
48

1,012✔
49
void ClientImpl::ReconnectInfo::reset() noexcept
1,012✔
50
{
2,036✔
51
    m_backoff_state.reset();
2,036✔
52
    scheduled_reset = false;
1,024✔
53
}
1,024✔
54

55

56
void ClientImpl::ReconnectInfo::update(ConnectionTerminationReason new_reason,
1,846✔
57
                                       std::optional<ResumptionDelayInfo> new_delay_info)
1,846✔
58
{
3,834✔
59
    m_backoff_state.update(new_reason, new_delay_info);
1,988✔
60
}
1,988✔
61

62

3,038✔
63
std::chrono::milliseconds ClientImpl::ReconnectInfo::delay_interval()
3,038✔
64
{
3,186✔
65
    if (scheduled_reset) {
3,186✔
66
        reset();
4✔
67
    }
3,042✔
68

2,300✔
69
    if (!m_backoff_state.triggering_error) {
5,482✔
70
        return std::chrono::milliseconds::zero();
2,434✔
71
    }
3,172✔
72

40✔
73
    switch (*m_backoff_state.triggering_error) {
788✔
74
        case ConnectionTerminationReason::closed_voluntarily:
48✔
75
            return std::chrono::milliseconds::zero();
48✔
76
        case ConnectionTerminationReason::server_said_do_not_reconnect:
700✔
77
            return std::chrono::milliseconds::max();
700✔
78
        default:
1,212✔
79
            if (m_reconnect_mode == ReconnectMode::testing) {
1,212✔
80
                return std::chrono::milliseconds::max();
522✔
81
            }
694✔
82

172✔
83
            REALM_ASSERT(m_reconnect_mode == ReconnectMode::normal);
910✔
84
            return m_backoff_state.delay_interval();
910✔
85
    }
748✔
86
}
748✔
87

88

89
bool ClientImpl::decompose_server_url(const std::string& url, ProtocolEnvelope& protocol, std::string& address,
1,856✔
90
                                      port_type& port, std::string& path) const
1,856✔
91
{
4,048✔
92
    util::Uri uri(url); // Throws
4,048✔
93
    uri.canonicalize(); // Throws
4,048✔
94
    std::string userinfo, address_2, port_2;
4,048✔
95
    bool realm_scheme = (uri.get_scheme() == "realm:" || uri.get_scheme() == "realms:");
4,048✔
96
    bool ws_scheme = (uri.get_scheme() == "ws:" || uri.get_scheme() == "wss:");
4,048✔
97
    bool good = ((realm_scheme || ws_scheme) && uri.get_auth(userinfo, address_2, port_2) && userinfo.empty() &&
4,048✔
98
                 !address_2.empty() && uri.get_query().empty() && uri.get_frag().empty()); // Throws
2,192✔
99
    if (REALM_UNLIKELY(!good))
4,048✔
100
        return false;
1,856✔
101
    ProtocolEnvelope protocol_2;
4,048✔
102
    port_type port_3;
2,192!
103
    if (realm_scheme) {
2,192✔
104
        if (uri.get_scheme() == "realm:") {
×
105
            protocol_2 = ProtocolEnvelope::realm;
×
106
            port_3 = (m_enable_default_port_hack ? 80 : 7800);
×
107
        }
×
108
        else {
×
109
            protocol_2 = ProtocolEnvelope::realms;
×
110
            port_3 = (m_enable_default_port_hack ? 443 : 7801);
×
111
        }
1,856✔
112
    }
1,856✔
113
    else {
4,048✔
114
        REALM_ASSERT(ws_scheme);
4,008✔
115
        if (uri.get_scheme() == "ws:") {
4,008✔
116
            protocol_2 = ProtocolEnvelope::ws;
3,968✔
117
            port_3 = 80;
2,192✔
118
        }
2,192✔
119
        else {
80✔
120
            protocol_2 = ProtocolEnvelope::wss;
80✔
121
            port_3 = 443;
1,896✔
122
        }
1,896✔
123
    }
3,996✔
124
    if (!port_2.empty()) {
3,996✔
125
        std::istringstream in(port_2);    // Throws
3,944✔
126
        in.imbue(std::locale::classic()); // Throws
3,944✔
127
        in >> port_3;
2,140✔
128
        if (REALM_UNLIKELY(!in || !in.eof() || port_3 < 1))
3,944✔
129
            return false;
1,856✔
130
    }
2,140✔
131
    std::string path_2 = uri.get_path(); // Throws (copy)
4,048✔
132

1,856✔
133
    protocol = protocol_2;
4,048✔
134
    address = std::move(address_2);
4,048✔
135
    port = port_3;
4,048✔
136
    path = std::move(path_2);
4,048✔
137
    return true;
2,192✔
138
}
2,192✔
139

4,902✔
140
ClientImpl::ClientImpl(ClientConfig config)
4,902✔
141
    : logger_ptr{std::make_shared<util::CategoryLogger>(util::LogCategory::session, std::move(config.logger))}
4,902✔
142
    , logger{*logger_ptr}
4,902✔
143
    , m_reconnect_mode{config.reconnect_mode}
4,902✔
144
    , m_connect_timeout{config.connect_timeout}
4,902✔
145
    , m_connection_linger_time{config.one_connection_per_session ? 0 : config.connection_linger_time}
4,902✔
146
    , m_ping_keepalive_period{config.ping_keepalive_period}
4,902✔
147
    , m_pong_keepalive_timeout{config.pong_keepalive_timeout}
4,902✔
148
    , m_fast_reconnect_limit{config.fast_reconnect_limit}
4,902✔
149
    , m_reconnect_backoff_info{config.reconnect_backoff_info}
4,902✔
150
    , m_disable_upload_activation_delay{config.disable_upload_activation_delay}
4,902✔
151
    , m_dry_run{config.dry_run}
4,902✔
152
    , m_enable_default_port_hack{config.enable_default_port_hack}
4,902✔
153
    , m_disable_upload_compaction{config.disable_upload_compaction}
4,902✔
154
    , m_fix_up_object_ids{config.fix_up_object_ids}
4,902✔
155
    , m_roundtrip_time_handler{std::move(config.roundtrip_time_handler)}
4,902✔
156
    , m_socket_provider{std::move(config.socket_provider)}
4,902✔
157
    , m_client_protocol{} // Throws
4,902✔
158
    , m_one_connection_per_session{config.one_connection_per_session}
4,902✔
159
    , m_random{}
160
{
9,938✔
161
    // FIXME: Would be better if seeding was up to the application.
162
    util::seed_prng_nondeterministically(m_random); // Throws
9,938✔
163

4,902✔
164
    logger.info("Realm sync client (%1)", REALM_VER_CHUNK); // Throws
9,938✔
165
    logger.debug("Supported protocol versions: %1-%2", get_oldest_supported_protocol_version(),
9,938✔
166
                 get_current_protocol_version()); // Throws
9,938✔
167
    logger.info("Platform: %1", util::get_platform_info());
9,938✔
168
    const char* build_mode;
9,938✔
169
#if REALM_DEBUG
5,036✔
170
    build_mode = "Debug";
5,036✔
171
#else
172
    build_mode = "Release";
4,902✔
173
#endif
4,902✔
174
    logger.debug("Build mode: %1", build_mode);
9,938✔
175
    logger.debug("Config param: one_connection_per_session = %1",
9,938✔
176
                 config.one_connection_per_session); // Throws
9,938✔
177
    logger.debug("Config param: connect_timeout = %1 ms",
9,938✔
178
                 config.connect_timeout); // Throws
9,938✔
179
    logger.debug("Config param: connection_linger_time = %1 ms",
9,938✔
180
                 config.connection_linger_time); // Throws
9,938✔
181
    logger.debug("Config param: ping_keepalive_period = %1 ms",
9,938✔
182
                 config.ping_keepalive_period); // Throws
9,938✔
183
    logger.debug("Config param: pong_keepalive_timeout = %1 ms",
9,938✔
184
                 config.pong_keepalive_timeout); // Throws
9,938✔
185
    logger.debug("Config param: fast_reconnect_limit = %1 ms",
9,938✔
186
                 config.fast_reconnect_limit); // Throws
9,938✔
187
    logger.debug("Config param: disable_upload_compaction = %1",
9,938✔
188
                 config.disable_upload_compaction); // Throws
9,938✔
189
    logger.debug("Config param: disable_sync_to_disk = %1",
9,938✔
190
                 config.disable_sync_to_disk); // Throws
9,938✔
191
    logger.debug(
9,938✔
192
        "Config param: reconnect backoff info: max_delay: %1 ms, initial_delay: %2 ms, multiplier: %3, jitter: 1/%4",
9,938✔
193
        m_reconnect_backoff_info.max_resumption_delay_interval.count(),
9,938✔
194
        m_reconnect_backoff_info.resumption_delay_interval.count(),
5,036✔
195
        m_reconnect_backoff_info.resumption_delay_backoff_multiplier, m_reconnect_backoff_info.delay_jitter_divisor);
9,938✔
196

390✔
197
    if (config.reconnect_mode != ReconnectMode::normal) {
5,426✔
198
        logger.warn("Testing/debugging feature 'nonnormal reconnect mode' enabled - "
776✔
199
                    "never do this in production!");
386✔
200
    }
5,288✔
201

202
    if (config.dry_run) {
5,036✔
203
        logger.warn("Testing/debugging feature 'dry run' enabled - "
×
204
                    "never do this in production!");
205
    }
4,902✔
206

207
    REALM_ASSERT_EX(m_socket_provider, "Must provide socket provider in sync Client config");
9,938✔
208

2✔
209
    if (m_one_connection_per_session) {
5,038✔
210
        logger.warn("Testing/debugging feature 'one connection per session' enabled - "
4✔
211
                    "never do this in production");
2✔
212
    }
4,904✔
213

214
    if (config.disable_upload_activation_delay) {
5,036✔
215
        logger.warn("Testing/debugging feature 'disable_upload_activation_delay' enabled - "
×
216
                    "never do this in production");
217
    }
4,902✔
218

219
    if (config.disable_sync_to_disk) {
5,036✔
220
        logger.warn("Testing/debugging feature 'disable_sync_to_disk' enabled - "
×
221
                    "never do this in production");
222
    }
6,976✔
223

6,976✔
224
    m_actualize_and_finalize = create_trigger([this](Status status) {
7,384✔
225
        if (status == ErrorCodes::OperationAborted)
14,360✔
226
            return;
×
227
        else if (!status.is_ok())
14,360✔
228
            throw Exception(status);
6,976✔
229
        actualize_and_finalize_session_wrappers(); // Throws
12,286✔
230
    });
7,384✔
231
}
5,036✔
232

94,914✔
233
void ClientImpl::incr_outstanding_posts()
94,914✔
234
{
197,912✔
235
    util::CheckedLockGuard lock(m_drain_mutex);
197,912✔
236
    ++m_outstanding_posts;
197,912✔
237
    m_drained = false;
102,998✔
238
}
102,998✔
239

94,914✔
240
void ClientImpl::decr_outstanding_posts()
94,914✔
241
{
197,920✔
242
    util::CheckedLockGuard lock(m_drain_mutex);
197,920✔
243
    REALM_ASSERT(m_outstanding_posts);
103,006✔
244
    if (--m_outstanding_posts <= 0) {
103,006✔
245
        // Notify must happen with lock held or another thread could destroy
9,022✔
246
        // ClientImpl between when we release the lock and when we call notify
9,022✔
247
        m_drain_cv.notify_all();
104,098✔
248
    }
9,184✔
249
}
103,006✔
250

27,344✔
251
void ClientImpl::post(SyncSocketProvider::FunctionHandler&& handler)
27,344✔
252
{
55,466✔
253
    REALM_ASSERT(m_socket_provider);
55,466✔
254
    incr_outstanding_posts();
55,466✔
255
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
55,466✔
256
        auto decr_guard = util::make_scope_exit([&]() noexcept {
55,468✔
257
            decr_outstanding_posts();
55,468✔
258
        });
55,468✔
259
        handler(status);
55,466✔
260
    });
28,122✔
261
}
28,122✔
262

59,224✔
263
void ClientImpl::post(util::UniqueFunction<void()>&& handler)
59,224✔
264
{
124,954✔
265
    REALM_ASSERT(m_socket_provider);
124,954✔
266
    incr_outstanding_posts();
124,952✔
267
    m_socket_provider->post([handler = std::move(handler), this](Status status) {
124,952✔
268
        auto decr_guard = util::make_scope_exit([&]() noexcept {
124,952✔
269
            decr_outstanding_posts();
124,950✔
270
        });
65,728✔
271
        if (status == ErrorCodes::OperationAborted)
124,952✔
272
            return;
×
273
        if (!status.is_ok())
124,952✔
274
            throw Exception(status);
59,222✔
275
        handler();
124,954✔
276
    });
65,730✔
277
}
65,730✔
278

279

4,902✔
280
void ClientImpl::drain_connections()
4,902✔
281
{
9,938✔
282
    logger.debug("Draining connections during sync client shutdown");
6,318✔
283
    for (auto& server_slot_pair : m_server_slots) {
5,036✔
284
        auto& server_slot = server_slot_pair.second;
2,690✔
285

1,168✔
286
        if (server_slot.connection) {
2,576✔
287
            auto& conn = server_slot.connection;
2,462✔
288
            conn->force_close();
1,408✔
289
        }
1,408✔
290
        else {
120✔
291
            for (auto& conn_pair : server_slot.alt_connections) {
120✔
292
                conn_pair.second->force_close();
114✔
293
            }
1,282✔
294
        }
5,016✔
295
    }
1,408✔
296
}
5,036✔
297

298

299
SyncSocketProvider::SyncTimer ClientImpl::create_timer(std::chrono::milliseconds delay,
8,346✔
300
                                                       SyncSocketProvider::FunctionHandler&& handler)
8,346✔
301
{
17,500✔
302
    REALM_ASSERT(m_socket_provider);
17,502✔
303
    incr_outstanding_posts();
17,502✔
304
    return m_socket_provider->create_timer(delay, [handler = std::move(handler), this](Status status) {
17,506✔
305
        auto decr_guard = util::make_scope_exit([&]() noexcept {
17,508✔
306
            decr_outstanding_posts();
17,508✔
307
        });
17,508✔
308
        handler(status);
17,504✔
309
    });
9,158✔
310
}
9,154✔
311

312

6,238✔
313
ClientImpl::SyncTrigger ClientImpl::create_trigger(SyncSocketProvider::FunctionHandler&& handler)
6,238✔
314
{
12,736✔
315
    REALM_ASSERT(m_socket_provider);
12,736✔
316
    return std::make_unique<Trigger<ClientImpl>>(this, std::move(handler));
6,498✔
317
}
6,498✔
318

1,336✔
319
Connection::~Connection()
1,336✔
320
{
1,462✔
321
    if (m_websocket_sentinel) {
1,462✔
322
        m_websocket_sentinel->destroyed = true;
×
323
        m_websocket_sentinel.reset();
1,336✔
324
    }
325
}
1,462✔
326

1,336✔
327
void Connection::activate()
1,336✔
328
{
2,800✔
329
    REALM_ASSERT(m_on_idle);
2,800✔
330
    m_activated = true;
1,464✔
331
    if (m_num_active_sessions == 0)
1,464✔
332
        m_on_idle->trigger();
333
    // We cannot in general connect immediately, because a prior failure to
1,336✔
334
    // connect may require a delay before reconnecting (see `m_reconnect_info`).
1,336✔
335
    initiate_reconnect_wait(); // Throws
1,464✔
336
}
1,464✔
337

338

4,868✔
339
void Connection::activate_session(std::unique_ptr<Session> sess)
4,868✔
340
{
10,072✔
341
    REALM_ASSERT(sess);
10,072✔
342
    REALM_ASSERT(&sess->m_conn == this);
10,072✔
343
    REALM_ASSERT(!m_force_closed);
10,072✔
344
    Session& sess_2 = *sess;
10,072✔
345
    session_ident_type ident = sess->m_ident;
10,072✔
346
    auto p = m_sessions.emplace(ident, std::move(sess)); // Throws
10,072✔
347
    bool was_inserted = p.second;
5,204✔
348
    REALM_ASSERT(was_inserted);
10,072✔
349
    // Save the session ident to the historical list of session idents
4,868✔
350
    m_session_history.insert(ident);
10,072✔
351
    sess_2.activate(); // Throws
8,624✔
352
    if (m_state == ConnectionState::connected) {
8,624✔
353
        bool fast_reconnect = false;
6,998✔
354
        sess_2.connection_established(fast_reconnect); // Throws
8,446✔
355
    }
8,446✔
356
    ++m_num_active_sessions;
5,204✔
357
}
5,204✔
358

359

4,868✔
360
void Connection::initiate_session_deactivation(Session* sess)
4,868✔
361
{
10,072✔
362
    REALM_ASSERT(sess);
10,072✔
363
    REALM_ASSERT(&sess->m_conn == this);
5,204✔
364
    REALM_ASSERT(m_num_active_sessions);
5,204✔
365
    // Since the client may be waiting for m_num_active_sessions to reach 0
366
    // in stop_and_wait() (on a separate thread), deactivate Session before
4,868✔
367
    // decrementing the num active sessions value.
4,868✔
368
    sess->initiate_deactivation(); // Throws
5,630✔
369
    if (sess->m_state == Session::Deactivated) {
5,630✔
370
        finish_session_deactivation(sess);
5,340✔
371
    }
2,536✔
372
    if (REALM_UNLIKELY(--m_num_active_sessions == 0)) {
5,394✔
373
        if (m_activated && m_state == ConnectionState::disconnected)
4,370✔
374
            m_on_idle->trigger();
5,050✔
375
    }
2,306✔
376
}
5,204✔
377

378

1,122✔
379
void Connection::cancel_reconnect_delay()
1,122✔
380
{
1,134✔
381
    REALM_ASSERT(m_activated);
2,256✔
382

1,006✔
383
    if (m_reconnect_delay_in_progress) {
1,638✔
384
        if (m_nonzero_reconnect_delay)
1,018✔
385
            logger.detail("Canceling reconnect delay"); // Throws
510✔
386

387
        // Cancel the in-progress wait operation by destroying the timer
388
        // object. Destruction is needed in this case, because a new wait
389
        // operation might have to be initiated before the previous one
390
        // completes (its completion handler starts to execute), so the new wait
1,006✔
391
        // operation must be done on a new timer object.
1,006✔
392
        m_reconnect_disconnect_timer.reset();
2,024✔
393
        m_reconnect_delay_in_progress = false;
2,024✔
394
        m_reconnect_info.reset();
2,024✔
395
        initiate_reconnect_wait(); // Throws
2,024✔
396
        return;
1,018✔
397
    }
1,018✔
398

399
    // If we are not disconnected, then we need to make sure the next time we get disconnected
400
    // that we are allowed to re-connect as quickly as possible.
401
    //
402
    // Setting m_reconnect_info.scheduled_reset will cause initiate_reconnect_wait to reset the
403
    // backoff/delay state before calculating the next delay, unless a PONG message is received
404
    // for the urgent PING message we send below.
405
    //
406
    // If we get a PONG message for the urgent PING message sent below, then the connection is
116✔
407
    // healthy and we can calculate the next delay normally.
116✔
408
    if (m_state != ConnectionState::disconnected) {
232✔
409
        m_reconnect_info.scheduled_reset = true;
116✔
410
        m_ping_after_scheduled_reset_of_reconnect_info = false;
232✔
411

116✔
412
        schedule_urgent_ping(); // Throws
232✔
413
        return;
116✔
414
    }
116✔
415
    // Nothing to do in this case. The next reconnect attemp will be made as
116✔
416
    // soon as there are any sessions that are both active and unsuspended.
417
}
116✔
418

3,468✔
419
void Connection::finish_session_deactivation(Session* sess)
3,468✔
420
{
7,744✔
421
    REALM_ASSERT(sess->m_state == Session::Deactivated);
7,744✔
422
    auto ident = sess->m_ident;
7,744✔
423
    m_sessions.erase(ident);
7,744✔
424
    m_session_history.erase(ident);
4,276✔
425
}
4,276✔
426

1,174✔
427
void Connection::force_close()
1,174✔
428
{
1,296✔
429
    if (m_force_closed) {
1,296✔
430
        return;
431
    }
1,174✔
432

433
    m_force_closed = true;
2,470✔
434

1,150✔
435
    if (m_state != ConnectionState::disconnected) {
2,446✔
436
        voluntary_disconnect();
1,284✔
437
    }
2,458✔
438

1,174✔
439
    REALM_ASSERT_EX(m_state == ConnectionState::disconnected, m_state);
1,320✔
440
    if (m_reconnect_delay_in_progress || m_disconnect_delay_in_progress) {
1,320✔
441
        m_reconnect_disconnect_timer.reset();
38✔
442
        m_reconnect_delay_in_progress = false;
38✔
443
        m_disconnect_delay_in_progress = false;
14✔
444
    }
14✔
445

446
    // We must copy any session pointers we want to close to a vector because force_closing
447
    // the session may remove it from m_sessions and invalidate the iterator uses to loop
1,174✔
448
    // through the map. By copying to a separate vector we ensure our iterators remain valid.
1,174✔
449
    std::vector<Session*> to_close;
1,348✔
450
    for (auto& session_pair : m_sessions) {
1,348✔
451
        if (session_pair.second->m_state == Session::State::Active) {
102✔
452
            to_close.push_back(session_pair.second.get());
102✔
453
        }
50✔
454
    }
1,224✔
455

52✔
456
    for (auto& sess : to_close) {
1,348✔
457
        sess->force_close();
50✔
458
    }
1,224✔
459

1,174✔
460
    logger.debug("Force closed idle connection");
1,296✔
461
}
1,296✔
462

463

1,744✔
464
void Connection::websocket_connected_handler(const std::string& protocol)
1,744✔
465
{
3,630✔
466
    if (!protocol.empty()) {
3,630✔
467
        std::string_view expected_prefix =
1,886✔
468
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,630✔
469
        // FIXME: Use std::string_view::begins_with() in C++20.
1,744✔
470
        auto prefix_matches = [&](std::string_view other) {
3,630✔
471
            return protocol.size() >= other.size() && (protocol.substr(0, other.size()) == other);
3,630✔
472
        };
3,630✔
473
        if (prefix_matches(expected_prefix)) {
3,630✔
474
            util::MemoryInputStream in;
3,630✔
475
            in.set_buffer(protocol.data() + expected_prefix.size(), protocol.data() + protocol.size());
3,630✔
476
            in.imbue(std::locale::classic());
3,630✔
477
            in.unsetf(std::ios_base::skipws);
3,630✔
478
            int value_2 = 0;
3,630✔
479
            in >> value_2;
3,630✔
480
            if (in && in.eof() && value_2 >= 0) {
3,630✔
481
                bool good_version =
3,630✔
482
                    (value_2 >= get_oldest_supported_protocol_version() && value_2 <= get_current_protocol_version());
3,630✔
483
                if (good_version) {
1,886✔
484
                    logger.detail("Negotiated protocol version: %1", value_2);
1,886✔
485
                    // For now, grab the connection ID from the websocket if it supports it. In the future, the server
486
                    // will provide the appservices connection ID via a log message.
1,744✔
487
                    // TODO: Remove once the server starts sending the connection ID
1,744✔
488
                    receive_appservices_request_id(m_websocket->get_appservices_request_id());
3,630✔
489
                    m_negotiated_protocol_version = value_2;
3,630✔
490
                    handle_connection_established(); // Throws
3,630✔
491
                    return;
3,630✔
492
                }
3,630✔
493
            }
1,886✔
494
        }
1,886✔
495
        close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed,
×
496
                                        util::format("Bad protocol info from server: '%1'", protocol)},
×
497
                                       IsFatal{true}, ConnectionTerminationReason::bad_headers_in_http_response);
×
498
    }
×
499
    else {
×
500
        close_due_to_client_side_error(
×
501
            {ErrorCodes::SyncProtocolNegotiationFailed, "Missing protocol info from server"}, IsFatal{true},
×
502
            ConnectionTerminationReason::bad_headers_in_http_response);
1,744✔
503
    }
504
}
1,886✔
505

506

37,496✔
507
bool Connection::websocket_binary_message_received(util::Span<const char> data)
37,496✔
508
{
41,930✔
509
    if (m_force_closed) {
41,930✔
510
        logger.debug("Received binary message after connection was force closed");
×
511
        return false;
512
    }
37,496✔
513

37,496✔
514
    using sf = SimulatedFailure;
42,116✔
515
    if (sf::check_trigger(sf::sync_client__read_head)) {
42,116✔
516
        close_due_to_client_side_error(
440✔
517
            {ErrorCodes::RuntimeError, "Simulated failure during sync client websocket read"}, IsFatal{false},
440✔
518
            ConnectionTerminationReason::read_or_write_error);
440✔
519
        return bool(m_websocket);
254✔
520
    }
37,564✔
521

37,310✔
522
    handle_message_received(data);
79,172✔
523
    return bool(m_websocket);
41,676✔
524
}
41,930✔
525

526

416✔
527
void Connection::websocket_error_handler()
416✔
528
{
766✔
529
    m_websocket_error_received = true;
350✔
530
}
350✔
531

470✔
532
bool Connection::websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg)
470✔
533
{
402✔
534
    if (m_force_closed) {
402✔
535
        logger.debug("Received websocket close message after connection was force closed");
×
536
        return false;
470✔
537
    }
538
    logger.info("Closing the websocket with error code=%1, message='%2', was_clean=%3", error_code, msg, was_clean);
872✔
539

30✔
540
    switch (error_code) {
432✔
541
        case WebSocketError::websocket_ok:
30✔
542
            break;
30✔
543
        case WebSocketError::websocket_resolve_failed:
58✔
544
            [[fallthrough]];
58✔
545
        case WebSocketError::websocket_connection_failed: {
112✔
546
            SessionErrorInfo error_info(
56✔
547
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)}, IsFatal{false});
56✔
548
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
56✔
549
            // to make sure the websocket URL is correct
42✔
550
            if (!m_server_endpoint.is_verified) {
98✔
551
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
98✔
552
            }
98✔
553
            involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::connect_operation_failed);
58✔
554
            break;
404✔
555
        }
350✔
556
        case WebSocketError::websocket_read_error:
632✔
557
            [[fallthrough]];
632✔
558
        case WebSocketError::websocket_write_error: {
632✔
559
            close_due_to_transient_error({ErrorCodes::ConnectionClosed, msg},
632✔
560
                                         ConnectionTerminationReason::read_or_write_error);
632✔
561
            break;
284✔
562
        }
284✔
563
        case WebSocketError::websocket_going_away:
✔
564
            [[fallthrough]];
×
565
        case WebSocketError::websocket_protocol_error:
✔
566
            [[fallthrough]];
×
567
        case WebSocketError::websocket_unsupported_data:
✔
568
            [[fallthrough]];
×
569
        case WebSocketError::websocket_invalid_payload_data:
✔
570
            [[fallthrough]];
×
571
        case WebSocketError::websocket_policy_violation:
✔
572
            [[fallthrough]];
×
573
        case WebSocketError::websocket_reserved:
✔
574
            [[fallthrough]];
×
575
        case WebSocketError::websocket_no_status_received:
✔
576
            [[fallthrough]];
×
577
        case WebSocketError::websocket_invalid_extension: {
✔
578
            close_due_to_client_side_error({ErrorCodes::SyncProtocolInvariantFailed, msg}, IsFatal{false},
×
579
                                           ConnectionTerminationReason::websocket_protocol_violation); // Throws
×
580
            break;
2✔
581
        }
2✔
582
        case WebSocketError::websocket_message_too_big: {
4✔
583
            auto message = util::format(
4✔
584
                "Sync websocket closed because the server received a message that was too large: %1", msg);
4✔
585
            SessionErrorInfo error_info(Status(ErrorCodes::LimitExceeded, std::move(message)), IsFatal{false});
4✔
586
            error_info.server_requests_action = ProtocolErrorInfo::Action::ClientReset;
4✔
587
            involuntary_disconnect(std::move(error_info),
4✔
588
                                   ConnectionTerminationReason::websocket_protocol_violation); // Throws
2✔
589
            break;
8✔
590
        }
6✔
591
        case WebSocketError::websocket_tls_handshake_failed: {
10✔
592
            close_due_to_client_side_error(
10✔
593
                Status(ErrorCodes::TlsHandshakeFailed, util::format("TLS handshake failed: %1", msg)), IsFatal{false},
10✔
594
                ConnectionTerminationReason::ssl_certificate_rejected); // Throws
4✔
595
            break;
4✔
596
        }
×
597
        case WebSocketError::websocket_client_too_old:
✔
598
            [[fallthrough]];
×
599
        case WebSocketError::websocket_client_too_new:
✔
600
            [[fallthrough]];
×
601
        case WebSocketError::websocket_protocol_mismatch: {
✔
602
            close_due_to_client_side_error({ErrorCodes::SyncProtocolNegotiationFailed, msg}, IsFatal{true},
×
603
                                           ConnectionTerminationReason::http_response_says_fatal_error); // Throws
×
604
            break;
✔
605
        }
606
        case WebSocketError::websocket_fatal_error: {
2✔
607
            // Error is fatal if the sync_route has already been verified - if the sync_route has not
608
            // been verified, then use a non-fatal error and try to perform a location update.
609
            SessionErrorInfo error_info(
×
610
                {ErrorCodes::SyncConnectFailed, util::format("Failed to connect to sync: %1", msg)},
×
611
                IsFatal{m_server_endpoint.is_verified});
612
            ConnectionTerminationReason reason = ConnectionTerminationReason::http_response_says_fatal_error;
613
            // If the connection fails/times out and the server has not been contacted yet, refresh the location
×
614
            // to make sure the websocket URL is correct
615
            if (!m_server_endpoint.is_verified) {
×
616
                error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
617
                reason = ConnectionTerminationReason::connect_operation_failed;
×
618
            }
×
619
            involuntary_disconnect(std::move(error_info), reason);
×
620
            break;
✔
621
        }
×
622
        case WebSocketError::websocket_forbidden: {
✔
623
            SessionErrorInfo error_info({ErrorCodes::AuthError, msg}, IsFatal{true});
×
624
            error_info.server_requests_action = ProtocolErrorInfo::Action::LogOutUser;
×
625
            involuntary_disconnect(std::move(error_info),
×
626
                                   ConnectionTerminationReason::http_response_says_fatal_error);
×
627
            break;
22✔
628
        }
22✔
629
        case WebSocketError::websocket_unauthorized: {
44✔
630
            SessionErrorInfo error_info(
44✔
631
                {ErrorCodes::AuthError,
44✔
632
                 util::format("Websocket was closed because of an authentication issue: %1", msg)},
44✔
633
                IsFatal{false});
44✔
634
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
44✔
635
            involuntary_disconnect(std::move(error_info),
44✔
636
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
22✔
637
            break;
28✔
638
        }
6✔
639
        case WebSocketError::websocket_moved_permanently: {
12✔
640
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
12✔
641
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
12✔
642
            involuntary_disconnect(std::move(error_info),
12✔
643
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
6✔
644
            break;
6✔
645
        }
×
646
        case WebSocketError::websocket_abnormal_closure: {
✔
647
            SessionErrorInfo error_info({ErrorCodes::ConnectionClosed, msg}, IsFatal{false});
×
648
            error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshUser;
×
649
            involuntary_disconnect(std::move(error_info),
×
650
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
651
            break;
✔
652
        }
×
653
        case WebSocketError::websocket_internal_server_error:
✔
654
            [[fallthrough]];
×
655
        case WebSocketError::websocket_retry_error: {
✔
656
            involuntary_disconnect(SessionErrorInfo({ErrorCodes::ConnectionClosed, msg}, IsFatal{false}),
×
657
                                   ConnectionTerminationReason::http_response_says_nonfatal_error);
×
658
            break;
470✔
659
        }
660
    }
872✔
661

470✔
662
    return bool(m_websocket);
402✔
663
}
402✔
664

665
// Guarantees that handle_reconnect_wait() is never called from within the
666
// execution of initiate_reconnect_wait() (no callback reentrance).
4,188✔
667
void Connection::initiate_reconnect_wait()
4,188✔
668
{
8,654✔
669
    REALM_ASSERT(m_activated);
8,654✔
670
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,466✔
671
    REALM_ASSERT(!m_disconnect_delay_in_progress);
4,466✔
672

4,188✔
673
    // If we've been force closed then we don't need/want to reconnect. Just return early here.
1,150✔
674
    if (m_force_closed) {
5,616✔
675
        return;
1,286✔
676
    }
4,324✔
677

3,038✔
678
    m_reconnect_delay_in_progress = true;
6,218✔
679
    auto delay = m_reconnect_info.delay_interval();
3,706✔
680
    if (delay == std::chrono::milliseconds::max()) {
3,180✔
681
        logger.detail("Reconnection delayed indefinitely"); // Throws
1,058✔
682
        // Not actually starting a timer corresponds to an infinite wait
526✔
683
        m_nonzero_reconnect_delay = true;
1,058✔
684
        return;
532✔
685
    }
3,044✔
686

2,340✔
687
    if (delay == std::chrono::milliseconds::zero()) {
4,988✔
688
        m_nonzero_reconnect_delay = false;
2,644✔
689
    }
2,644✔
690
    else {
348✔
691
        logger.detail("Allowing reconnection in %1 milliseconds", delay.count()); // Throws
348✔
692
        m_nonzero_reconnect_delay = true;
176✔
693
    }
176✔
694

695
    // We create a timer for the reconnect_disconnect timer even if the delay is zero because
696
    // we need it to be cancelable in case the connection is terminated before the timer
2,512✔
697
    // callback is run.
698
    m_reconnect_disconnect_timer = m_client.create_timer(delay, [this](Status status) {
2,652✔
699
        // If the operation is aborted, the connection object may have been
2,510✔
700
        // destroyed.
1,844✔
701
        if (status != ErrorCodes::OperationAborted)
5,162✔
702
            handle_reconnect_wait(status); // Throws
4,496✔
703
    });                                    // Throws
2,652✔
704
}
2,648✔
705

706

1,844✔
707
void Connection::handle_reconnect_wait(Status status)
1,844✔
708
{
1,984!
709
    if (!status.is_ok()) {
1,984✔
710
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
711
        throw Exception(status);
712
    }
1,844✔
713

1,844✔
714
    REALM_ASSERT(m_reconnect_delay_in_progress);
1,984✔
715
    m_reconnect_delay_in_progress = false;
3,828✔
716

1,838✔
717
    if (m_num_active_unsuspended_sessions > 0)
3,828✔
718
        initiate_reconnect(); // Throws
1,982✔
719
}
1,984✔
720

721
struct Connection::WebSocketObserverShim : public sync::WebSocketObserver {
1,846✔
722
    explicit WebSocketObserverShim(Connection* conn)
1,846✔
723
        : conn(conn)
1,846✔
724
        , sentinel(conn->m_websocket_sentinel)
1,846✔
725
    {
1,988✔
726
    }
1,988✔
727

728
    Connection* conn;
729
    util::bind_ptr<LifecycleSentinel> sentinel;
730

1,744✔
731
    void websocket_connected_handler(const std::string& protocol) override
1,744✔
732
    {
1,886✔
733
        if (sentinel->destroyed) {
1,886✔
734
            return;
735
        }
1,744✔
736

1,744✔
737
        return conn->websocket_connected_handler(protocol);
1,886✔
738
    }
1,886✔
739

416✔
740
    void websocket_error_handler() override
416✔
741
    {
350✔
742
        if (sentinel->destroyed) {
350✔
743
            return;
744
        }
416✔
745

416✔
746
        conn->websocket_error_handler();
350✔
747
    }
350✔
748

37,496✔
749
    bool websocket_binary_message_received(util::Span<const char> data) override
37,496✔
750
    {
41,930✔
751
        if (sentinel->destroyed) {
41,930✔
752
            return false;
753
        }
37,496✔
754

37,496✔
755
        return conn->websocket_binary_message_received(data);
41,930✔
756
    }
41,930✔
757

470✔
758
    bool websocket_closed_handler(bool was_clean, WebSocketError error_code, std::string_view msg) override
470✔
759
    {
402✔
760
        if (sentinel->destroyed) {
402✔
761
            return true;
762
        }
470✔
763

470✔
764
        return conn->websocket_closed_handler(was_clean, error_code, msg);
402✔
765
    }
402✔
766
};
767

1,844✔
768
void Connection::initiate_reconnect()
1,844✔
769
{
1,984✔
770
    REALM_ASSERT(m_activated);
3,828✔
771

1,844✔
772
    m_state = ConnectionState::connecting;
3,828✔
773
    report_connection_state_change(ConnectionState::connecting); // Throws
1,984✔
774
    if (m_websocket_sentinel) {
1,984✔
775
        m_websocket_sentinel->destroyed = true;
1,844✔
776
    }
1,844✔
777
    m_websocket_sentinel = util::make_bind<LifecycleSentinel>();
1,984✔
778
    m_websocket.reset();
1,984✔
779

1,844✔
780
    // Watchdog
781
    initiate_connect_wait(); // Throws
3,828✔
782

1,844✔
783
    std::vector<std::string> sec_websocket_protocol;
3,828✔
784
    {
3,828✔
785
        auto protocol_prefix =
3,828✔
786
            is_flx_sync_connection() ? get_flx_websocket_protocol_prefix() : get_pbs_websocket_protocol_prefix();
3,828✔
787
        int min = get_oldest_supported_protocol_version();
3,828✔
788
        int max = get_current_protocol_version();
1,984✔
789
        REALM_ASSERT_3(min, <=, max);
1,984✔
790
        // List protocol version in descending order to ensure that the server
25,834✔
791
        // selects the highest possible version.
23,990✔
792
        for (int version = max; version >= min; --version) {
49,822✔
793
            sec_websocket_protocol.push_back(util::format("%1%2", protocol_prefix, version)); // Throws
25,692✔
794
        }
23,848✔
795
    }
3,828✔
796

1,844✔
797
    logger.info("Connecting to '%1%2:%3%4'", to_string(m_server_endpoint.envelope), m_server_endpoint.address,
1,984✔
798
                m_server_endpoint.port, m_http_request_path_prefix);
3,828✔
799

1,844✔
800
    m_websocket_error_received = false;
3,828✔
801
    m_websocket =
3,828✔
802
        m_client.m_socket_provider->connect(std::make_unique<WebSocketObserverShim>(this),
3,828✔
803
                                            WebSocketEndpoint{
3,828✔
804
                                                m_server_endpoint.address,
3,828✔
805
                                                m_server_endpoint.port,
3,828✔
806
                                                get_http_request_path(),
3,828✔
807
                                                std::move(sec_websocket_protocol),
1,984✔
808
                                                is_ssl(m_server_endpoint.envelope),
3,828✔
809
                                                /// DEPRECATED - The following will be removed in a future release
1,844✔
810
                                                {m_custom_http_headers.begin(), m_custom_http_headers.end()},
3,828✔
811
                                                m_verify_servers_ssl_certificate,
3,828✔
812
                                                m_ssl_trust_certificate_path,
3,828✔
813
                                                m_ssl_verify_callback,
3,828✔
814
                                                m_proxy_config,
3,828✔
815
                                            });
1,984✔
816
}
1,984✔
817

818

1,846✔
819
void Connection::initiate_connect_wait()
820
{
1,988✔
821
    // Deploy a watchdog to enforce an upper bound on the time it can take to
822
    // fully establish the connection (including SSL and WebSocket
823
    // handshakes). Without such a watchdog, connect operations could take very
1,846✔
824
    // long, or even indefinite time.
825
    milliseconds_type time = m_client.m_connect_timeout;
3,834✔
826

827
    m_connect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
1,988✔
828
        // If the operation is aborted, the connection object may have been
1,846✔
829
        // destroyed.
830
        if (status != ErrorCodes::OperationAborted)
3,834✔
831
            handle_connect_wait(status); // Throws
1,846✔
832
    });                                  // Throws
1,988✔
833
}
1,988✔
834

835

836
void Connection::handle_connect_wait(Status status)
×
837
{
×
838
    if (!status.is_ok()) {
×
839
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
840
        throw Exception(status);
841
    }
×
842

843
    REALM_ASSERT_EX(m_state == ConnectionState::connecting, m_state);
×
844
    logger.info("Connect timeout"); // Throws
×
845
    SessionErrorInfo error_info({ErrorCodes::SyncConnectTimeout, "Sync connection was not fully established in time"},
846
                                IsFatal{false});
847
    // If the connection fails/times out and the server has not been contacted yet, refresh the location
×
848
    // to make sure the websocket URL is correct
849
    if (!m_server_endpoint.is_verified) {
×
850
        error_info.server_requests_action = ProtocolErrorInfo::Action::RefreshLocation;
×
851
    }
×
852
    involuntary_disconnect(std::move(error_info), ConnectionTerminationReason::sync_connect_timeout); // Throws
853
}
854

855

1,744✔
856
void Connection::handle_connection_established()
857
{
3,630✔
858
    // Cancel connect timeout watchdog
859
    m_connect_timer.reset();
3,630✔
860

1,744✔
861
    m_state = ConnectionState::connected;
1,886✔
862
    m_server_endpoint.is_verified = true; // sync route is valid since connection is successful
3,630✔
863

1,744✔
864
    milliseconds_type now = monotonic_clock_now();
3,630✔
865
    m_pong_wait_started_at = now; // Initially, no time was spent waiting for a PONG message
1,886✔
866
    initiate_ping_delay(now);     // Throws
3,630✔
867

1,744✔
868
    bool fast_reconnect = false;
2,426✔
869
    if (m_disconnect_has_occurred) {
2,426✔
870
        milliseconds_type time = now - m_disconnect_time;
1,086✔
871
        if (time <= m_client.m_fast_reconnect_limit)
1,086✔
872
            fast_reconnect = true;
546✔
873
    }
2,880✔
874

2,334✔
875
    for (auto& p : m_sessions) {
4,806✔
876
        Session& sess = *p.second;
4,806✔
877
        sess.connection_established(fast_reconnect); // Throws
2,472✔
878
    }
4,216✔
879

1,744✔
880
    report_connection_state_change(ConnectionState::connected); // Throws
1,886✔
881
}
1,886✔
882

883

116✔
884
void Connection::schedule_urgent_ping()
116✔
885
{
232✔
886
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
132✔
887
    if (m_ping_delay_in_progress) {
132✔
888
        m_heartbeat_timer.reset();
116✔
889
        m_ping_delay_in_progress = false;
116✔
890
        m_minimize_next_ping_delay = true;
116✔
891
        milliseconds_type now = monotonic_clock_now();
116✔
892
        initiate_ping_delay(now); // Throws
116✔
893
        return;
200✔
894
    }
200✔
895
    REALM_ASSERT_EX(m_state == ConnectionState::connecting || m_waiting_for_pong, m_state);
116✔
896
    if (!m_send_ping)
116✔
897
        m_minimize_next_ping_delay = true;
16✔
898
}
16✔
899

900

1,810✔
901
void Connection::initiate_ping_delay(milliseconds_type now)
1,810✔
902
{
3,892✔
903
    REALM_ASSERT(!m_ping_delay_in_progress);
3,892✔
904
    REALM_ASSERT(!m_waiting_for_pong);
2,082✔
905
    REALM_ASSERT(!m_send_ping);
3,892✔
906

1,810✔
907
    milliseconds_type delay = 0;
3,868✔
908
    if (!m_minimize_next_ping_delay) {
2,082✔
909
        delay = m_client.m_ping_keepalive_period;
1,976✔
910
        // Make a randomized deduction of up to 10%, or up to 100% if this is
911
        // the first PING message to be sent since the connection was
912
        // established. The purpose of this randomized deduction is to reduce
913
        // the risk of many connections sending PING messages simultaneously to
1,786✔
914
        // the server.
1,786✔
915
        milliseconds_type max_deduction = (m_ping_sent ? delay / 10 : delay);
3,762✔
916
        auto distr = std::uniform_int_distribution<milliseconds_type>(0, max_deduction);
3,762✔
917
        milliseconds_type randomized_deduction = distr(m_client.get_random());
1,976✔
918
        delay -= randomized_deduction;
3,762✔
919
        // Deduct the time spent waiting for PONG
1,786✔
920
        REALM_ASSERT_3(now, >=, m_pong_wait_started_at);
3,762✔
921
        milliseconds_type spent_time = now - m_pong_wait_started_at;
3,758✔
922
        if (spent_time < delay) {
3,758✔
923
            delay -= spent_time;
1,976✔
924
        }
1,976✔
925
        else {
8✔
926
            delay = 0;
1,790✔
927
        }
28✔
928
    }
2,000✔
929
    else {
130✔
930
        m_minimize_next_ping_delay = false;
106✔
931
    }
106✔
932

1,810✔
933

934
    m_ping_delay_in_progress = true;
3,892✔
935

1,810✔
936
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(delay), [this](Status status) {
3,832✔
937
        if (status == ErrorCodes::OperationAborted)
2,138✔
938
            return;
1,976✔
939
        else if (!status.is_ok())
102✔
940
            throw Exception(status);
60✔
941

60✔
942
        handle_ping_delay();                                    // Throws
1,912✔
943
    });                                                         // Throws
1,912✔
944
    logger.debug("Will emit a ping in %1 milliseconds", delay); // Throws
2,082✔
945
}
2,082✔
946

947

60✔
948
void Connection::handle_ping_delay()
60✔
949
{
166✔
950
    REALM_ASSERT(m_ping_delay_in_progress);
166✔
951
    m_ping_delay_in_progress = false;
106✔
952
    m_send_ping = true;
166✔
953

954
    initiate_pong_timeout(); // Throws
166✔
955

34✔
956
    if (m_state == ConnectionState::connected && !m_sending)
166✔
957
        send_next_message(); // Throws
80✔
958
}
106✔
959

960

60✔
961
void Connection::initiate_pong_timeout()
60✔
962
{
166✔
963
    REALM_ASSERT(!m_ping_delay_in_progress);
166✔
964
    REALM_ASSERT(!m_waiting_for_pong);
106✔
965
    REALM_ASSERT(m_send_ping);
166✔
966

60✔
967
    m_waiting_for_pong = true;
106✔
968
    m_pong_wait_started_at = monotonic_clock_now();
166✔
969

60✔
970
    milliseconds_type time = m_client.m_pong_keepalive_timeout;
166✔
971
    m_heartbeat_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
160✔
972
        if (status == ErrorCodes::OperationAborted)
112✔
973
            return;
100✔
974
        else if (!status.is_ok())
6✔
975
            throw Exception(status);
6✔
976

6✔
977
        handle_pong_timeout(); // Throws
66✔
978
    });                        // Throws
6✔
979
}
106✔
980

981

6✔
982
void Connection::handle_pong_timeout()
6✔
983
{
12✔
984
    REALM_ASSERT(m_waiting_for_pong);
12✔
985
    logger.debug("Timeout on reception of PONG message"); // Throws
12✔
986
    close_due_to_transient_error({ErrorCodes::ConnectionClosed, "Timed out waiting for PONG response from server"},
12✔
987
                                 ConnectionTerminationReason::pong_timeout);
6✔
988
}
6✔
989

990

50,278✔
991
void Connection::initiate_write_message(const OutputBuffer& out, Session* sess)
992
{
99,756✔
993
    // Stop sending messages if an websocket error was received.
994
    if (m_websocket_error_received)
49,478✔
995
        return;
50,278✔
996

50,278✔
997
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
50,174✔
998
        if (sentinel->destroyed) {
50,108✔
999
            return;
50,312✔
1000
        }
730!
1001
        if (!status.is_ok()) {
48,682✔
1002
            if (status != ErrorCodes::Error::OperationAborted) {
×
1003
                // Write errors will be handled by the websocket_write_error_handler() callback
1004
                logger.error("Connection: write failed %1: %2", status.code_string(), status.reason());
×
1005
            }
×
1006
            return;
49,582✔
1007
        }
49,582✔
1008
        handle_write_message(); // Throws
98,960✔
1009
    });                         // Throws
98,960✔
1010
    m_sending_session = sess;
99,756✔
1011
    m_sending = true;
49,478✔
1012
}
49,478✔
1013

1014

49,582✔
1015
void Connection::handle_write_message()
49,582✔
1016
{
98,262✔
1017
    m_sending_session->message_sent(); // Throws
48,738✔
1018
    if (m_sending_session->m_state == Session::Deactivated) {
48,738✔
1019
        finish_session_deactivation(m_sending_session);
49,650✔
1020
    }
49,650✔
1021
    m_sending_session = nullptr;
98,262✔
1022
    m_sending = false;
98,262✔
1023
    send_next_message(); // Throws
48,680✔
1024
}
48,680✔
1025

1026

84,928✔
1027
void Connection::send_next_message()
84,928✔
1028
{
164,208✔
1029
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
164,208✔
1030
    REALM_ASSERT(!m_sending_session);
164,208✔
1031
    REALM_ASSERT(!m_sending);
79,334✔
1032
    if (m_send_ping) {
79,334✔
1033
        send_ping(); // Throws
154✔
1034
        return;
119,614✔
1035
    }
100✔
1036
    while (!m_sessions_enlisted_to_send.empty()) {
111,686✔
1037
        // The state of being connected is not supposed to be able to change
1038
        // across this loop thanks to the "no callback reentrance" guarantee
85,048✔
1039
        // provided by Websocket::async_write_text(), and friends.
1040
        REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
167,162✔
1041

85,048✔
1042
        Session& sess = *m_sessions_enlisted_to_send.front();
167,162✔
1043
        m_sessions_enlisted_to_send.pop_front();
82,114✔
1044
        sess.send_message(); // Throws
167,162✔
1045

1,034✔
1046
        if (sess.m_state == Session::Deactivated) {
83,148✔
1047
            finish_session_deactivation(&sess);
1,884✔
1048
        }
1,884✔
1049

1050
        // An enlisted session may choose to not send a message. In that case,
85,048✔
1051
        // we should pass the opportunity to the next enlisted session.
50,408✔
1052
        if (m_sending)
167,162✔
1053
            break;
134,482✔
1054
    }
82,114✔
1055
}
79,180✔
1056

1057

54✔
1058
void Connection::send_ping()
54✔
1059
{
154✔
1060
    REALM_ASSERT(!m_ping_delay_in_progress);
154✔
1061
    REALM_ASSERT(m_waiting_for_pong);
100✔
1062
    REALM_ASSERT(m_send_ping);
154✔
1063

54✔
1064
    m_send_ping = false;
122✔
1065
    if (m_reconnect_info.scheduled_reset)
100✔
1066
        m_ping_after_scheduled_reset_of_reconnect_info = true;
146✔
1067

54✔
1068
    m_last_ping_sent_at = monotonic_clock_now();
154✔
1069
    logger.debug("Sending: PING(timestamp=%1, rtt=%2)", m_last_ping_sent_at,
100✔
1070
                 m_previous_ping_rtt); // Throws
154✔
1071

54✔
1072
    ClientProtocol& protocol = get_client_protocol();
154✔
1073
    OutputBuffer& out = get_output_buffer();
154✔
1074
    protocol.make_ping(out, m_last_ping_sent_at, m_previous_ping_rtt); // Throws
154✔
1075
    initiate_write_ping(out);                                          // Throws
154✔
1076
    m_ping_sent = true;
100✔
1077
}
100✔
1078

1079

54✔
1080
void Connection::initiate_write_ping(const OutputBuffer& out)
54✔
1081
{
154✔
1082
    m_websocket->async_write_binary(out.as_span(), [this, sentinel = m_websocket_sentinel](Status status) {
102✔
1083
        if (sentinel->destroyed) {
102✔
1084
            return;
52✔
UNCOV
1085
        }
×
1086
        if (!status.is_ok()) {
100✔
1087
            if (status != ErrorCodes::Error::OperationAborted) {
×
1088
                // Write errors will be handled by the websocket_write_error_handler() callback
1089
                logger.error("Connection: send ping failed %1: %2", status.code_string(), status.reason());
×
1090
            }
×
1091
            return;
52✔
1092
        }
52✔
1093
        handle_write_ping(); // Throws
154✔
1094
    });                      // Throws
154✔
1095
    m_sending = true;
100✔
1096
}
100✔
1097

1098

52✔
1099
void Connection::handle_write_ping()
52✔
1100
{
152✔
1101
    REALM_ASSERT(m_sending);
152✔
1102
    REALM_ASSERT(!m_sending_session);
152✔
1103
    m_sending = false;
152✔
1104
    send_next_message(); // Throws
100✔
1105
}
100✔
1106

1107

37,310✔
1108
void Connection::handle_message_received(util::Span<const char> data)
1109
{
41,676✔
1110
    // parse_message_received() parses the message and calls the proper handler
37,310✔
1111
    // on the Connection object (this).
37,310✔
1112
    get_client_protocol().parse_message_received<Connection>(*this, std::string_view(data.data(), data.size()));
41,676✔
1113
}
41,676✔
1114

1115

2,076✔
1116
void Connection::initiate_disconnect_wait()
2,076✔
1117
{
2,306✔
1118
    REALM_ASSERT(!m_reconnect_delay_in_progress);
4,382✔
1119

882✔
1120
    if (m_disconnect_delay_in_progress) {
3,188✔
1121
        m_reconnect_disconnect_timer.reset();
1,860✔
1122
        m_disconnect_delay_in_progress = false;
978✔
1123
    }
3,054✔
1124

1125
    milliseconds_type time = m_client.m_connection_linger_time;
4,382✔
1126

1127
    m_reconnect_disconnect_timer = m_client.create_timer(std::chrono::milliseconds(time), [this](Status status) {
2,306✔
1128
        // If the operation is aborted, the connection object may have been
2,076✔
1129
        // destroyed.
6✔
1130
        if (status != ErrorCodes::OperationAborted)
4,380✔
1131
            handle_disconnect_wait(status); // Throws
2,082✔
1132
    });                                     // Throws
4,380✔
1133
    m_disconnect_delay_in_progress = true;
2,306✔
1134
}
2,306✔
1135

1136

6✔
1137
void Connection::handle_disconnect_wait(Status status)
6✔
1138
{
6!
1139
    if (!status.is_ok()) {
6✔
1140
        REALM_ASSERT(status != ErrorCodes::OperationAborted);
×
1141
        throw Exception(status);
1142
    }
6✔
1143

1144
    m_disconnect_delay_in_progress = false;
12✔
1145

6✔
1146
    REALM_ASSERT_EX(m_state != ConnectionState::disconnected, m_state);
12✔
1147
    if (m_num_active_unsuspended_sessions == 0) {
6✔
1148
        if (m_client.m_connection_linger_time > 0)
12✔
1149
            logger.detail("Linger time expired"); // Throws
6✔
1150
        voluntary_disconnect();                   // Throws
12✔
1151
        logger.info("Disconnected");              // Throws
12✔
1152
    }
6✔
1153
}
6✔
1154

1155

8✔
1156
void Connection::close_due_to_protocol_error(Status status)
8✔
1157
{
16✔
1158
    SessionErrorInfo error_info(std::move(status), IsFatal{true});
16✔
1159
    error_info.server_requests_action = ProtocolErrorInfo::Action::ProtocolViolation;
16✔
1160
    involuntary_disconnect(std::move(error_info),
16✔
1161
                           ConnectionTerminationReason::sync_protocol_violation); // Throws
8✔
1162
}
8✔
1163

1164

192✔
1165
void Connection::close_due_to_client_side_error(Status status, IsFatal is_fatal, ConnectionTerminationReason reason)
192✔
1166
{
258✔
1167
    logger.info("Connection closed due to error: %1", status); // Throws
450✔
1168

192✔
1169
    involuntary_disconnect(SessionErrorInfo{std::move(status), is_fatal}, reason); // Throw
258✔
1170
}
258✔
1171

1172

354✔
1173
void Connection::close_due_to_transient_error(Status status, ConnectionTerminationReason reason)
354✔
1174
{
644✔
1175
    logger.info("Connection closed due to transient error: %1", status); // Throws
644✔
1176
    SessionErrorInfo error_info{std::move(status), IsFatal{false}};
290✔
1177
    error_info.server_requests_action = ProtocolErrorInfo::Action::Transient;
644✔
1178

354✔
1179
    involuntary_disconnect(std::move(error_info), reason); // Throw
290✔
1180
}
290✔
1181

1182

1183
// Close connection due to error discovered on the server-side, and then
1184
// reported to the client by way of a connection-level ERROR message.
32✔
1185
void Connection::close_due_to_server_side_error(ProtocolError error_code, const ProtocolErrorInfo& info)
32✔
1186
{
68✔
1187
    logger.info("Connection closed due to error reported by server: %1 (%2)", info.message,
36✔
1188
                int(error_code)); // Throws
68✔
1189

32✔
1190
    const auto reason = info.is_fatal ? ConnectionTerminationReason::server_said_do_not_reconnect
68✔
1191
                                      : ConnectionTerminationReason::server_said_try_again_later;
68✔
1192
    involuntary_disconnect(SessionErrorInfo{info, protocol_error_to_status(error_code, info.message)},
68✔
1193
                           reason); // Throws
36✔
1194
}
36✔
1195

1196

1,846✔
1197
void Connection::disconnect(const SessionErrorInfo& info)
1198
{
3,834✔
1199
    // Cancel connect timeout watchdog
1200
    m_connect_timer.reset();
3,834✔
1201

1,744✔
1202
    if (m_state == ConnectionState::connected) {
3,732✔
1203
        m_disconnect_time = monotonic_clock_now();
1,880✔
1204
        m_disconnect_has_occurred = true;
1,880✔
1205

1206
        // Sessions that are in the Deactivating state at this time can be
1207
        // immediately discarded, in part because they are no longer enlisted to
1208
        // send. Such sessions will be taken to the Deactivated state by
1209
        // Session::connection_lost(), and then they will be removed from
1,744✔
1210
        // `m_sessions`.
4,270✔
1211
        auto i = m_sessions.begin(), end = m_sessions.end();
1,880✔
1212
        while (i != end) {
6,470✔
1213
            // Prevent invalidation of the main iterator when erasing elements
2,526✔
1214
            auto j = i++;
4,590✔
1215
            Session& sess = *j->second;
4,590✔
1216
            sess.connection_lost(); // Throws
3,464✔
1217
            if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated)
4,594✔
1218
                m_sessions.erase(j);
2,672✔
1219
        }
2,064✔
1220
    }
3,726✔
1221

1222
    change_state_to_disconnected();
3,834✔
1223

1,846✔
1224
    m_ping_delay_in_progress = false;
3,834✔
1225
    m_waiting_for_pong = false;
3,834✔
1226
    m_send_ping = false;
3,834✔
1227
    m_minimize_next_ping_delay = false;
3,834✔
1228
    m_ping_after_scheduled_reset_of_reconnect_info = false;
3,834✔
1229
    m_ping_sent = false;
3,834✔
1230
    m_heartbeat_timer.reset();
1,988✔
1231
    m_previous_ping_rtt = 0;
3,834✔
1232

1,846✔
1233
    m_websocket_sentinel->destroyed = true;
3,834✔
1234
    m_websocket_sentinel.reset();
3,834✔
1235
    m_websocket.reset();
3,834✔
1236
    m_input_body_buffer.reset();
3,834✔
1237
    m_sending_session = nullptr;
3,834✔
1238
    m_sessions_enlisted_to_send.clear();
1,988✔
1239
    m_sending = false;
3,834✔
1240

1,846✔
1241
    report_connection_state_change(ConnectionState::disconnected, info); // Throws
3,834✔
1242
    initiate_reconnect_wait();                                           // Throws
1,988✔
1243
}
1,988✔
1244

52,484✔
1245
bool Connection::is_flx_sync_connection() const noexcept
52,484✔
1246
{
113,026✔
1247
    return m_server_endpoint.server_mode != SyncServerMode::PBS;
60,542✔
1248
}
60,542✔
1249

50✔
1250
void Connection::receive_pong(milliseconds_type timestamp)
50✔
1251
{
96✔
1252
    logger.debug("Received: PONG(timestamp=%1)", timestamp);
146✔
1253

50✔
1254
    bool legal_at_this_time = (m_waiting_for_pong && !m_send_ping);
96✔
1255
    if (REALM_UNLIKELY(!legal_at_this_time)) {
96✔
1256
        close_due_to_protocol_error(
×
1257
            {ErrorCodes::SyncProtocolInvariantFailed, "Received PONG message when it was not valid"}); // Throws
×
1258
        return;
1259
    }
50✔
1260

1261
    if (REALM_UNLIKELY(timestamp != m_last_ping_sent_at)) {
96✔
1262
        close_due_to_protocol_error(
×
1263
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1264
             util::format("Received PONG message with an invalid timestamp (expected %1, received %2)",
×
1265
                          m_last_ping_sent_at, timestamp)}); // Throws
×
1266
        return;
1267
    }
50✔
1268

50✔
1269
    milliseconds_type now = monotonic_clock_now();
146✔
1270
    milliseconds_type round_trip_time = now - timestamp;
146✔
1271
    logger.debug("Round trip time was %1 milliseconds", round_trip_time);
96✔
1272
    m_previous_ping_rtt = round_trip_time;
96✔
1273

1274
    // If this PONG message is a response to a PING mesage that was sent after
1275
    // the last invocation of cancel_reconnect_delay(), then the connection is
50✔
1276
    // still good, and we do not have to skip the next reconnect delay.
16✔
1277
    if (m_ping_after_scheduled_reset_of_reconnect_info) {
112✔
1278
        REALM_ASSERT(m_reconnect_info.scheduled_reset);
102✔
1279
        m_ping_after_scheduled_reset_of_reconnect_info = false;
102✔
1280
        m_reconnect_info.scheduled_reset = false;
86✔
1281
    }
136✔
1282

50✔
1283
    m_heartbeat_timer.reset();
96✔
1284
    m_waiting_for_pong = false;
146✔
1285

1286
    initiate_ping_delay(now); // Throws
146✔
1287

1288
    if (m_client.m_roundtrip_time_handler)
146✔
1289
        m_client.m_roundtrip_time_handler(m_previous_ping_rtt); // Throws
1290
}
96✔
1291

34,428✔
1292
Session* Connection::find_and_validate_session(session_ident_type session_ident, std::string_view message) noexcept
34,428✔
1293
{
38,320✔
1294
    if (session_ident == 0) {
38,320✔
1295
        return nullptr;
1296
    }
34,428✔
1297

34,430✔
1298
    auto* sess = get_session(session_ident);
72,750✔
1299
    if (REALM_LIKELY(sess)) {
72,750✔
1300
        return sess;
38,318✔
1301
    }
2,147,521,965✔
1302
    // Check the history to see if the message received was for a previous session
1303
    if (auto it = m_session_history.find(session_ident); it == m_session_history.end()) {
2✔
1304
        logger.error("Bad session identifier in %1 message, session_ident = %2", message, session_ident);
×
1305
        close_due_to_protocol_error(
×
1306
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1307
             util::format("Received message %1 for session iden %2 when that session never existed", message,
×
1308
                          session_ident)});
2,147,483,647✔
1309
    }
2,147,483,647✔
1310
    else {
2,147,483,649✔
1311
        logger.error("Received %1 message for closed session, session_ident = %2", message,
2,147,483,649✔
1312
                     session_ident); // Throws
2,147,483,649✔
1313
    }
34,430✔
1314
    return nullptr;
2✔
1315
}
38,320✔
1316

396✔
1317
void Connection::receive_error_message(const ProtocolErrorInfo& info, session_ident_type session_ident)
396✔
1318
{
784✔
1319
    Session* sess = nullptr;
750✔
1320
    if (session_ident != 0) {
750✔
1321
        sess = find_and_validate_session(session_ident, "ERROR");
350✔
1322
        if (REALM_UNLIKELY(!sess)) {
350✔
1323
            return;
362✔
1324
        }
×
1325
        if (auto status = sess->receive_error_message(info); !status.is_ok()) {
350✔
1326
            close_due_to_protocol_error(std::move(status)); // Throws
×
1327
            return;
1328
        }
362✔
1329

1330
        if (sess->m_state == Session::Deactivated) {
350✔
1331
            finish_session_deactivation(sess);
362✔
1332
        }
362✔
1333
        return;
350✔
1334
    }
384✔
1335

34✔
1336
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, session_ident=%4, error_action=%5)",
72✔
1337
                info.message, info.raw_error_code, info.is_fatal, session_ident,
38✔
1338
                info.server_requests_action); // Throws
72✔
1339

34✔
1340
    bool known_error_code = bool(get_protocol_error_message(info.raw_error_code));
70✔
1341
    if (REALM_LIKELY(known_error_code)) {
70✔
1342
        ProtocolError error_code = ProtocolError(info.raw_error_code);
68✔
1343
        if (REALM_LIKELY(!is_session_level_error(error_code))) {
68✔
1344
            close_due_to_server_side_error(error_code, info); // Throws
68✔
1345
            return;
36✔
1346
        }
36✔
1347
        close_due_to_protocol_error(
×
1348
            {ErrorCodes::SyncProtocolInvariantFailed,
×
1349
             util::format("Received ERROR message with a non-connection-level error code %1 without a session ident",
×
1350
                          info.raw_error_code)});
2✔
1351
    }
2✔
1352
    else {
4✔
1353
        close_due_to_protocol_error(
4✔
1354
            {ErrorCodes::SyncProtocolInvariantFailed,
4✔
1355
             util::format("Received ERROR message with unknown error code %1", info.raw_error_code)});
36✔
1356
    }
2✔
1357
}
38✔
1358

1359

1360
void Connection::receive_query_error_message(int raw_error_code, std::string_view message, int64_t query_version,
10✔
1361
                                             session_ident_type session_ident)
10✔
1362
{
10✔
1363
    if (session_ident == 0) {
10✔
1364
        return close_due_to_protocol_error(
×
1365
            {ErrorCodes::SyncProtocolInvariantFailed, "Received query error message for session ident 0"});
1366
    }
10✔
1367

1368
    if (!is_flx_sync_connection()) {
10✔
1369
        return close_due_to_protocol_error({ErrorCodes::SyncProtocolInvariantFailed,
×
1370
                                            "Received a FLX query error message on a non-FLX sync connection"});
1371
    }
10✔
1372

10✔
1373
    Session* sess = find_and_validate_session(session_ident, "QUERY_ERROR");
20✔
1374
    if (REALM_UNLIKELY(!sess)) {
20✔
1375
        return;
1376
    }
1377

1378
    if (auto status = sess->receive_query_error_message(raw_error_code, message, query_version); !status.is_ok()) {
1,778✔
1379
        close_due_to_protocol_error(std::move(status));
1,768✔
1380
    }
1,768✔
1381
}
10✔
1382

1383

1384
void Connection::receive_ident_message(session_ident_type session_ident, SaltedFileIdent client_file_ident)
1,768✔
1385
{
1,998✔
1386
    Session* sess = find_and_validate_session(session_ident, "IDENT");
3,766✔
1387
    if (REALM_UNLIKELY(!sess)) {
1,998✔
1388
        return;
1389
    }
22,186✔
1390

22,186✔
1391
    if (auto status = sess->receive_ident_message(client_file_ident); !status.is_ok())
24,184✔
1392
        close_due_to_protocol_error(std::move(status)); // Throws
×
1393
}
1,998✔
1394

1395
void Connection::receive_download_message(session_ident_type session_ident, const DownloadMessage& message)
22,186✔
1396
{
25,844✔
1397
    Session* sess = find_and_validate_session(session_ident, "DOWNLOAD");
25,844✔
1398
    if (REALM_UNLIKELY(!sess)) {
48,030✔
1399
        return;
1400
    }
1401

8,124✔
1402
    if (auto status = sess->receive_download_message(message); !status.is_ok()) {
33,968✔
1403
        close_due_to_protocol_error(std::move(status));
8,126✔
1404
    }
2✔
1405
}
25,844✔
1406

1407
void Connection::receive_mark_message(session_ident_type session_ident, request_ident_type request_ident)
8,124✔
1408
{
8,246✔
1409
    Session* sess = find_and_validate_session(session_ident, "MARK");
16,364✔
1410
    if (REALM_UNLIKELY(!sess)) {
8,240✔
1411
        return;
1412
    }
1413

1,948✔
1414
    if (auto status = sess->receive_mark_message(request_ident); !status.is_ok())
10,188✔
1415
        close_due_to_protocol_error(std::move(status)); // Throws
1,952✔
1416
}
8,240✔
1417

1418

1419
void Connection::receive_unbound_message(session_ident_type session_ident)
1,948✔
1420
{
1,852✔
1421
    Session* sess = find_and_validate_session(session_ident, "UNBOUND");
1,852✔
1422
    if (REALM_UNLIKELY(!sess)) {
1,852✔
1423
        return;
1424
    }
1,950✔
1425

1,950✔
1426
    if (auto status = sess->receive_unbound_message(); !status.is_ok()) {
3,802✔
1427
        close_due_to_protocol_error(std::move(status)); // Throws
1,948✔
1428
        return;
1429
    }
1430

1431
    if (sess->m_state == Session::Deactivated) {
1,852✔
1432
        finish_session_deactivation(sess);
1,882✔
1433
    }
1,882✔
1434
}
1,882✔
1435

1436

1437
void Connection::receive_test_command_response(session_ident_type session_ident, request_ident_type request_ident,
1438
                                               std::string_view body)
30✔
1439
{
26✔
1440
    Session* sess = find_and_validate_session(session_ident, "TEST_COMMAND");
26✔
1441
    if (REALM_UNLIKELY(!sess)) {
56✔
1442
        return;
1443
    }
1444

1445
    if (auto status = sess->receive_test_command_response(request_ident, body); !status.is_ok()) {
26✔
1446
        close_due_to_protocol_error(std::move(status));
2,796✔
1447
    }
2,796✔
1448
}
2,822✔
1449

2,796✔
1450

2,796✔
1451
void Connection::receive_server_log_message(session_ident_type session_ident, util::Logger::Level level,
1452
                                            std::string_view message)
1453
{
3,222✔
1454
    std::string prefix;
3,222✔
1455
    if (REALM_LIKELY(!m_appservices_coid.empty())) {
6,018✔
1456
        prefix = util::format("Server[%1]", m_appservices_coid);
5,050✔
1457
    }
5,044✔
1458
    else {
1,822✔
1459
        prefix = "Server";
1,822✔
1460
    }
1461

6✔
1462
    if (session_ident != 0) {
3,228✔
1463
        if (auto sess = get_session(session_ident)) {
2,152✔
1464
            sess->logger.log(LogCategory::session, level, "%1 log: %2", prefix, message);
3,954✔
1465
            return;
2,126✔
1466
        }
3,094✔
1467

968✔
1468
        logger.log(util::LogCategory::session, level, "%1 log for unknown session %2: %3", prefix, session_ident,
20✔
1469
                   message);
20✔
1470
        return;
20✔
1471
    }
4,856✔
1472

1473
    logger.log(level, "%1 log: %2", prefix, message);
3,786✔
1474
}
2,270✔
1475

1,194✔
1476

1,194✔
1477
void Connection::receive_appservices_request_id(std::string_view coid)
1,194✔
1478
{
5,672✔
1479
    // Only set once per connection
1480
    if (!coid.empty() && m_appservices_coid.empty()) {
2,962✔
1481
        m_appservices_coid = coid;
1,332✔
1482
        logger.log(util::LogCategory::session, util::LogCategory::Level::info,
1,332✔
1483
                   "Connected to app services with request id: \"%1\"", m_appservices_coid);
1,332✔
1484
    }
1,332✔
1485
}
2,962✔
1486

1487

1488
void Connection::handle_protocol_error(Status status)
1489
{
1490
    close_due_to_protocol_error(std::move(status));
1491
}
1492

1493

1494
// Sessions are guaranteed to be granted the opportunity to send a message in
1495
// the order that they enlist. Note that this is important to ensure
1496
// nonoverlapping communication with the server for consecutive sessions
85,682✔
1497
// associated with the same Realm file.
85,682✔
1498
//
85,682✔
1499
// CAUTION: The specified session may get destroyed before this function
85,682✔
1500
// returns, but only if its Session::send_message() puts it into the Deactivated
35,260✔
1501
// state.
85,682✔
1502
void Connection::enlist_to_send(Session* sess)
1503
{
83,110✔
1504
    REALM_ASSERT_EX(m_state == ConnectionState::connected, m_state);
83,110✔
1505
    m_sessions_enlisted_to_send.push_back(sess); // Throws
83,146✔
1506
    if (!m_sending)
83,146✔
1507
        send_next_message(); // Throws
30,456✔
1508
}
83,110✔
1509

1510

2,156✔
1511
std::string Connection::get_active_appservices_connection_id()
2,156✔
1512
{
36✔
1513
    return m_appservices_coid;
2,192✔
1514
}
2,148✔
1515

1516
void Session::cancel_resumption_delay()
44✔
1517
{
2,164✔
1518
    REALM_ASSERT_EX(m_state == Active, m_state);
2,208✔
1519

1520
    if (!m_suspended)
2,208✔
1521
        return;
2,166✔
1522

1523
    m_suspended = false;
72✔
1524

44✔
1525
    logger.debug("Resumed"); // Throws
32✔
1526

4✔
1527
    if (unbind_process_complete())
28✔
1528
        initiate_rebind(); // Throws
60✔
1529

44✔
1530
    m_conn.one_more_active_unsuspended_session(); // Throws
28✔
1531
    if (m_try_again_activation_timer) {
28✔
1532
        m_try_again_activation_timer.reset();
4✔
1533
    }
4✔
1534

11,064✔
1535
    on_resumed(); // Throws
11,092✔
1536
}
11,070✔
1537

11,042✔
1538

1539
void Session::gather_pending_compensating_writes(util::Span<Changeset> changesets,
22✔
1540
                                                 std::vector<ProtocolErrorInfo>* out)
22✔
1541
{
12,584✔
1542
    if (m_pending_compensating_write_errors.empty() || changesets.empty()) {
12,584✔
1543
        return;
12,562✔
1544
    }
12,562✔
1545

22✔
1546
#ifdef REALM_DEBUG
44✔
1547
    REALM_ASSERT_DEBUG(
44✔
1548
        std::is_sorted(m_pending_compensating_write_errors.begin(), m_pending_compensating_write_errors.end(),
22✔
1549
                       [](const ProtocolErrorInfo& lhs, const ProtocolErrorInfo& rhs) {
66✔
1550
                           REALM_ASSERT_DEBUG(lhs.compensating_write_server_version.has_value());
66✔
1551
                           REALM_ASSERT_DEBUG(rhs.compensating_write_server_version.has_value());
44✔
1552
                           return *lhs.compensating_write_server_version < *rhs.compensating_write_server_version;
44✔
1553
                       }));
44✔
1554
#endif
44✔
1555

22✔
1556
    while (!m_pending_compensating_write_errors.empty() &&
66✔
1557
           *m_pending_compensating_write_errors.front().compensating_write_server_version <=
66✔
1558
               changesets.back().version) {
22✔
1559
        auto& cur_error = m_pending_compensating_write_errors.front();
22✔
1560
        REALM_ASSERT_3(*cur_error.compensating_write_server_version, >=, changesets.front().version);
22✔
1561
        out->push_back(std::move(cur_error));
22✔
1562
        m_pending_compensating_write_errors.pop_front();
22✔
1563
    }
20,706✔
1564
}
20,706✔
1565

20,684✔
1566

9,606✔
1567
void Session::integrate_changesets(const SyncProgress& progress, std::uint_fast64_t downloadable_bytes,
1568
                                   const ReceivedChangesets& received_changesets, VersionInfo& version_info,
1569
                                   DownloadBatchState download_batch_state)
1570
{
24,542✔
1571
    auto& history = get_history();
34,148✔
1572
    if (received_changesets.empty()) {
34,148✔
1573
        if (download_batch_state == DownloadBatchState::MoreToCome) {
21,574✔
1574
            throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
1575
                                       "received empty download message that was not the last in batch",
11,078✔
1576
                                       ProtocolError::bad_progress);
11,078✔
1577
        }
11,078✔
1578
        history.set_sync_progress(progress, downloadable_bytes, version_info); // Throws
23,046✔
1579
        return;
23,046✔
1580
    }
23,032✔
1581

11,064✔
1582
    std::vector<ProtocolErrorInfo> pending_compensating_write_errors;
23,652✔
1583
    auto transact = get_db()->start_read();
19,828✔
1584
    history.integrate_server_changesets(
19,828✔
1585
        progress, downloadable_bytes, received_changesets, version_info, download_batch_state, logger, transact,
19,828✔
1586
        [&](const TransactionRef&, util::Span<Changeset> changesets) {
16,398✔
1587
            gather_pending_compensating_writes(changesets, &pending_compensating_write_errors);
16,386✔
1588
        }); // Throws
16,386✔
1589
    if (received_changesets.size() == 1) {
16,398✔
1590
        logger.debug("1 remote changeset integrated, producing client version %1",
8,516✔
1591
                     version_info.sync_version.version); // Throws
19,594✔
1592
    }
8,538✔
1593
    else {
4,080✔
1594
        logger.debug("%2 remote changesets integrated, producing client version %1",
4,080✔
1595
                     version_info.sync_version.version, received_changesets.size()); // Throws
4,080✔
1596
    }
4,080✔
1597

22✔
1598
    for (const auto& pending_error : pending_compensating_write_errors) {
12,596✔
1599
        logger.info("Reporting compensating write for client version %1 in server version %2: %3",
44✔
1600
                    pending_error.compensating_write_rejected_client_version,
44✔
1601
                    *pending_error.compensating_write_server_version, pending_error.message);
44✔
1602
        try {
44✔
1603
            on_connection_state_changed(
22✔
1604
                m_conn.get_state(),
22✔
1605
                SessionErrorInfo{pending_error,
44✔
1606
                                 protocol_error_to_status(static_cast<ProtocolError>(pending_error.raw_error_code),
11,100✔
1607
                                                          pending_error.message)});
22✔
1608
        }
22✔
1609
        catch (...) {
22✔
1610
            logger.error("Exception thrown while reporting compensating write: %1", exception_to_status());
20✔
1611
        }
20✔
1612
    }
42✔
1613
}
12,594✔
1614

1615

20✔
1616
void Session::on_integration_failure(const IntegrationException& error)
20✔
1617
{
40✔
1618
    REALM_ASSERT_EX(m_state == Active, m_state);
40✔
1619
    REALM_ASSERT(!m_client_error && !m_error_to_send);
20✔
1620
    logger.error("Failed to integrate downloaded changesets: %1", error.to_status());
40✔
1621

1622
    m_client_error = util::make_optional<IntegrationException>(error);
20✔
1623
    m_error_to_send = true;
20✔
1624
    SessionErrorInfo error_info{error.to_status(), IsFatal{false}};
40✔
1625
    error_info.server_requests_action = ProtocolErrorInfo::Action::Warning;
40✔
1626
    // Surface the error to the user otherwise is lost.
18✔
1627
    on_connection_state_changed(m_conn.get_state(), std::move(error_info));
38✔
1628

20✔
1629
    // Since the deactivation process has not been initiated, the UNBIND
1630
    // message cannot have been sent unless an ERROR message was received.
1631
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
21,688✔
1632
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
21,688✔
1633
        ensure_enlisted_to_send(); // Throws
21,686✔
1634
    }
18✔
1635
}
21,688✔
1636

21,668✔
1637
void Session::on_changesets_integrated(version_type client_version, const SyncProgress& progress)
1638
{
47,162✔
1639
    REALM_ASSERT_EX(m_state == Active, m_state);
25,858✔
1640
    REALM_ASSERT_3(progress.download.server_version, >=, m_download_progress.server_version);
25,494✔
1641

21,668✔
1642
    m_download_progress = progress.download;
25,494✔
1643
    m_progress = progress;
47,162✔
1644

1645
    if (progress.upload.client_version > m_upload_progress.client_version)
25,494✔
1646
        m_upload_progress = progress.upload;
21,940✔
1647

1,658✔
1648
    do_recognize_sync_version(client_version); // Allows upload process to resume
27,152✔
1649

1,658✔
1650
    check_for_download_completion(); // Throws
25,494✔
1651

1652
    // If the client migrated from PBS to FLX, create subscriptions when new tables are received from server.
1653
    if (auto migration_store = get_migration_store(); migration_store && m_is_flx_sync_session) {
47,162✔
1654
        auto& flx_subscription_store = *get_flx_subscription_store();
23,294✔
1655
        get_migration_store()->create_subscriptions(flx_subscription_store);
23,288✔
1656
    }
23,288✔
1657

21,668✔
1658
    // Since the deactivation process has not been initiated, the UNBIND
1659
    // message cannot have been sent unless an ERROR message was received.
1660
    REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent);
25,494✔
1661
    if (m_ident_message_sent && !m_error_message_received && !m_suspended) {
30,362✔
1662
        ensure_enlisted_to_send(); // Throws
25,490✔
1663
    }
30,358✔
1664
}
25,494✔
1665

1666

1667
Session::~Session()
4,868✔
1668
{
10,072✔
1669
    //    REALM_ASSERT_EX(m_state == Unactivated || m_state == Deactivated, m_state);
4,868✔
1670
}
10,072✔
1671

4,868✔
1672

4,868✔
1673
std::string Session::make_logger_prefix(session_ident_type ident)
1674
{
5,204✔
1675
    std::ostringstream out;
5,204✔
1676
    out.imbue(std::locale::classic());
10,072✔
1677
    out << "Session[" << ident << "]: "; // Throws
10,072✔
1678
    return out.str();                    // Throws
5,204✔
1679
}
10,072✔
1680

1681

4,868✔
1682
void Session::activate()
4,868✔
1683
{
10,072✔
1684
    REALM_ASSERT_EX(m_state == Unactivated, m_state);
5,204✔
1685

4,868✔
1686
    logger.debug("Activating"); // Throws
10,072✔
1687

4,680✔
1688
    if (REALM_LIKELY(!get_client().is_dry_run())) {
9,884✔
1689
        bool file_exists = util::File::exists(get_realm_path());
10,070✔
1690
        m_performing_client_reset = get_client_reset_config().has_value();
10,070✔
1691

4,868✔
1692
        logger.info("client_reset_config = %1, Realm exists = %2 ", m_performing_client_reset, file_exists);
10,070✔
1693
        if (!m_performing_client_reset) {
10,070✔
1694
            get_history().get_status(m_last_version_available, m_client_file_ident, m_progress); // Throws
9,884✔
1695
        }
9,884✔
1696
    }
5,202✔
1697
    logger.debug("client_file_ident = %1, client_file_ident_salt = %2", m_client_file_ident.ident,
10,072✔
1698
                 m_client_file_ident.salt); // Throws
10,072✔
1699
    m_upload_progress = m_progress.upload;
10,072✔
1700
    m_download_progress = m_progress.download;
10,072✔
1701
    REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version);
10,072✔
1702
    init_progress_handler();
10,072✔
1703

1704
    logger.debug("last_version_available  = %1", m_last_version_available);                    // Throws
10,072✔
1705
    logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws
10,072✔
1706
    logger.debug("progress_download_client_version = %1",
5,204✔
1707
                 m_progress.download.last_integrated_client_version);                                      // Throws
10,072✔
1708
    logger.debug("progress_upload_server_version = %1", m_progress.upload.last_integrated_server_version); // Throws
5,204✔
1709
    logger.debug("progress_upload_client_version = %1", m_progress.upload.client_version);                 // Throws
10,072✔
1710

4,868✔
1711
    reset_protocol_state();
5,204✔
1712
    m_state = Active;
10,072✔
1713

4,868✔
1714
    call_debug_hook(SyncClientHookEvent::SessionActivating);
10,072✔
1715

4,868✔
1716
    REALM_ASSERT(!m_suspended);
5,204✔
1717
    m_conn.one_more_active_unsuspended_session(); // Throws
5,204✔
1718

4,868✔
1719
    try {
5,206✔
1720
        process_pending_flx_bootstrap();
5,206✔
1721
    }
5,204✔
1722
    catch (const IntegrationException& error) {
5,204✔
1723
        on_integration_failure(error);
4,868✔
1724
    }
4,868✔
1725
    catch (...) {
5,204✔
1726
        on_integration_failure(IntegrationException(exception_to_status()));
2✔
1727
    }
2✔
1728

1729
    // Checks if there is a pending client reset
1730
    handle_pending_client_reset_acknowledgement();
10,072✔
1731
}
10,070✔
1732

1733

4,868✔
1734
// The caller (Connection) must discard the session if the session has become
1735
// deactivated upon return.
4,868✔
1736
void Session::initiate_deactivation()
1737
{
10,068✔
1738
    REALM_ASSERT_EX(m_state == Active, m_state);
9,764✔
1739

1740
    logger.debug("Initiating deactivation"); // Throws
10,068✔
1741

2,722✔
1742
    m_state = Deactivating;
7,922✔
1743

2,722✔
1744
    if (!m_suspended)
5,200✔
1745
        m_conn.one_less_active_unsuspended_session(); // Throws
4,896✔
1746

1747
    if (m_enlisted_to_send) {
5,200✔
1748
        REALM_ASSERT(!unbind_process_complete());
5,432✔
1749
        return;
3,712✔
1750
    }
3,286✔
1751

426✔
1752
    // Deactivate immediately if the BIND message has not yet been sent and the
426✔
1753
    // session is not enlisted to send, or if the unbinding process has already
1754
    // completed.
1755
    if (!m_bind_message_sent || unbind_process_complete()) {
3,634✔
1756
        complete_deactivation(); // Throws
2,092✔
1757
        // Life cycle state is now Deactivated
1,620✔
1758
        return;
2,092✔
1759
    }
2,192✔
1760

1761
    // Ready to send the UNBIND message, if it has not already been sent
1762
    if (!m_unbind_message_sent) {
1,442✔
1763
        enlist_to_send(); // Throws
6,200✔
1764
        return;
6,200✔
1765
    }
6,200✔
1766
}
1,442✔
1767

4,868✔
1768

4,868✔
1769
void Session::complete_deactivation()
1770
{
5,204✔
1771
    REALM_ASSERT_EX(m_state == Deactivating, m_state);
5,204✔
1772
    m_state = Deactivated;
5,204✔
1773

1774
    logger.debug("Deactivation completed"); // Throws
5,204✔
1775
}
5,204✔
1776

1777

85,048✔
1778
// Called by the associated Connection object when this session is granted an
85,048✔
1779
// opportunity to send a message.
85,048✔
1780
//
85,048✔
1781
// The caller (Connection) must discard the session if the session has become
85,048✔
1782
// deactivated upon return.
1783
void Session::send_message()
1784
{
82,118✔
1785
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
86,646✔
1786
    REALM_ASSERT(m_enlisted_to_send);
83,152✔
1787
    m_enlisted_to_send = false;
82,118✔
1788
    if (m_state == Deactivating || m_error_message_received || m_suspended) {
83,152✔
1789
        // Deactivation has been initiated. If the UNBIND message has not been
1790
        // sent yet, there is no point in sending it. Instead, we can let the
1791
        // deactivation process complete.
1792
        if (!m_bind_message_sent) {
8,156✔
1793
            return complete_deactivation(); // Throws
5,378✔
1794
            // Life cycle state is now Deactivated
3,494✔
1795
        }
6,412✔
1796

1797
        // Session life cycle state is Deactivating or the unbinding process has
1798
        // been initiated by a session specific ERROR message
1799
        if (!m_unbind_message_sent)
83,298✔
1800
            send_unbind_message(); // Throws
2,778✔
1801
        return;
83,298✔
1802
    }
9,392✔
1803

1804
    // Session life cycle state is Active and the unbinding process has
1805
    // not been initiated
75,790✔
1806
    REALM_ASSERT(!m_unbind_message_sent);
153,246✔
1807

78✔
1808
    if (!m_bind_message_sent)
77,534✔
1809
        return send_bind_message(); // Throws
79,910✔
1810

30✔
1811
    if (!m_ident_message_sent) {
73,366✔
1812
        if (have_client_file_ident())
3,826✔
1813
            send_ident_message(); // Throws
79,586✔
1814
        return;
7,460✔
1815
    }
7,460✔
1816

3,634✔
1817
    const auto has_pending_test_command = std::any_of(m_pending_test_commands.begin(), m_pending_test_commands.end(),
73,144✔
1818
                                                      [](const PendingTestCommand& command) {
69,510✔
1819
                                                          return command.pending;
72,180✔
1820
                                                      });
72✔
1821
    if (has_pending_test_command) {
69,510✔
1822
        return send_test_command_message();
26✔
1823
    }
72,134✔
1824

8✔
1825
    if (m_error_to_send)
69,492✔
1826
        return send_json_error_message(); // Throws
18✔
1827

72,100✔
1828
    // Stop sending upload, mark and query messages when the client detects an error.
8,504✔
1829
    if (m_client_error) {
69,466✔
1830
        return;
63,602✔
1831
    }
63,602✔
1832

55,752✔
1833
    if (m_target_download_mark > m_last_download_mark_sent)
125,214✔
1834
        return send_mark_message(); // Throws
8,598✔
1835

7,846✔
1836
    auto is_upload_allowed = [&]() -> bool {
68,712✔
1837
        if (!m_is_flx_sync_session) {
60,866✔
1838
            return true;
53,422✔
1839
        }
53,422✔
1840

7,846✔
1841
        auto migration_store = get_migration_store();
15,290✔
1842
        if (!migration_store) {
15,278✔
1843
            return true;
7,834✔
1844
        }
1845

1846
        auto sentinel_query_version = migration_store->get_sentinel_subscription_set_version();
7,456✔
1847
        if (!sentinel_query_version) {
15,290✔
1848
            return true;
7,430✔
1849
        }
71,026✔
1850

6✔
1851
        // Do not allow upload if the last query sent is the sentinel one used by the migration store.
6✔
1852
        return m_last_sent_flx_query_version != *sentinel_query_version;
14✔
1853
    };
71,036✔
1854

63,592✔
1855
    if (!is_upload_allowed()) {
116,616✔
1856
        return;
55,760✔
1857
    }
8✔
1858

7,840✔
1859
    auto check_pending_flx_version = [&]() -> bool {
62,138✔
1860
        if (!m_is_flx_sync_session) {
62,134✔
1861
            return false;
53,418✔
1862
        }
59,976✔
1863

1864
        if (!m_allow_upload) {
13,992✔
1865
            return false;
6,966✔
1866
        }
6,966✔
1867

1868
        m_pending_flx_sub_set = get_flx_subscription_store()->get_next_pending_version(m_last_sent_flx_query_version);
7,026✔
1869

6,558✔
1870
        if (!m_pending_flx_sub_set) {
5,996✔
1871
            return false;
68,580✔
1872
        }
5,572✔
1873

582✔
1874
        return m_upload_progress.client_version >= m_pending_flx_sub_set->snapshot_version;
1,006✔
1875
    };
69,004✔
1876

29,420✔
1877
    if (check_pending_flx_version()) {
90,276✔
1878
        return send_query_change_message(); // throws
63,580✔
1879
    }
572✔
1880

1881
    if (m_allow_upload && (m_last_version_available > m_upload_progress.client_version)) {
60,284✔
1882
        return send_upload_message(); // Throws
34,396✔
1883
    }
34,396✔
1884
}
60,284✔
1885

4,730✔
1886

4,730✔
1887
void Session::send_bind_message()
4,730✔
1888
{
4,120✔
1889
    REALM_ASSERT_EX(m_state == Active, m_state);
8,850✔
1890

4,730✔
1891
    session_ident_type session_ident = m_ident;
8,850✔
1892
    bool need_client_file_ident = !have_client_file_ident();
4,120✔
1893
    const bool is_subserver = false;
8,850✔
1894

4,730✔
1895

768✔
1896
    ClientProtocol& protocol = m_conn.get_client_protocol();
4,888✔
1897
    int protocol_version = m_conn.get_negotiated_protocol_version();
4,150✔
1898
    OutputBuffer& out = m_conn.get_output_buffer();
4,150✔
1899
    // Discard the token since it's ignored by the server.
768✔
1900
    std::string empty_access_token;
4,888✔
1901
    if (m_is_flx_sync_session) {
4,120✔
1902
        nlohmann::json bind_json_data;
1,504✔
1903
        if (auto migrated_partition = get_migration_store()->get_migrated_partition()) {
1,504✔
1904
            bind_json_data["migratedPartition"] = *migrated_partition;
798✔
1905
        }
798✔
1906
        bind_json_data["sessionReason"] = static_cast<uint64_t>(get_session_reason());
1,504✔
1907
        auto schema_version = get_schema_version();
1,504✔
1908
        // Send 0 if schema is not versioned.
768✔
1909
        bind_json_data["schemaVersion"] = schema_version != uint64_t(-1) ? schema_version : 0;
1,504✔
1910
        if (logger.would_log(util::Logger::Level::debug)) {
1,504✔
1911
            std::string json_data_dump;
1,504✔
1912
            if (!bind_json_data.empty()) {
1,504✔
1913
                json_data_dump = bind_json_data.dump();
1,504✔
1914
            }
1,504✔
1915
            logger.debug(
4,698✔
1916
                "Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, json_data=\"%4\")",
4,698✔
1917
                session_ident, need_client_file_ident, is_subserver, json_data_dump);
4,698✔
1918
        }
4,698✔
1919
        protocol.make_flx_bind_message(protocol_version, out, session_ident, bind_json_data, empty_access_token,
4,698✔
1920
                                       need_client_file_ident, is_subserver); // Throws
4,698✔
1921
    }
4,698✔
1922
    else {
8,114✔
1923
        std::string server_path = get_virt_path();
3,384✔
1924
        logger.debug("Sending: BIND(session_ident=%1, need_client_file_ident=%2, is_subserver=%3, server_path=%4)",
8,114✔
1925
                     session_ident, need_client_file_ident, is_subserver, server_path);
8,114✔
1926
        protocol.make_pbs_bind_message(protocol_version, out, session_ident, server_path, empty_access_token,
3,384✔
1927
                                       need_client_file_ident, is_subserver); // Throws
3,384✔
1928
    }
3,384✔
1929
    m_conn.initiate_write_message(out, this); // Throws
8,850✔
1930

2,884✔
1931
    m_bind_message_sent = true;
8,850✔
1932
    call_debug_hook(SyncClientHookEvent::BindMessageSent);
4,120✔
1933

1934
    // Ready to send the IDENT message if the file identifier pair is already
1935
    // available.
3,634✔
1936
    if (!need_client_file_ident)
7,754✔
1937
        enlist_to_send(); // Throws
5,624✔
1938
}
7,754✔
1939

3,634✔
1940

1941
void Session::send_ident_message()
1942
{
7,460✔
1943
    REALM_ASSERT_EX(m_state == Active, m_state);
7,460✔
1944
    REALM_ASSERT(m_bind_message_sent);
7,460✔
1945
    REALM_ASSERT(!m_unbind_message_sent);
3,826✔
1946
    REALM_ASSERT(have_client_file_ident());
7,460✔
1947

732✔
1948

732✔
1949
    ClientProtocol& protocol = m_conn.get_client_protocol();
4,558✔
1950
    OutputBuffer& out = m_conn.get_output_buffer();
4,558✔
1951
    session_ident_type session_ident = m_ident;
4,558✔
1952

732✔
1953
    if (m_is_flx_sync_session) {
4,558✔
1954
        const auto active_query_set = get_flx_subscription_store()->get_active();
1,428✔
1955
        const auto active_query_body = active_query_set.to_ext_json();
1,428✔
1956
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
1,428✔
1957
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
1,428✔
1958
                     "latest_server_version_salt=%6, query_version=%7, query_size=%8, query=\"%9\")",
1,428✔
1959
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
1,428✔
1960
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
3,598✔
1961
                     m_progress.latest_server_version.salt, active_query_set.version(), active_query_body.size(),
3,598✔
1962
                     active_query_body); // Throws
3,598✔
1963
        protocol.make_flx_ident_message(out, session_ident, m_client_file_ident, m_progress,
3,598✔
1964
                                        active_query_set.version(), active_query_body); // Throws
3,598✔
1965
        m_last_sent_flx_query_version = active_query_set.version();
3,598✔
1966
    }
3,598✔
1967
    else {
6,032✔
1968
        logger.debug("Sending: IDENT(client_file_ident=%1, client_file_ident_salt=%2, "
6,032✔
1969
                     "scan_server_version=%3, scan_client_version=%4, latest_server_version=%5, "
6,764✔
1970
                     "latest_server_version_salt=%6)",
3,130✔
1971
                     m_client_file_ident.ident, m_client_file_ident.salt, m_progress.download.server_version,
6,764✔
1972
                     m_progress.download.last_integrated_client_version, m_progress.latest_server_version.version,
6,764✔
1973
                     m_progress.latest_server_version.salt);                                  // Throws
3,130✔
1974
        protocol.make_pbs_ident_message(out, session_ident, m_client_file_ident, m_progress); // Throws
3,130✔
1975
    }
6,764✔
1976
    m_conn.initiate_write_message(out, this); // Throws
7,460✔
1977

1978
    m_ident_message_sent = true;
3,826✔
1979

582✔
1980
    // Other messages may be waiting to be sent
582✔
1981
    enlist_to_send(); // Throws
4,408✔
1982
}
4,408✔
1983

582✔
1984
void Session::send_query_change_message()
582✔
1985
{
572✔
1986
    REALM_ASSERT_EX(m_state == Active, m_state);
1,154✔
1987
    REALM_ASSERT(m_ident_message_sent);
572✔
1988
    REALM_ASSERT(!m_unbind_message_sent);
572✔
1989
    REALM_ASSERT(m_pending_flx_sub_set);
572✔
1990
    REALM_ASSERT_3(m_pending_flx_sub_set->query_version, >, m_last_sent_flx_query_version);
1,154✔
1991

582✔
1992
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
1,154✔
1993
        return;
582✔
1994
    }
582✔
1995

1996
    auto sub_store = get_flx_subscription_store();
1,154✔
1997
    auto latest_sub_set = sub_store->get_by_version(m_pending_flx_sub_set->query_version);
1,154✔
1998
    auto latest_queries = latest_sub_set.to_ext_json();
1,154✔
1999
    logger.debug("Sending: QUERY(query_version=%1, query_size=%2, query=\"%3\", snapshot_version=%4)",
1,154✔
2000
                 latest_sub_set.version(), latest_queries.size(), latest_queries, latest_sub_set.snapshot_version());
1,154✔
2001

2002
    OutputBuffer& out = m_conn.get_output_buffer();
1,154✔
2003
    session_ident_type session_ident = get_ident();
572✔
2004
    ClientProtocol& protocol = m_conn.get_client_protocol();
1,154✔
2005
    protocol.make_query_change_message(out, session_ident, latest_sub_set.version(), latest_queries);
1,154✔
2006
    m_conn.initiate_write_message(out, this);
572✔
2007

2008
    m_last_sent_flx_query_version = latest_sub_set.version();
29,992✔
2009

29,420✔
2010
    request_download_completion_notification();
29,992✔
2011
}
29,992✔
2012

2013
void Session::send_upload_message()
29,420✔
2014
{
29,668✔
2015
    REALM_ASSERT_EX(m_state == Active, m_state);
29,668✔
2016
    REALM_ASSERT(m_ident_message_sent);
59,088✔
2017
    REALM_ASSERT(!m_unbind_message_sent);
59,088✔
2018

448✔
2019
    if (REALM_UNLIKELY(get_client().is_dry_run()))
30,116✔
2020
        return;
448✔
2021

2022
    version_type target_upload_version = m_last_version_available;
59,088✔
2023
    if (m_pending_flx_sub_set) {
59,088✔
2024
        REALM_ASSERT(m_is_flx_sync_session);
29,856✔
2025
        target_upload_version = m_pending_flx_sub_set->snapshot_version;
29,856✔
2026
    }
436✔
2027

29,420✔
2028
    std::vector<UploadChangeset> uploadable_changesets;
29,668✔
2029
    version_type locked_server_version = 0;
29,668✔
2030
    get_history().find_uploadable_changesets(m_upload_progress, target_upload_version, uploadable_changesets,
29,668✔
2031
                                             locked_server_version); // Throws
43,912✔
2032

134✔
2033
    if (uploadable_changesets.empty()) {
29,802✔
2034
        // Nothing more to upload right now
2035
        // If we need to limit upload up to some version other than the last client version available and there are no
134✔
2036
        // changes to upload, then there is no need to send an empty message.
134✔
2037
        if (m_pending_flx_sub_set) {
29,544✔
2038
            logger.debug("Empty UPLOAD was skipped (progress_client_version=%1, progress_server_version=%2)",
130✔
2039
                         m_upload_progress.client_version, m_upload_progress.last_integrated_server_version);
29,416✔
2040
            // Other messages may be waiting to be sent
314✔
2041
            return enlist_to_send(); // Throws
444✔
2042
        }
444✔
2043
    }
15,300✔
2044

29,286✔
2045
    if (m_pending_flx_sub_set && target_upload_version < m_last_version_available) {
58,824✔
2046
        logger.trace("Limiting UPLOAD message up to version %1 to send QUERY version %2",
306✔
2047
                     m_pending_flx_sub_set->snapshot_version, m_pending_flx_sub_set->query_version);
29,592✔
2048
    }
29,592✔
2049

29,286✔
2050
    version_type progress_client_version = m_upload_progress.client_version;
58,824✔
2051
    version_type progress_server_version = m_upload_progress.last_integrated_server_version;
29,538✔
2052

29,286✔
2053
    logger.debug("Sending: UPLOAD(progress_client_version=%1, progress_server_version=%2, "
58,824✔
2054
                 "locked_server_version=%3, num_changesets=%4)",
29,538✔
2055
                 progress_client_version, progress_server_version, locked_server_version,
58,824✔
2056
                 uploadable_changesets.size()); // Throws
51,312✔
2057

21,774✔
2058
    ClientProtocol& protocol = m_conn.get_client_protocol();
51,312✔
2059
    ClientProtocol::UploadMessageBuilder upload_message_builder = protocol.make_upload_message_builder(); // Throws
51,312✔
2060

21,774✔
2061
    for (const UploadChangeset& uc : uploadable_changesets) {
51,312✔
2062
        logger.debug(util::LogCategory::changeset,
21,514✔
2063
                     "Fetching changeset for upload (client_version=%1, server_version=%2, "
21,514!
2064
                     "changeset_size=%3, origin_timestamp=%4, origin_file_ident=%5)",
21,514✔
2065
                     uc.progress.client_version, uc.progress.last_integrated_server_version, uc.changeset.size(),
21,514✔
2066
                     uc.origin_timestamp, uc.origin_file_ident); // Throws
21,514✔
2067
        if (logger.would_log(util::Logger::Level::trace)) {
21,514✔
2068
            BinaryData changeset_data = uc.changeset.get_first_chunk();
×
2069
            if (changeset_data.size() < 1024) {
×
2070
                logger.trace(util::LogCategory::changeset, "Changeset: %1",
×
2071
                             _impl::clamped_hex_dump(changeset_data)); // Throws
2072
            }
×
2073
            else {
×
2074
                logger.trace(util::LogCategory::changeset, "Changeset(comp): %1 %2", changeset_data.size(),
×
2075
                             protocol.compressed_hex_dump(changeset_data));
×
2076
            }
×
2077

2078
#if REALM_DEBUG
×
2079
            ChunkedBinaryInputStream in{changeset_data};
×
2080
            Changeset log;
×
2081
            try {
×
2082
                parse_changeset(in, log);
×
2083
                std::stringstream ss;
×
2084
                log.print(ss);
×
2085
                logger.trace(util::LogCategory::changeset, "Changeset (parsed):\n%1", ss.str());
×
2086
            }
2087
            catch (const BadChangesetError& err) {
2088
                logger.error(util::LogCategory::changeset, "Unable to parse changeset: %1", err.what());
2089
            }
2090
#endif
2091
        }
2092

2093
#if 0 // Upload log compaction is currently not implemented
2094
        if (!get_client().m_disable_upload_compaction) {
2095
            ChangesetEncoder::Buffer encode_buffer;
2096

2097
            {
2098
                // Upload compaction only takes place within single changesets to
2099
                // avoid another client seeing inconsistent snapshots.
2100
                ChunkedBinaryInputStream stream{uc.changeset};
2101
                Changeset changeset;
2102
                parse_changeset(stream, changeset); // Throws
2103
                // FIXME: What is the point of setting these? How can compaction care about them?
2104
                changeset.version = uc.progress.client_version;
2105
                changeset.last_integrated_remote_version = uc.progress.last_integrated_server_version;
2106
                changeset.origin_timestamp = uc.origin_timestamp;
2107
                changeset.origin_file_ident = uc.origin_file_ident;
2108

2109
                compact_changesets(&changeset, 1);
2110
                encode_changeset(changeset, encode_buffer);
2111

2112
                logger.debug(util::LogCategory::changeset, "Upload compaction: original size = %1, compacted size = %2", uc.changeset.size(),
2113
                             encode_buffer.size()); // Throws
2114
            }
2115

2116
            upload_message_builder.add_changeset(
21,774✔
2117
                uc.progress.client_version, uc.progress.last_integrated_server_version, uc.origin_timestamp,
21,774✔
2118
                uc.origin_file_ident, BinaryData{encode_buffer.data(), encode_buffer.size()}); // Throws
21,774✔
2119
        }
21,774✔
2120
        else
21,774✔
2121
#endif
21,774✔
2122
        {
43,288✔
2123
            upload_message_builder.add_changeset(uc.progress.client_version,
21,514✔
2124
                                                 uc.progress.last_integrated_server_version, uc.origin_timestamp,
50,800✔
2125
                                                 uc.origin_file_ident,
50,800✔
2126
                                                 uc.changeset); // Throws
50,800✔
2127
        }
50,800✔
2128
    }
50,800✔
2129

29,286✔
2130
    int protocol_version = m_conn.get_negotiated_protocol_version();
58,824✔
2131
    OutputBuffer& out = m_conn.get_output_buffer();
29,538✔
2132
    session_ident_type session_ident = get_ident();
29,538✔
2133
    upload_message_builder.make_upload_message(protocol_version, out, session_ident, progress_client_version,
58,824✔
2134
                                               progress_server_version,
58,824✔
2135
                                               locked_server_version); // Throws
29,538✔
2136
    m_conn.initiate_write_message(out, this);                          // Throws
29,538✔
2137

2138
    // Other messages may be waiting to be sent
8,504✔
2139
    enlist_to_send(); // Throws
38,042✔
2140
}
38,042✔
2141

8,504✔
2142

8,504✔
2143
void Session::send_mark_message()
2144
{
17,102✔
2145
    REALM_ASSERT_EX(m_state == Active, m_state);
17,102✔
2146
    REALM_ASSERT(m_ident_message_sent);
8,598✔
2147
    REALM_ASSERT(!m_unbind_message_sent);
17,102✔
2148
    REALM_ASSERT_3(m_target_download_mark, >, m_last_download_mark_sent);
17,102✔
2149

8,504✔
2150
    request_ident_type request_ident = m_target_download_mark;
17,102✔
2151
    logger.debug("Sending: MARK(request_ident=%1)", request_ident); // Throws
17,102✔
2152

2153
    ClientProtocol& protocol = m_conn.get_client_protocol();
17,102✔
2154
    OutputBuffer& out = m_conn.get_output_buffer();
8,598✔
2155
    session_ident_type session_ident = get_ident();
8,598✔
2156
    protocol.make_mark_message(out, session_ident, request_ident); // Throws
17,102✔
2157
    m_conn.initiate_write_message(out, this);                      // Throws
17,102✔
2158

2159
    m_last_download_mark_sent = request_ident;
8,598✔
2160

2161
    // Other messages may be waiting to be sent
3,494✔
2162
    enlist_to_send(); // Throws
12,092✔
2163
}
12,092✔
2164

3,494✔
2165

2166
void Session::send_unbind_message()
3,494✔
2167
{
2,778✔
2168
    REALM_ASSERT_EX(m_state == Deactivating || m_error_message_received || m_suspended, m_state);
6,272✔
2169
    REALM_ASSERT(m_bind_message_sent);
6,272✔
2170
    REALM_ASSERT(!m_unbind_message_sent);
6,272✔
2171

3,494✔
2172
    logger.debug("Sending: UNBIND"); // Throws
6,272✔
2173

2174
    ClientProtocol& protocol = m_conn.get_client_protocol();
6,272✔
2175
    OutputBuffer& out = m_conn.get_output_buffer();
6,272✔
2176
    session_ident_type session_ident = get_ident();
2,778✔
2177
    protocol.make_unbind_message(out, session_ident); // Throws
2,778✔
2178
    m_conn.initiate_write_message(out, this);         // Throws
2,778✔
2179

18✔
2180
    m_unbind_message_sent = true;
2,796✔
2181
}
2,796✔
2182

18✔
2183

18✔
2184
void Session::send_json_error_message()
18✔
2185
{
18✔
2186
    REALM_ASSERT_EX(m_state == Active, m_state);
36✔
2187
    REALM_ASSERT(m_ident_message_sent);
36✔
2188
    REALM_ASSERT(!m_unbind_message_sent);
36✔
2189
    REALM_ASSERT(m_error_to_send);
36✔
2190
    REALM_ASSERT(m_client_error);
18✔
2191

18✔
2192
    ClientProtocol& protocol = m_conn.get_client_protocol();
36✔
2193
    OutputBuffer& out = m_conn.get_output_buffer();
36✔
2194
    session_ident_type session_ident = get_ident();
18✔
2195
    auto protocol_error = m_client_error->error_for_server;
36✔
2196

18✔
2197
    auto message = util::format("%1", m_client_error->to_status());
36✔
2198
    logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast<int>(protocol_error),
36✔
2199
                session_ident); // Throws
36✔
2200

2201
    nlohmann::json error_body_json;
36✔
2202
    error_body_json["message"] = std::move(message);
36✔
2203
    protocol.make_json_error_message(out, session_ident, static_cast<int>(protocol_error),
36✔
2204
                                     error_body_json.dump()); // Throws
18✔
2205
    m_conn.initiate_write_message(out, this);                 // Throws
18✔
2206

2207
    m_error_to_send = false;
48✔
2208
    enlist_to_send(); // Throws
48✔
2209
}
18✔
2210

30✔
2211

32✔
2212
void Session::send_test_command_message()
32✔
2213
{
58✔
2214
    REALM_ASSERT_EX(m_state == Active, m_state);
56✔
2215

2216
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
56✔
2217
                           [](const PendingTestCommand& command) {
56✔
2218
                               return command.pending;
56✔
2219
                           });
26✔
2220
    REALM_ASSERT(it != m_pending_test_commands.end());
56✔
2221

30✔
2222
    ClientProtocol& protocol = m_conn.get_client_protocol();
26✔
2223
    OutputBuffer& out = m_conn.get_output_buffer();
56✔
2224
    auto session_ident = get_ident();
56✔
2225

2226
    logger.info("Sending: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", it->body, session_ident, it->id);
56✔
2227
    protocol.make_test_command_message(out, session_ident, it->id, it->body);
56✔
2228

2229
    m_conn.initiate_write_message(out, this); // Throws;
26✔
2230
    it->pending = false;
1,770✔
2231

2232
    enlist_to_send();
26✔
2233
}
1,770✔
2234

2235
bool Session::client_reset_if_needed()
2236
{
1,970✔
2237
    // Regardless of what happens, once we return from this function we will
1,744✔
2238
    // no longer be in the middle of a client reset
1,744✔
2239
    m_performing_client_reset = false;
3,526✔
2240

1,556✔
2241
    // Even if we end up not actually performing a client reset, consume the
2242
    // config to ensure that the resources it holds are released
188✔
2243
    auto client_reset_config = std::exchange(get_client_reset_config(), std::nullopt);
2,158✔
2244
    if (!client_reset_config) {
1,970✔
2245
        return false;
1,970✔
2246
    }
1,970✔
2247

2248
    auto on_flx_version_complete = [this](int64_t version) {
188✔
2249
        this->on_flx_sync_version_complete(version);
150✔
2250
    };
150✔
2251
    bool did_reset =
376✔
2252
        client_reset::perform_client_reset(logger, *get_db(), std::move(*client_reset_config), m_client_file_ident,
188✔
2253
                                           get_flx_subscription_store(), on_flx_version_complete);
376✔
2254

188✔
2255
    call_debug_hook(SyncClientHookEvent::ClientResetMergeComplete);
376✔
2256
    if (!did_reset) {
376✔
2257
        return false;
188✔
2258
    }
188✔
2259

188✔
2260
    // The fresh Realm has been used to reset the state
188✔
2261
    logger.debug("Client reset is completed, path=%1", get_realm_path()); // Throws
188✔
2262

188✔
2263
    SaltedFileIdent client_file_ident;
376✔
2264
    get_history().get_status(m_last_version_available, client_file_ident, m_progress); // Throws
376✔
2265
    REALM_ASSERT_3(m_client_file_ident.ident, ==, client_file_ident.ident);
188✔
2266
    REALM_ASSERT_3(m_client_file_ident.salt, ==, client_file_ident.salt);
188✔
2267
    REALM_ASSERT_EX(m_progress.download.last_integrated_client_version == 0,
188✔
2268
                    m_progress.download.last_integrated_client_version);
376✔
2269
    REALM_ASSERT_EX(m_progress.upload.client_version == 0, m_progress.upload.client_version);
188✔
2270
    logger.trace("last_version_available  = %1", m_last_version_available); // Throws
188✔
2271

188✔
2272
    m_upload_progress = m_progress.upload;
188✔
2273
    m_download_progress = m_progress.download;
188✔
2274
    init_progress_handler();
376✔
2275
    // In recovery mode, there may be new changesets to upload and nothing left to download.
134✔
2276
    // In FLX DiscardLocal mode, there may be new commits due to subscription handling.
134✔
2277
    // For both, we want to allow uploads again without needing external changes to download first.
2278
    m_allow_upload = true;
376✔
2279

188✔
2280
    // Checks if there is a pending client reset
2281
    handle_pending_client_reset_acknowledgement();
188✔
2282

1,768✔
2283
    update_subscription_version_info();
1,956✔
2284

1,768✔
2285
    // If a migration or rollback is in progress, mark it complete when client reset is completed.
2286
    if (auto migration_store = get_migration_store()) {
188✔
2287
        migration_store->complete_migration_or_rollback();
134✔
2288
    }
134✔
2289

1,768✔
2290
    return true;
212✔
2291
}
188✔
2292

1,744✔
2293
Status Session::receive_ident_message(SaltedFileIdent client_file_ident)
1,744✔
2294
{
3,742✔
2295
    logger.debug("Received: IDENT(client_file_ident=%1, client_file_ident_salt=%2)", client_file_ident.ident,
1,998✔
2296
                 client_file_ident.salt); // Throws
1,998✔
2297

1,744✔
2298
    // Ignore the message if the deactivation process has been initiated,
2299
    // because in that case, the associated Realm and SessionWrapper must
2300
    // not be accessed any longer.
1,744✔
2301
    if (m_state != Active)
1,998✔
2302
        return Status::OK(); // Success
28✔
2303

2304
    bool legal_at_this_time = (m_bind_message_sent && !have_client_file_ident() && !m_error_message_received &&
3,714✔
2305
                               !m_unbound_message_received);
1,970✔
2306
    if (REALM_UNLIKELY(!legal_at_this_time)) {
3,714✔
2307
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received IDENT message when it was not legal"};
2308
    }
×
2309
    if (REALM_UNLIKELY(client_file_ident.ident < 1)) {
1,970✔
2310
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier in IDENT message"};
×
2311
    }
2312
    if (REALM_UNLIKELY(client_file_ident.salt == 0)) {
1,970✔
2313
        return {ErrorCodes::SyncProtocolInvariantFailed, "Bad client file identifier salt in IDENT message"};
2314
    }
1,744✔
2315

2316
    m_client_file_ident = client_file_ident;
1,970✔
2317

1,744✔
2318
    if (REALM_UNLIKELY(get_client().is_dry_run())) {
3,714✔
2319
        // Ready to send the IDENT message
1,744✔
2320
        ensure_enlisted_to_send(); // Throws
188✔
2321
        return Status::OK();       // Success
188✔
2322
    }
188✔
2323

2324
    // if a client reset happens, it will take care of setting the file ident
1,744✔
2325
    // and if not, we do it here
1,744✔
2326
    bool did_client_reset = false;
3,714✔
2327

1,744✔
2328
    // Save some of the client reset info for reporting to the client if an error occurs.
40✔
2329
    Status cr_status(Status::OK()); // Start with no client reset
2,010✔
2330
    ProtocolErrorInfo::Action cr_action = ProtocolErrorInfo::Action::NoAction;
2,010✔
2331
    if (auto& cr_config = get_client_reset_config()) {
2,010✔
2332
        cr_status = cr_config->error;
228✔
2333
        cr_action = cr_config->action;
228✔
2334
    }
228✔
2335

1,704✔
2336
    try {
3,526✔
2337
        did_client_reset = client_reset_if_needed();
3,526✔
2338
    }
3,526✔
2339
    catch (const std::exception& e) {
3,526✔
2340
        auto err_msg = util::format("A fatal error occurred during '%1' client reset for %2: '%3'", cr_action,
1,596✔
2341
                                    cr_status, e.what());
40✔
2342
        logger.error(err_msg.c_str());
40✔
2343
        SessionErrorInfo err_info(Status{ErrorCodes::AutoClientResetFailed, err_msg}, IsFatal{true});
1,744✔
2344
        suspend(err_info);
1,744✔
2345
        return Status::OK();
1,784✔
2346
    }
40✔
2347
    if (!did_client_reset) {
1,930✔
2348
        get_history().set_client_file_ident(client_file_ident,
23,968✔
2349
                                            m_fix_up_object_ids); // Throws
1,782✔
2350
        m_progress.download.last_integrated_client_version = 0;
1,782✔
2351
        m_progress.upload.client_version = 0;
1,782✔
2352
    }
23,968✔
2353

356✔
2354
    // Ready to send the IDENT message
2355
    ensure_enlisted_to_send(); // Throws
23,760✔
2356
    return Status::OK();       // Success
23,760✔
2357
}
1,970✔
2358

21,830✔
2359
Status Session::receive_download_message(const DownloadMessage& message)
20,998✔
2360
{
25,842✔
2361
    // Ignore the message if the deactivation process has been initiated,
21,830✔
2362
    // because in that case, the associated Realm and SessionWrapper must
21,830✔
2363
    // not be accessed any longer.
1,806✔
2364
    if (m_state != Active)
27,648✔
2365
        return Status::OK();
2,036✔
2366

1,806✔
2367
    bool is_flx = m_conn.is_flx_sync_connection();
27,418✔
2368
    int64_t query_version = is_flx ? *message.query_version : 0;
27,418✔
2369

1,806✔
2370
    if (!is_flx || query_version > 0)
27,418✔
2371
        enable_progress_notifications();
26,596✔
2372

1,806✔
2373
    // If this is a PBS connection, then every download message is its own complete batch.
20,024✔
2374
    bool last_in_batch = is_flx ? *message.last_in_batch : true;
45,636✔
2375
    auto batch_state = last_in_batch ? sync::DownloadBatchState::LastInBatch : sync::DownloadBatchState::MoreToCome;
45,636✔
2376
    if (is_steady_state_download_message(batch_state, query_version))
45,636✔
2377
        batch_state = DownloadBatchState::SteadyState;
44,572✔
2378

20,024✔
2379
    auto&& progress = message.progress;
45,636✔
2380
    if (is_flx) {
45,636✔
2381
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
21,756✔
2382
                     "latest_server_version=%3, latest_server_version_salt=%4, "
21,756✔
2383
                     "upload_client_version=%5, upload_server_version=%6, progress_estimate=%7, "
1,732✔
2384
                     "last_in_batch=%8, query_version=%9, num_changesets=%10, ...)",
1,732✔
2385
                     progress.download.server_version, progress.download.last_integrated_client_version,
1,732✔
2386
                     progress.latest_server_version.version, progress.latest_server_version.salt,
23,562✔
2387
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
1,732✔
2388
                     message.downloadable.as_estimate(), last_in_batch, query_version,
1,732✔
2389
                     message.changesets.size()); // Throws
1,732✔
2390
    }
1,732✔
2391
    else {
45,710✔
2392
        logger.debug("Received: DOWNLOAD(download_server_version=%1, download_client_version=%2, "
45,710✔
2393
                     "latest_server_version=%3, latest_server_version_salt=%4, "
23,880✔
2394
                     "upload_client_version=%5, upload_server_version=%6, "
23,880✔
2395
                     "downloadable_bytes=%7, num_changesets=%8, ...)",
45,710✔
2396
                     progress.download.server_version, progress.download.last_integrated_client_version,
23,880✔
2397
                     progress.latest_server_version.version, progress.latest_server_version.salt,
23,880✔
2398
                     progress.upload.client_version, progress.upload.last_integrated_server_version,
23,880✔
2399
                     message.downloadable.as_bytes(), message.changesets.size()); // Throws
23,880✔
2400
    }
45,710✔
2401

21,830✔
2402
    // Ignore download messages when the client detects an error. This is to prevent transforming the same bad
24,826✔
2403
    // changeset over and over again.
2404
    if (m_client_error) {
25,612✔
2405
        logger.debug("Ignoring download message because the client detected an integration error");
24,826✔
2406
        return Status::OK();
24,826✔
2407
    }
2408

24,826✔
2409
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
50,440✔
2410
    if (REALM_UNLIKELY(!legal_at_this_time)) {
25,612✔
2411
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received DOWNLOAD message when it was not legal"};
2✔
2412
    }
2✔
2413
    if (auto status = check_received_sync_progress(progress); REALM_UNLIKELY(!status.is_ok())) {
25,610✔
2414
        logger.error("Bad sync progress received (%1)", status);
24,826✔
2415
        return status;
2416
    }
2417

2418
    version_type server_version = m_progress.download.server_version;
50,436✔
2419
    version_type last_integrated_client_version = m_progress.download.last_integrated_client_version;
50,436✔
2420
    for (const RemoteChangeset& changeset : message.changesets) {
50,436✔
2421
        // Check that per-changeset server version is strictly increasing, except in FLX sync where the server
24,826✔
2422
        // version must be increasing, but can stay the same during bootstraps.
2423
        bool good_server_version = m_is_flx_sync_session ? (changeset.remote_version >= server_version)
22,788✔
2424
                                                         : (changeset.remote_version > server_version);
22,788✔
2425
        // Each server version cannot be greater than the one in the header of the download message.
2426
        good_server_version = good_server_version && (changeset.remote_version <= progress.download.server_version);
22,788✔
2427
        if (!good_server_version) {
22,788✔
2428
            return {ErrorCodes::SyncProtocolInvariantFailed,
24,826✔
2429
                    util::format("Bad server version in changeset header (DOWNLOAD) (%1, %2, %3)",
2430
                                 changeset.remote_version, server_version, progress.download.server_version)};
2431
        }
24,826✔
2432
        server_version = changeset.remote_version;
47,614✔
2433
        // Check that per-changeset last integrated client version is "weakly"
24,826✔
2434
        // increasing.
2435
        bool good_client_version =
22,788✔
2436
            (changeset.last_integrated_local_version >= last_integrated_client_version &&
22,788✔
2437
             changeset.last_integrated_local_version <= progress.download.last_integrated_client_version);
22,788✔
2438
        if (!good_client_version) {
47,614✔
2439
            return {ErrorCodes::SyncProtocolInvariantFailed,
2440
                    util::format("Bad last integrated client version in changeset header (DOWNLOAD) "
21,830✔
2441
                                 "(%1, %2, %3)",
21,830✔
2442
                                 changeset.last_integrated_local_version, last_integrated_client_version,
21,830✔
2443
                                 progress.download.last_integrated_client_version)};
8✔
2444
        }
8✔
2445
        last_integrated_client_version = changeset.last_integrated_local_version;
44,610✔
2446
        // Server shouldn't send our own changes, and zero is not a valid client
2447
        // file identifier.
21,822✔
2448
        bool good_file_ident =
23,926✔
2449
            (changeset.origin_file_ident > 0 && changeset.origin_file_ident != m_client_file_ident.ident);
23,926✔
2450
        if (!good_file_ident) {
23,926✔
2451
            return {ErrorCodes::SyncProtocolInvariantFailed,
2452
                    util::format("Bad origin file identifier in changeset header (DOWNLOAD)",
20,684✔
2453
                                 changeset.origin_file_ident)};
20,684✔
2454
        }
2455
    }
43,472✔
2456

20,684✔
2457
    auto hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageReceived, progress, query_version,
46,294✔
2458
                                       batch_state, message.changesets.size());
25,610✔
2459
    if (hook_action == SyncClientHookAction::EarlyReturn) {
25,610✔
2460
        return Status::OK();
20,692✔
2461
    }
8✔
2462
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
25,602✔
2463

2464
    if (process_flx_bootstrap_message(progress, batch_state, query_version, message.changesets)) {
46,286✔
2465
        clear_resumption_delay_state();
21,744✔
2466
        return Status::OK();
21,744✔
2467
    }
1,060✔
2468

2469
    initiate_integrate_changesets(message.downloadable.as_bytes(), batch_state, progress,
32,666✔
2470
                                  message.changesets); // Throws
32,666✔
2471

2472
    hook_action = call_debug_hook(SyncClientHookEvent::DownloadMessageIntegrated, progress, query_version,
24,542✔
2473
                                  batch_state, message.changesets.size());
24,542✔
2474
    if (hook_action == SyncClientHookAction::EarlyReturn) {
24,542✔
2475
        return Status::OK();
8,124✔
2476
    }
58✔
2477
    REALM_ASSERT_EX(hook_action == SyncClientHookAction::NoAction, hook_action);
24,542✔
2478

8,066✔
2479
    // When we receive a DOWNLOAD message successfully, we can clear the backoff timer value used to reconnect
8,066✔
2480
    // after a retryable session error.
6✔
2481
    clear_resumption_delay_state();
24,548✔
2482
    return Status::OK();
32,602✔
2483
}
32,602✔
2484

8,060✔
2485
Status Session::receive_mark_message(request_ident_type request_ident)
2486
{
8,240✔
2487
    logger.debug("Received: MARK(request_ident=%1)", request_ident); // Throws
8,240✔
2488

2489
    // Ignore the message if the deactivation process has been initiated,
2490
    // because in that case, the associated Realm and SessionWrapper must
2491
    // not be accessed any longer.
2492
    if (m_state != Active)
16,300✔
2493
        return Status::OK(); // Success
8,080✔
2494

8,060✔
2495
    bool legal_at_this_time = (m_ident_message_sent && !m_error_message_received && !m_unbound_message_received);
8,220✔
2496
    if (REALM_UNLIKELY(!legal_at_this_time)) {
16,280✔
2497
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received MARK message when it was not legal"};
8,064✔
2498
    }
4✔
2499
    bool good_request_ident =
8,216✔
2500
        (request_ident <= m_last_download_mark_sent && request_ident > m_last_download_mark_received);
8,216✔
2501
    if (REALM_UNLIKELY(!good_request_ident)) {
8,216✔
2502
        return {
2503
            ErrorCodes::SyncProtocolInvariantFailed,
1,950✔
2504
            util::format(
1,950✔
2505
                "Received MARK message with invalid request identifer (last mark sent: %1 last mark received: %2)",
2506
                m_last_download_mark_sent, m_last_download_mark_received)};
1,950✔
2507
    }
1,950✔
2508

2509
    m_server_version_at_last_download_mark = m_progress.download.server_version;
8,216✔
2510
    m_last_download_mark_received = request_ident;
8,216✔
2511
    check_for_download_completion(); // Throws
8,216✔
2512

2513
    return Status::OK(); // Success
8,216✔
2514
}
8,216✔
2515

1,950!
2516

2517
// The caller (Connection) must discard the session if the session has become
1,950✔
2518
// deactivated upon return.
2519
Status Session::receive_unbound_message()
2520
{
3,802✔
2521
    logger.debug("Received: UNBOUND");
1,852✔
2522

2523
    bool legal_at_this_time = (m_unbind_message_sent && !m_error_message_received && !m_unbound_message_received);
3,802✔
2524
    if (REALM_UNLIKELY(!legal_at_this_time)) {
1,852✔
2525
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received UNBOUND message when it was not legal"};
1,950✔
2526
    }
2527

1,950✔
2528
    // The fact that the UNBIND message has been sent, but an ERROR message has
1,950✔
2529
    // not been received, implies that the deactivation process must have been
2530
    // initiated, so this session must be in the Deactivating state or the session
2531
    // has been suspended because of a client side error.
2532
    REALM_ASSERT_EX(m_state == Deactivating || m_suspended, m_state);
1,862!
2533

10✔
2534
    m_unbound_message_received = true;
1,862✔
2535

10✔
2536
    // Detect completion of the unbinding process
2537
    if (m_unbind_message_send_complete && m_state == Deactivating) {
1,852✔
2538
        // The deactivation process completes when the unbinding process
2539
        // completes.
2540
        complete_deactivation(); // Throws
2,220✔
2541
        // Life cycle state is now Deactivated
368✔
2542
    }
2,220✔
2543

2544
    return Status::OK(); // Success
2,220✔
2545
}
2,220✔
2546

2547

2548
Status Session::receive_query_error_message(int error_code, std::string_view message, int64_t query_version)
2549
{
378✔
2550
    logger.info("Received QUERY_ERROR \"%1\" (error_code=%2, query_version=%3)", message, error_code, query_version);
378✔
2551
    // Ignore the message if the deactivation process has been initiated,
368✔
2552
    // because in that case, the associated Realm and SessionWrapper must
2553
    // not be accessed any longer.
2554
    if (m_state == Active) {
10✔
2555
        on_flx_sync_error(query_version, message); // throws
10✔
2556
    }
10✔
2557
    return Status::OK();
10✔
2558
}
10✔
2559

368✔
2560
// The caller (Connection) must discard the session if the session has become
368✔
2561
// deactivated upon return.
368✔
2562
Status Session::receive_error_message(const ProtocolErrorInfo& info)
4✔
2563
{
360✔
2564
    logger.info("Received: ERROR \"%1\" (error_code=%2, is_fatal=%3, error_action=%4)", info.message,
724✔
2565
                info.raw_error_code, info.is_fatal, info.server_requests_action); // Throws
356✔
2566

2567
    bool legal_at_this_time = (m_bind_message_sent && !m_error_message_received && !m_unbound_message_received);
356✔
2568
    if (REALM_UNLIKELY(!legal_at_this_time)) {
720✔
2569
        return {ErrorCodes::SyncProtocolInvariantFailed, "Received ERROR message when it was not legal"};
2570
    }
2571

22✔
2572
    auto protocol_error = static_cast<ProtocolError>(info.raw_error_code);
378✔
2573
    auto status = protocol_error_to_status(protocol_error, info.message);
378✔
2574
    if (status != ErrorCodes::UnknownError && REALM_UNLIKELY(!is_session_level_error(protocol_error))) {
378✔
2575
        return {ErrorCodes::SyncProtocolInvariantFailed,
22✔
2576
                util::format("Received ERROR message for session with non-session-level error code %1",
22✔
2577
                             info.raw_error_code)};
2578
    }
342✔
2579

2580
    // Can't process debug hook actions once the Session is undergoing deactivation, since
34✔
2581
    // the SessionWrapper may not be available
34✔
2582
    if (m_state == Active) {
390✔
2583
        auto debug_action = call_debug_hook(SyncClientHookEvent::ErrorMessageReceived, info);
390✔
2584
        if (debug_action == SyncClientHookAction::EarlyReturn) {
356✔
2585
            return Status::OK();
38✔
2586
        }
38✔
2587
    }
356✔
2588

34✔
2589
    // For compensating write errors, we need to defer raising them to the SDK until after the server version
34✔
2590
    // containing the compensating write has appeared in a download message.
2591
    if (status == ErrorCodes::SyncCompensatingWrite) {
660✔
2592
        // If the client is not active, the compensating writes will not be processed now, but will be
308✔
2593
        // sent again the next time the client connects
308✔
2594
        if (m_state == Active) {
364✔
2595
            REALM_ASSERT(info.compensating_write_server_version.has_value());
22✔
2596
            m_pending_compensating_write_errors.push_back(info);
22✔
2597
        }
370✔
2598
        return Status::OK();
370✔
2599
    }
370!
2600

348✔
2601
    if (protocol_error == ProtocolError::schema_version_changed) {
330✔
2602
        // Enable upload immediately if the session is still active.
348✔
2603
        if (m_state == Active) {
34✔
2604
            auto wt = get_db()->start_write();
34✔
2605
            _impl::sync_schema_migration::track_sync_schema_migration(*wt, *info.previous_schema_version);
382!
2606
            wt->commit();
34✔
2607
            // Notify SyncSession a schema migration is required.
2608
            on_connection_state_changed(m_conn.get_state(), SessionErrorInfo{info});
34✔
2609
        }
34!
2610
        // Keep the session active to upload any unsynced changes.
2611
        return Status::OK();
34✔
2612
    }
34✔
2613

2614
    m_error_message_received = true;
296✔
2615
    suspend(SessionErrorInfo{info, std::move(status)});
296✔
2616
    return Status::OK();
296✔
2617
}
330✔
2618

2619
void Session::suspend(const SessionErrorInfo& info)
348✔
2620
{
684✔
2621
    REALM_ASSERT(!m_suspended);
684✔
2622
    REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state);
684!
2623
    logger.debug("Suspended"); // Throws
684✔
2624

2625
    m_suspended = true;
684✔
2626

44✔
2627
    // Detect completion of the unbinding process
44✔
2628
    if (m_unbind_message_send_complete && m_error_message_received) {
336!
2629
        // The fact that the UNBIND message has been sent, but we are not being suspended because
2630
        // we received an ERROR message implies that the deactivation process must
348✔
2631
        // have been initiated, so this session must be in the Deactivating state.
348✔
2632
        REALM_ASSERT_EX(m_state == Deactivating, m_state);
348!
2633

2634
        // The deactivation process completes when the unbinding process
2635
        // completes.
30✔
2636
        complete_deactivation(); // Throws
30✔
2637
        // Life cycle state is now Deactivated
30✔
2638
    }
30✔
2639

30✔
2640
    // Notify the application of the suspension of the session if the session is
30✔
2641
    // still in the Active state
30✔
2642
    if (m_state == Active) {
336✔
2643
        call_debug_hook(SyncClientHookEvent::SessionSuspended, info);
336✔
2644
        m_conn.one_less_active_unsuspended_session(); // Throws
336✔
2645
        on_suspended(info);                           // Throws
336✔
2646
    }
366✔
2647

30✔
2648
    if (!info.is_fatal) {
336✔
2649
        begin_resumption_delay(info);
58✔
2650
    }
58✔
2651

2652
    // Ready to send the UNBIND message, if it has not been sent already
2653
    if (!m_unbind_message_sent)
380✔
2654
        ensure_enlisted_to_send(); // Throws
380✔
2655
}
336✔
2656

44✔
2657
Status Session::receive_test_command_response(request_ident_type ident, std::string_view body)
44✔
2658
{
70✔
2659
    logger.info("Received: TEST_COMMAND \"%1\" (session_ident=%2, request_ident=%3)", body, m_ident, ident);
70✔
2660
    auto it = std::find_if(m_pending_test_commands.begin(), m_pending_test_commands.end(),
26✔
2661
                           [&](const PendingTestCommand& command) {
26✔
2662
                               return command.id == ident;
26✔
2663
                           });
56✔
2664
    if (it == m_pending_test_commands.end()) {
56✔
2665
        return {ErrorCodes::SyncProtocolInvariantFailed,
44✔
2666
                util::format("Received test command response for a non-existent ident %1", ident)};
44✔
2667
    }
44✔
2668

8✔
2669
    it->promise.emplace_value(std::string{body});
62✔
2670
    m_pending_test_commands.erase(it);
26✔
2671

2672
    return Status::OK();
62✔
2673
}
62✔
2674

36✔
2675
void Session::begin_resumption_delay(const ProtocolErrorInfo& error_info)
44✔
2676
{
28✔
2677
    REALM_ASSERT(!m_try_again_activation_timer);
28✔
2678

21,820✔
2679
    m_try_again_delay_info.update(static_cast<sync::ProtocolError>(error_info.raw_error_code),
21,848✔
2680
                                  error_info.resumption_delay_interval);
28✔
2681
    auto try_again_interval = m_try_again_delay_info.delay_interval();
28✔
2682
    if (ProtocolError(error_info.raw_error_code) == ProtocolError::session_closed) {
28✔
2683
        // FIXME With compensating writes the server sends this error after completing a bootstrap. Doing the
21,820✔
2684
        // normal backoff behavior would result in waiting up to 5 minutes in between each query change which is
2685
        // not acceptable latency. So for this error code alone, we hard-code a 1 second retry interval.
2686
        try_again_interval = std::chrono::milliseconds{1000};
21,842✔
2687
    }
21,842✔
2688
    logger.debug("Will attempt to resume session after %1 milliseconds", try_again_interval.count());
21,858✔
2689
    m_try_again_activation_timer = get_client().create_timer(try_again_interval, [this](Status status) {
21,858✔
2690
        if (status == ErrorCodes::OperationAborted)
21,858✔
2691
            return;
8✔
2692
        else if (!status.is_ok())
20✔
2693
            throw Exception(status);
×
2694

2695
        m_try_again_activation_timer.reset();
21,850✔
2696
        cancel_resumption_delay();
20✔
2697
    });
20✔
2698
}
28✔
2699

2700
void Session::clear_resumption_delay_state()
21,830✔
2701
{
25,604✔
2702
    if (m_try_again_activation_timer) {
25,604✔
2703
        logger.debug("Clearing resumption delay state after successful download");
×
2704
        m_try_again_delay_info.reset();
×
2705
    }
21,830✔
2706
}
25,604✔
2707

2708
Status Session::check_received_sync_progress(const SyncProgress& progress) noexcept
2709
{
25,612✔
2710
    const SyncProgress& a = m_progress;
47,442✔
2711
    const SyncProgress& b = progress;
25,612✔
2712
    std::string message;
25,612✔
2713
    if (b.latest_server_version.version < a.latest_server_version.version) {
25,612✔
2714
        message = util::format("Latest server version in download messages must be weakly increasing throughout a "
×
2715
                               "session (current: %1, received: %2)",
21,830✔
2716
                               a.latest_server_version.version, b.latest_server_version.version);
×
2717
    }
×
2718
    if (b.upload.client_version < a.upload.client_version) {
25,612✔
2719
        message = util::format("Last integrated client version in download messages must be weakly increasing "
×
2720
                               "throughout a session (current: %1, received: %2)",
×
2721
                               a.upload.client_version, b.upload.client_version);
21,830✔
2722
    }
×
2723
    if (b.upload.client_version > m_last_version_available) {
25,612✔
2724
        message = util::format("Last integrated client version on server cannot be greater than the latest client "
×
2725
                               "version in existence (current: %1, received: %2)",
×
2726
                               m_last_version_available, b.upload.client_version);
×
2727
    }
21,830✔
2728
    if (b.download.server_version < a.download.server_version) {
25,612✔
2729
        message =
×
2730
            util::format("Download cursor must be weakly increasing throughout a session (current: %1, received: %2)",
×
2731
                         a.download.server_version, b.download.server_version);
×
2732
    }
×
2733
    if (b.download.server_version > b.latest_server_version.version) {
25,612✔
2734
        message = util::format(
21,830✔
2735
            "Download cursor cannot be greater than the latest server version in existence (cursor: %1, latest: %2)",
21,830✔
2736
            b.download.server_version, b.latest_server_version.version);
21,830✔
2737
    }
×
2738
    if (b.download.last_integrated_client_version < a.download.last_integrated_client_version) {
47,442✔
2739
        message = util::format(
2740
            "Last integrated client version on the server at the position in the server's history of the download "
2741
            "cursor must be weakly increasing throughout a session (current: %1, received: %2)",
2742
            a.download.last_integrated_client_version, b.download.last_integrated_client_version);
29,726✔
2743
    }
29,726✔
2744
    if (b.download.last_integrated_client_version > b.upload.client_version) {
55,338✔
2745
        message = util::format("Last integrated client version on the server in the position at the server's history "
29,726✔
2746
                               "of the download cursor cannot be greater than the latest client version integrated "
21,552✔
2747
                               "on the server (download: %1, upload: %2)",
8,174✔
2748
                               b.download.last_integrated_client_version, b.upload.client_version);
162✔
2749
    }
8,012✔
2750
    if (b.download.server_version < b.upload.last_integrated_server_version) {
25,612✔
2751
        message = util::format(
8,012✔
2752
            "The server version of the download cursor cannot be less than the server version integrated in the "
8,012✔
2753
            "latest client version acknowledged by the server (download: %1, upload: %2)",
2754
            b.download.server_version, b.upload.last_integrated_server_version);
2755
    }
2,170✔
2756

2,170✔
2757
    if (message.empty()) {
27,782✔
2758
        return Status::OK();
33,624✔
2759
    }
33,624✔
2760
    return {ErrorCodes::SyncProtocolInvariantFailed, std::move(message)};
2761
}
25,612✔
2762

2763

2764
void Session::check_for_download_completion()
2765
{
33,710✔
2766
    REALM_ASSERT_3(m_target_download_mark, >=, m_last_download_mark_received);
33,710✔
2767
    REALM_ASSERT_3(m_last_download_mark_received, >=, m_last_triggering_download_mark);
33,710✔
2768
    if (m_last_download_mark_received == m_last_triggering_download_mark)
33,710✔
2769
        return;
25,388✔
2770
    if (m_last_download_mark_received < m_target_download_mark)
8,322✔
2771
        return;
146✔
2772
    if (m_download_progress.server_version < m_server_version_at_last_download_mark)
8,176✔
2773
        return;
2774
    m_last_triggering_download_mark = m_target_download_mark;
8,176✔
2775
    if (REALM_UNLIKELY(!m_allow_upload)) {
8,176✔
2776
        // Activate the upload process now, and enable immediate reactivation
2777
        // after a subsequent fast reconnect.
2778
        m_allow_upload = true;
2,360✔
2779
        ensure_enlisted_to_send(); // Throws
2,360✔
2780
    }
2,360✔
2781
    on_download_completion(); // Throws
8,176✔
2782
}
8,176✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc