• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_121

21 Nov 2023 01:54PM UTC coverage: 92.117% (+0.4%) from 91.683%
thomas.goyne_121

push

Evergreen

jedelbo
Move bson files to core utils

92262 of 169120 branches covered (0.0%)

234642 of 254722 relevant lines covered (92.12%)

6329664.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.87
/src/realm/object-store/sync/sync_session.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2016 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/sync/sync_session.hpp>
20

21
#include <realm/object-store/thread_safe_reference.hpp>
22
#include <realm/object-store/impl/realm_coordinator.hpp>
23
#include <realm/object-store/sync/app.hpp>
24
#include <realm/object-store/sync/impl/sync_client.hpp>
25
#include <realm/object-store/sync/impl/sync_file.hpp>
26
#include <realm/object-store/sync/impl/sync_metadata.hpp>
27
#include <realm/object-store/sync/sync_manager.hpp>
28
#include <realm/object-store/sync/sync_user.hpp>
29
#include <realm/object-store/util/scheduler.hpp>
30

31
#include <realm/db_options.hpp>
32
#include <realm/sync/client.hpp>
33
#include <realm/sync/config.hpp>
34
#include <realm/sync/network/http.hpp>
35
#include <realm/sync/network/websocket_error.hpp>
36
#include <realm/sync/noinst/client_history_impl.hpp>
37
#include <realm/sync/noinst/client_reset_operation.hpp>
38
#include <realm/sync/noinst/migration_store.hpp>
39
#include <realm/sync/protocol.hpp>
40

41
using namespace realm;
42
using namespace realm::_impl;
43

44
using SessionWaiterPointer = void (sync::Session::*)(util::UniqueFunction<void(std::error_code)>);
45

46
constexpr const char SyncError::c_original_file_path_key[];
47
constexpr const char SyncError::c_recovery_file_path_key[];
48

49
/// STATES:
50
///
51
/// WAITING_FOR_ACCESS_TOKEN: a request has been initiated to ask
52
/// for an updated access token and the session is waiting for a response.
53
/// From: INACTIVE, DYING
54
/// To:
55
///    * ACTIVE: when the SDK successfully refreshes the token
56
///    * INACTIVE: if asked to log out, or if asked to close
57
///
58
/// ACTIVE: the session is connected to the Sync Server and is actively
59
/// transferring data.
60
/// From: INACTIVE, DYING, WAITING_FOR_ACCESS_TOKEN
61
/// To:
62
///    * INACTIVE: if asked to log out, or if asked to close and the stop policy
63
///                is Immediate.
64
///    * DYING: if asked to close and the stop policy is AfterChangesUploaded
65
///
66
/// DYING: the session is performing clean-up work in preparation to be destroyed.
67
/// From: ACTIVE
68
/// To:
69
///    * INACTIVE: when the clean-up work completes, if the session wasn't
70
///                revived, or if explicitly asked to log out before the
71
///                clean-up work begins
72
///    * ACTIVE: if the session is revived
73
///    * WAITING_FOR_ACCESS_TOKEN: if the session tried to enter ACTIVE,
74
///                                but the token is invalid or expired.
75
///
76
/// INACTIVE: the user owning this session has logged out, the `sync::Session`
77
/// owned by this session is destroyed, and the session is quiescent.
78
/// Note that a session briefly enters this state before being destroyed, but
79
/// it can also enter this state and stay there if the user has been logged out.
80
/// From: initial, ACTIVE, DYING, WAITING_FOR_ACCESS_TOKEN
81
/// To:
82
///    * ACTIVE: if the session is revived
83
///    * WAITING_FOR_ACCESS_TOKEN: if the session tried to enter ACTIVE,
84
///                                but the token is invalid or expired.
85

86
void SyncSession::become_active()
87
{
1,702✔
88
    REALM_ASSERT(m_state != State::Active);
1,702✔
89
    m_state = State::Active;
1,702✔
90

764✔
91
    // First time the session becomes active, register a notification on the sentinel subscription set to restart the
764✔
92
    // session and update to native FLX.
764✔
93
    if (m_migration_sentinel_query_version) {
1,702✔
94
        m_flx_subscription_store->get_by_version(*m_migration_sentinel_query_version)
2✔
95
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
96
            .get_async([=, weak_self = weak_from_this()](StatusWith<sync::SubscriptionSet::State> s) {
2✔
97
                if (!s.is_ok()) {
2✔
98
                    return;
×
99
                }
×
100
                REALM_ASSERT(s.get_value() == sync::SubscriptionSet::State::Complete);
2✔
101
                if (auto strong_self = weak_self.lock()) {
2✔
102
                    strong_self->m_migration_store->cancel_migration();
2✔
103
                    strong_self->restart_session();
2✔
104
                }
2✔
105
            });
2✔
106
        m_migration_sentinel_query_version.reset();
2✔
107
    }
2✔
108

764✔
109
    // when entering from the Dying state the session will still be bound
764✔
110
    if (!m_session) {
1,702✔
111
        create_sync_session();
1,700✔
112
        m_session->bind();
1,700✔
113
    }
1,700✔
114

764✔
115
    // Register all the pending wait-for-completion blocks. This can
764✔
116
    // potentially add a redundant callback if we're coming from the Dying
764✔
117
    // state, but that's okay (we won't call the user callbacks twice).
764✔
118
    SyncSession::CompletionCallbacks callbacks_to_register;
1,702✔
119
    std::swap(m_completion_callbacks, callbacks_to_register);
1,702✔
120

764✔
121
    for (auto& [id, callback_tuple] : callbacks_to_register) {
938✔
122
        add_completion_callback(std::move(callback_tuple.second), callback_tuple.first);
333✔
123
    }
333✔
124
}
1,702✔
125

126
void SyncSession::restart_session()
127
{
8✔
128
    util::CheckedUniqueLock lock(m_state_mutex);
8✔
129
    do_restart_session(std::move(lock));
8✔
130
}
8✔
131

132
// Switch the session to State::Dying and wait for pending uploads to finish.
//
// `lock` must hold m_state_mutex on entry; it is released (or handed to
// become_inactive()) before this function returns.
void SyncSession::become_dying(util::CheckedUniqueLock lock)
{
    REALM_ASSERT(m_state != State::Dying);
    m_state = State::Dying;

    // Without a session nothing can be uploaded, so go straight to inactive.
    if (!m_session) {
        become_inactive(std::move(lock));
        return;
    }

    // Tag this "death" so the callback below can tell whether the session
    // has left and re-entered the Dying state in the meantime.
    const size_t death_generation = ++m_death_count;
    m_session->async_wait_for_upload_completion([weak_session = weak_from_this(), death_generation](Status) {
        auto session = weak_session.lock();
        if (!session)
            return;

        util::CheckedUniqueLock state_lock(session->m_state_mutex);
        const bool still_same_death =
            session->m_state == State::Dying && session->m_death_count == death_generation;
        if (still_same_death) {
            session->become_inactive(std::move(state_lock));
        }
    });
    m_state_mutex.unlock(lock);
}
74✔
154

155
// Switch the session to State::Inactive and run the shared teardown in
// do_become_inactive(), forwarding the lock, status and notification flag.
// Must not be called when the session is already Inactive.
void SyncSession::become_inactive(util::CheckedUniqueLock lock, Status status, bool cancel_subscription_notifications)
{
    REALM_ASSERT(m_state != State::Inactive);
    m_state = State::Inactive;

    do_become_inactive(std::move(lock), std::move(status), cancel_subscription_notifications);
}
1,519✔
162

163
// Switch the session to State::Paused.
//
// If the session was Inactive only the state changes and the lock is
// released; otherwise the session is torn down through do_become_inactive(),
// which also cancels subscription notifications.
void SyncSession::become_paused(util::CheckedUniqueLock lock)
{
    REALM_ASSERT(m_state != State::Paused);
    const bool was_inactive = (m_state == State::Inactive);
    m_state = State::Paused;

    if (!was_inactive) {
        do_become_inactive(std::move(lock), Status::OK(), true);
        return;
    }

    // Already inactive: updating the state was all there was to do.
    m_state_mutex.unlock(lock);
}
148✔
177

178
// Drop the current sync session and immediately become active again.
//
// The lock parameter is unnamed: it is only held for the duration of the call.
void SyncSession::do_restart_session(util::CheckedUniqueLock)
{
    // A paused session is left alone; it is resumed when resume() is called.
    if (m_state == State::Paused)
        return;

    // Jump directly to Inactive (without the usual teardown) so that the
    // progress completion waiters keep waiting until the restarted session
    // finishes its upload/download sync.
    m_state = State::Inactive;

    // Resetting an empty pointer is a no-op, so no null check is needed.
    m_session.reset();

    // become_active() creates the new session through create_sync_session(),
    // which picks up the latest server path from the sync manager, and
    // re-registers the completion callbacks.
    become_active();
}
8✔
200

201
void SyncSession::do_become_inactive(util::CheckedUniqueLock lock, Status status,
202
                                     bool cancel_subscription_notifications)
203
{
1,667✔
204
    // Manually set the disconnected state. Sync would also do this, but
745✔
205
    // since the underlying SyncSession object already have been destroyed,
745✔
206
    // we are not able to get the callback.
745✔
207
    util::CheckedUniqueLock connection_state_lock(m_connection_state_mutex);
1,667✔
208
    auto old_state = m_connection_state;
1,667✔
209
    auto new_state = m_connection_state = SyncSession::ConnectionState::Disconnected;
1,667✔
210
    connection_state_lock.unlock();
1,667✔
211

745✔
212
    SyncSession::CompletionCallbacks waits;
1,667✔
213
    std::swap(waits, m_completion_callbacks);
1,667✔
214

745✔
215
    m_session = nullptr;
1,667✔
216
    if (m_sync_manager) {
1,667✔
217
        m_sync_manager->unregister_session(m_db->get_path());
1,667✔
218
    }
1,667✔
219

745✔
220
    auto subscription_store = m_flx_subscription_store;
1,667✔
221
    m_state_mutex.unlock(lock);
1,667✔
222

745✔
223
    // Send notifications after releasing the lock to prevent deadlocks in the callback.
745✔
224
    if (old_state != new_state) {
1,667✔
225
        m_connection_change_notifier.invoke_callbacks(old_state, connection_state());
1,341✔
226
    }
1,341✔
227

745✔
228
    if (status.is_ok())
1,667✔
229
        status = Status(ErrorCodes::OperationAborted, "Sync session became inactive");
1,595✔
230

745✔
231
    if (subscription_store && cancel_subscription_notifications) {
1,667✔
232
        subscription_store->notify_all_state_change_notifications(status);
465✔
233
    }
465✔
234

745✔
235
    // Inform any queued-up completion handlers that they were cancelled.
745✔
236
    for (auto& [id, callback] : waits)
1,667✔
237
        callback.second(status);
58✔
238
}
1,667✔
239

240
void SyncSession::become_waiting_for_access_token()
241
{
18✔
242
    REALM_ASSERT(m_state != State::WaitingForAccessToken);
18✔
243
    m_state = State::WaitingForAccessToken;
18✔
244
}
18✔
245

246
void SyncSession::handle_bad_auth(const std::shared_ptr<SyncUser>& user, Status status)
247
{
16✔
248
    // TODO: ideally this would write to the logs as well in case users didn't set up their error handler.
8✔
249
    {
16✔
250
        util::CheckedUniqueLock lock(m_state_mutex);
16✔
251
        cancel_pending_waits(std::move(lock), status);
16✔
252
    }
16✔
253
    if (user) {
16✔
254
        user->log_out();
16✔
255
    }
16✔
256

8✔
257
    if (auto error_handler = config(&SyncConfig::error_handler)) {
16✔
258
        auto user_facing_error = SyncError({ErrorCodes::AuthError, status.reason()}, true);
16✔
259
        error_handler(shared_from_this(), std::move(user_facing_error));
16✔
260
    }
16✔
261
}
16✔
262

263
// Returns true when `error` carries an HTTP status code that signals an auth
// failure: 401 (unauthorized) or 403 (forbidden). Errors without an
// additional status code are never auth failures.
static bool check_for_auth_failure(const app::AppError& error)
{
    using namespace realm::sync;

    if (!error.additional_status_code)
        return false;

    switch (HTTPStatus(*error.additional_status_code)) {
        case HTTPStatus::Unauthorized:
        case HTTPStatus::Forbidden:
            return true;
        default:
            return false;
    }
}
12✔
275

276
// Returns true when `error` carries an unhandled permanent redirect HTTP
// status code: 301 (moved permanently) or 308 (permanent redirect). Errors
// without an additional status code are never redirects.
static bool check_for_redirect_response(const app::AppError& error)
{
    using namespace realm::sync;

    if (!error.additional_status_code)
        return false;

    switch (HTTPStatus(*error.additional_status_code)) {
        case HTTPStatus::MovedPermanently:
        case HTTPStatus::PermanentRedirect:
            return true;
        default:
            return false;
    }
}
12✔
288

289
// Build the completion handler for an access token refresh request.
//
// The returned callable keeps `session` alive and, when invoked with the
// outcome of the refresh:
//   * cancels pending waits if the session no longer has a user;
//   * on success either restarts the session (`restart_session` == true) or
//     pushes the user's new access token to it;
//   * on failure either treats the error as fatal (handle_bad_auth) or, for
//     unexpected non-fatal errors, lets the session proceed to Active.
util::UniqueFunction<void(util::Optional<app::AppError>)>
SyncSession::handle_refresh(const std::shared_ptr<SyncSession>& session, bool restart_session)
{
    return [session, restart_session](util::Optional<app::AppError> error) {
        auto session_user = session->user();

        // No user any more: just cancel whoever is waiting on this session.
        if (!session_user) {
            util::CheckedUniqueLock lock(session->m_state_mutex);
            auto refresh_error = error ? error->to_status() : Status::OK();
            session->cancel_pending_waits(std::move(lock), refresh_error);
            return;
        }

        // The refresh succeeded.
        if (!error) {
            if (restart_session) {
                // Restart now; the new session pulls the latest access token
                // and server url from the sync manager when it is started.
                session->restart_session();
            }
            else {
                // Update the access token and reconnect.
                session->update_access_token(session_user->access_token());
            }
            return;
        }

        // This response came in after the app shut down, ignore it.
        if (error->code() == ErrorCodes::ClientAppDeallocated) {
            return;
        }

        // Every other client error is fatal: something went wrong locally
        // before the request was even sent to the server,
        // eg. ClientErrorCode::user_not_found, ClientErrorCode::user_not_logged_in,
        // ClientErrorCode::too_many_redirects
        if (ErrorCodes::error_categories(error->code()).test(ErrorCategory::client_error)) {
            session->handle_bad_auth(session_user, error->to_status());
            return;
        }

        // A 401 response on a refresh request means that the token cannot be refreshed and we should not
        // retry. This can be because an admin has revoked this user's sessions, the user has been disabled,
        // or the refresh token has expired according to the server's clock.
        if (check_for_auth_failure(*error)) {
            session->handle_bad_auth(
                session_user,
                {error->code(), util::format("Unable to refresh the user access token: %1", error->reason())});
            return;
        }

        // A 301 or 308 response is an unhandled permanent redirect response (which should not happen) - if
        // this is received, fail the request with an appropriate error message.
        // Temporary redirect responses (302, 307) are not supported
        if (check_for_redirect_response(*error)) {
            session->handle_bad_auth(
                session_user,
                {error->code(), util::format("Unhandled redirect response when trying to reach the server: %1",
                                             error->reason())});
            return;
        }

        // The refresh request failed with an unexpected, non-fatal error. We would like to
        // retry, but not immediately, so the server is not swamped with requests. Two cases:
        // 1) The request came from the proactive token check or from the user. The token may
        //    in fact still be valid, so advance to Active from WaitingForAccessToken if
        //    needed and let the sync server decide. If that fails too we end up in case 2.
        // 2) The sync connection initiated the request because the server is unavailable or
        //    the connection hit some other unexpected error. The sync client then retries
        //    the connection by itself using its internal backoff timer, so there is nothing
        //    to do here.
        util::CheckedUniqueLock lock(session->m_state_mutex);
        if (session->m_state == State::WaitingForAccessToken) {
            session->become_active();
        }
    };
}
44✔
360

361
SyncSession::SyncSession(SyncClient& client, std::shared_ptr<DB> db, const RealmConfig& config,
362
                         SyncManager* sync_manager)
363
    : m_config{config}
364
    , m_db{std::move(db)}
365
    , m_original_sync_config{m_config.sync_config}
366
    , m_migration_store{sync::MigrationStore::create(m_db)}
367
    , m_client(client)
368
    , m_sync_manager(sync_manager)
369
{
1,374✔
370
    REALM_ASSERT(m_config.sync_config);
1,374✔
371
    // we don't want the following configs enabled during a client reset
601✔
372
    m_config.scheduler = nullptr;
1,374✔
373
    m_config.audit_config = nullptr;
1,374✔
374

601✔
375
    // Adjust the sync_config if using PBS sync and already in the migrated or rollback state
601✔
376
    if (m_migration_store->is_migrated() || m_migration_store->is_rollback_in_progress()) {
1,374✔
377
        m_config.sync_config = sync::MigrationStore::convert_sync_config_to_flx(m_original_sync_config);
8✔
378
    }
8✔
379

601✔
380
    // If using FLX, set up m_flx_subscription_store and the history_write_validator
601✔
381
    if (m_config.sync_config->flx_sync_requested) {
1,374✔
382
        create_subscription_store();
419✔
383
        std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr(m_flx_subscription_store);
419✔
384
        set_write_validator_factory(weak_sub_mgr);
419✔
385
    }
419✔
386

601✔
387
    // After a migration to FLX, if the user opens the realm with a flexible sync configuration, we need to first
601✔
388
    // upload any unsynced changes before updating to native FLX.
601✔
389
    // A subscription set is used as sentinel so we know when to stop uploading.
601✔
390
    // Note: Currently, a sentinel subscription set is always created even if there is nothing to upload.
601✔
391
    if (m_migration_store->is_migrated() && m_original_sync_config->flx_sync_requested) {
1,374✔
392
        m_migration_store->create_sentinel_subscription_set(*m_flx_subscription_store);
2✔
393
        m_migration_sentinel_query_version = m_migration_store->get_sentinel_subscription_set_version();
2✔
394
        REALM_ASSERT(m_migration_sentinel_query_version);
2✔
395
    }
2✔
396
}
1,374✔
397

398
std::shared_ptr<SyncManager> SyncSession::sync_manager() const
399
{
×
400
    util::CheckedLockGuard lk(m_state_mutex);
×
401
    REALM_ASSERT(m_sync_manager);
×
402
    return m_sync_manager->shared_from_this();
×
403
}
×
404

405
void SyncSession::detach_from_sync_manager()
406
{
2✔
407
    shutdown_and_wait();
2✔
408
    util::CheckedLockGuard lk(m_state_mutex);
2✔
409
    m_sync_manager = nullptr;
2✔
410
}
2✔
411

412
// Record file paths in `error.user_info` and schedule the Realm file for deletion.
//
// The original file path is always stored in `error`. When `should_backup`
// is ShouldBackup::yes a unique recovery path is reserved, stored in `error`
// as well, and the file action becomes "back up, then delete"; otherwise the
// file is only marked for deletion.
//
// NOTE(review): m_sync_manager is dereferenced here without a null check,
// unlike in do_become_inactive() - confirm it cannot be detached at this point.
void SyncSession::update_error_and_mark_file_for_deletion(SyncError& error, ShouldBackup should_backup)
{
    using Action = SyncFileActionMetadata::Action;

    util::CheckedLockGuard config_lock(m_config_mutex);
    const bool backup_requested = (should_backup == ShouldBackup::yes);

    auto original_path = path();
    error.user_info[SyncError::c_original_file_path_key] = original_path;

    std::string recovery_path;
    if (backup_requested) {
        recovery_path = util::reserve_unique_file_name(
            m_sync_manager->recovery_directory_path(m_config.sync_config->recovery_directory),
            util::create_timestamped_template("recovered_realm"));
        error.user_info[SyncError::c_recovery_file_path_key] = recovery_path;
    }

    // Add a SyncFileActionMetadata entry marking the Realm as needing to be deleted.
    auto action = backup_requested ? Action::BackUpThenDeleteRealm : Action::DeleteRealm;
    m_sync_manager->perform_metadata_update([action, original_path = std::move(original_path),
                                             recovery_path = std::move(recovery_path),
                                             partition_value = m_config.sync_config->partition_value,
                                             identity = m_config.sync_config->user->identity()](const auto& manager) {
        manager.make_file_action_metadata(original_path, partition_value, identity, action, recovery_path);
    });
}
70✔
434

435
void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_requests_action)
436
{
174✔
437
    // first check that recovery will not be prevented
87✔
438
    if (server_requests_action == sync::ProtocolErrorInfo::Action::ClientResetNoRecovery) {
174✔
439
        auto mode = config(&SyncConfig::client_resync_mode);
4✔
440
        if (mode == ClientResyncMode::Recover) {
4✔
441
            handle_fresh_realm_downloaded(
2✔
442
                nullptr,
2✔
443
                {ErrorCodes::RuntimeError,
2✔
444
                 "A client reset is required but the server does not permit recovery for this client"},
2✔
445
                server_requests_action);
2✔
446
            return;
2✔
447
        }
2✔
448
    }
172✔
449

86✔
450
    std::vector<char> encryption_key;
172✔
451
    {
172✔
452
        util::CheckedLockGuard lock(m_config_mutex);
172✔
453
        encryption_key = m_config.encryption_key;
172✔
454
    }
172✔
455

86✔
456
    DBOptions options;
172✔
457
    options.allow_file_format_upgrade = false;
172✔
458
    options.enable_async_writes = false;
172✔
459
    if (!encryption_key.empty())
172✔
460
        options.encryption_key = encryption_key.data();
2✔
461

86✔
462
    DBRef db;
172✔
463
    auto fresh_path = client_reset::get_fresh_path_for(m_db->get_path());
172✔
464
    try {
172✔
465
        // We want to attempt to use a pre-existing file to reduce the chance of
86✔
466
        // downloading the first part of the file only to then delete it over
86✔
467
        // and over, but if we fail to open it then we should just start over.
86✔
468
        try {
172✔
469
            db = DB::create(sync::make_client_replication(), fresh_path, options);
172✔
470
        }
172✔
471
        catch (...) {
91✔
472
            util::File::try_remove(fresh_path);
10✔
473
        }
10✔
474

86✔
475
        if (!db) {
168✔
476
            db = DB::create(sync::make_client_replication(), fresh_path, options);
2✔
477
        }
2✔
478
    }
164✔
479
    catch (...) {
90✔
480
        // Failed to open the fresh path after attempting to delete it, so we
4✔
481
        // just can't do automatic recovery.
4✔
482
        handle_fresh_realm_downloaded(nullptr, exception_to_status(), server_requests_action);
8✔
483
        return;
8✔
484
    }
8✔
485

82✔
486
    util::CheckedLockGuard state_lock(m_state_mutex);
164✔
487
    if (m_state != State::Active) {
164✔
488
        return;
×
489
    }
×
490
    std::shared_ptr<SyncSession> fresh_sync_session;
164✔
491
    {
164✔
492
        util::CheckedLockGuard config_lock(m_config_mutex);
164✔
493
        RealmConfig config = m_config;
164✔
494
        config.path = fresh_path;
164✔
495
        // in case of migrations use the migrated config
82✔
496
        auto fresh_config = m_migrated_sync_config ? *m_migrated_sync_config : *m_config.sync_config;
150✔
497
        // deep copy the sync config so we don't modify the live session's config
82✔
498
        config.sync_config = std::make_shared<SyncConfig>(fresh_config);
164✔
499
        config.sync_config->client_resync_mode = ClientResyncMode::Manual;
164✔
500
        fresh_sync_session = m_sync_manager->get_session(db, config);
164✔
501
        auto& history = static_cast<sync::ClientReplication&>(*db->get_replication());
164✔
502
        // the fresh Realm may apply writes to this db after it has outlived its sync session
82✔
503
        // the writes are used to generate a changeset for recovery, but are never committed
82✔
504
        history.set_write_validator_factory({});
164✔
505
    }
164✔
506

82✔
507
    fresh_sync_session->assert_mutex_unlocked();
164✔
508
    // The fresh realm uses flexible sync.
82✔
509
    if (auto fresh_sub_store = fresh_sync_session->get_flx_subscription_store()) {
164✔
510
        auto fresh_sub = fresh_sub_store->get_latest();
64✔
511
        // The local realm uses flexible sync as well so copy the active subscription set to the fresh realm.
32✔
512
        if (auto local_subs_store = m_flx_subscription_store) {
64✔
513
            sync::SubscriptionSet active = local_subs_store->get_active();
46✔
514
            auto fresh_mut_sub = fresh_sub.make_mutable_copy();
46✔
515
            fresh_mut_sub.import(active);
46✔
516
            fresh_sub = fresh_mut_sub.commit();
46✔
517
        }
55✔
518
        fresh_sub.get_state_change_notification(sync::SubscriptionSet::State::Complete)
64✔
519
            .then([=, weak_self = weak_from_this()](sync::SubscriptionSet::State state) {
63✔
520
                if (server_requests_action != sync::ProtocolErrorInfo::Action::MigrateToFLX) {
63✔
521
                    return util::Future<sync::SubscriptionSet::State>::make_ready(state);
54✔
522
                }
53✔
523
                auto strong_self = weak_self.lock();
31✔
524
                if (!strong_self || !strong_self->m_migration_store->is_migration_in_progress()) {
31✔
525
                    return util::Future<sync::SubscriptionSet::State>::make_ready(state);
9✔
526
                }
×
527

528
                // fresh_sync_session is using a new realm file that doesn't have the migration_store info
9✔
529
                // so the query string from the local migration store will need to be provided
9✔
530
                auto query_string = strong_self->m_migration_store->get_query_string();
18✔
531
                REALM_ASSERT(query_string);
18✔
532
                // Create subscriptions in the fresh realm based on the schema instructions received in the bootstrap
9✔
533
                // message.
9✔
534
                fresh_sync_session->m_migration_store->create_subscriptions(*fresh_sub_store, *query_string);
18✔
535
                auto latest_subs = fresh_sub_store->get_latest();
18✔
536
                {
18✔
537
                    util::CheckedLockGuard lock(strong_self->m_state_mutex);
18✔
538
                    // Save a copy of the subscriptions so we add them to the local realm once the
9✔
539
                    // subscription store is created.
9✔
540
                    strong_self->m_active_subscriptions_after_migration = latest_subs;
18✔
541
                }
18✔
542

32✔
543
                return latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete);
41✔
544
            })
41✔
545
            .get_async([=, weak_self = weak_from_this()](StatusWith<sync::SubscriptionSet::State> s) {
64✔
546
                // Keep the sync session alive while it's downloading, but then close
32✔
547
                // it immediately
31✔
548
                fresh_sync_session->force_close();
63✔
549
                if (auto strong_self = weak_self.lock()) {
63✔
550
                    if (s.is_ok()) {
33✔
551
                        strong_self->handle_fresh_realm_downloaded(db, Status::OK(), server_requests_action);
32✔
552
                    }
32✔
553
                    else {
33✔
554
                        strong_self->handle_fresh_realm_downloaded(nullptr, s.get_status(), server_requests_action);
33✔
555
                    }
51✔
556
                }
82✔
557
            });
82✔
558
    }
82✔
559
    else { // pbs
100✔
560
        fresh_sync_session->wait_for_download_completion([=, weak_self = weak_from_this()](Status s) {
100✔
561
            // Keep the sync session alive while it's downloading, but then close
50✔
562
            // it immediately
50✔
563
            fresh_sync_session->force_close();
100✔
564
            if (auto strong_self = weak_self.lock()) {
100✔
565
                strong_self->handle_fresh_realm_downloaded(db, s, server_requests_action);
132✔
566
            }
132✔
567
        });
50✔
568
    }
50✔
569
    fresh_sync_session->revive_if_needed();
82✔
570
}
82✔
571

87✔
572
void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
87✔
573
                                                sync::ProtocolErrorInfo::Action server_requests_action)
87✔
574
{
87✔
575
    util::CheckedUniqueLock lock(m_state_mutex);
87✔
576
    if (m_state != State::Active) {
174✔
577
        return;
87✔
578
    }
87✔
579
    // The download can fail for many reasons. For example:
87✔
580
    // - unable to write the fresh copy to the file system
87✔
581
    // - during download of the fresh copy, the fresh copy itself is reset
9✔
582
    // - in FLX mode there was a problem fulfilling the previously active subscription
3✔
583
    if (!status.is_ok()) {
90✔
584
        if (status == ErrorCodes::OperationAborted) {
15✔
585
            return;
9✔
586
        }
9✔
587
        lock.unlock();
12✔
588

6✔
589
        sync::SessionErrorInfo synthetic(
12✔
590
            Status{ErrorCodes::AutoClientResetFailed,
12✔
591
                   util::format("A fatal error occurred during client reset: '%1'", status.reason())},
12✔
592
            sync::IsFatal{true});
12✔
593
        handle_error(synthetic);
84✔
594
        return;
84✔
595
    }
84✔
596

78✔
597
    // Performing a client reset requires tearing down our current
78✔
598
    // sync session and creating a new one with the relevant client reset config. This
78✔
599
    // will result in session completion handlers firing
78✔
600
    // when the old session is torn down, which we don't want as this
78✔
601
    // is supposed to be transparent to the user.
78✔
602
    //
78✔
603
    // To avoid this, we need to move the completion handlers aside temporarily so
78✔
604
    // that moving to the inactive state doesn't clear them - they will be
78✔
605
    // re-registered when the session becomes active again.
78✔
606
    {
156✔
607
        m_server_requests_action = server_requests_action;
156✔
608
        m_client_reset_fresh_copy = db;
156✔
609
        CompletionCallbacks callbacks;
156✔
610
        std::swap(m_completion_callbacks, callbacks);
156✔
611
        // always swap back, even if advance_state throws
78✔
612
        auto guard = util::make_scope_exit([&]() noexcept {
156✔
613
            util::CheckedUniqueLock lock(m_state_mutex);
78✔
614
            if (m_completion_callbacks.empty())
78✔
615
                std::swap(callbacks, m_completion_callbacks);
156✔
616
            else
78✔
617
                m_completion_callbacks.merge(std::move(callbacks));
78✔
618
        });
156✔
619
        // Do not cancel the notifications on subscriptions.
78✔
620
        bool cancel_subscription_notifications = false;
156✔
621
        become_inactive(std::move(lock), Status::OK(), cancel_subscription_notifications); // unlocks the lock
156✔
622

78✔
623
        // Once the session is inactive, update sync config and subscription store after migration.
13✔
624
        if (server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
91✔
625
            server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS) {
82✔
626
            apply_sync_config_after_migration_or_rollback();
26✔
627
            auto flx_sync_requested = config(&SyncConfig::flx_sync_requested);
91✔
628
            update_subscription_store(flx_sync_requested);
91✔
629
        }
91✔
630
    }
78✔
631
    revive_if_needed();
78✔
632
}
86✔
633

8✔
634
// Test-only hook: hands `error` to the private SyncSession::handle_error() of `session`.
void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::SessionErrorInfo&& error)
{
    SyncSession& target = session;
    target.handle_error(std::move(error));
}
8✔
638

639
// This method should only be called from within the error handler callback registered upon the underlying
287✔
640
// `m_session`.
287✔
641
void SyncSession::handle_error(sync::SessionErrorInfo error)
287✔
642
{
562✔
643
    enum class NextStateAfterError { none, inactive, error };
562✔
644
    auto next_state = error.is_fatal ? NextStateAfterError::error : NextStateAfterError::none;
425✔
645
    util::Optional<ShouldBackup> delete_file;
562✔
646
    bool log_out_user = false;
562✔
647
    bool unrecognized_by_client = false;
298✔
648

23✔
649
    if (error.status == ErrorCodes::AutoClientResetFailed) {
298✔
650
        // At this point, automatic recovery has been attempted but it failed.
23✔
651
        // Fallback to a manual reset and let the user try to handle it.
23✔
652
        next_state = NextStateAfterError::inactive;
287✔
653
        delete_file = ShouldBackup::yes;
282✔
654
    }
23✔
655
    else if (error.server_requests_action != sync::ProtocolErrorInfo::Action::NoAction) {
252✔
656
        switch (error.server_requests_action) {
256✔
657
            case sync::ProtocolErrorInfo::Action::NoAction:
9✔
658
                REALM_UNREACHABLE(); // This is not sent by the MongoDB server
12✔
659
            case sync::ProtocolErrorInfo::Action::ApplicationBug:
24✔
660
                [[fallthrough]];
23✔
661
            case sync::ProtocolErrorInfo::Action::ProtocolViolation:
26✔
662
                break;
140✔
663
            case sync::ProtocolErrorInfo::Action::Warning:
136✔
664
                break; // not fatal, but should be bubbled up to the user below.
136✔
665
            case sync::ProtocolErrorInfo::Action::Transient:
119✔
666
                // Not real errors, don't need to be reported to the binding.
667
                return;
110✔
668
            case sync::ProtocolErrorInfo::Action::DeleteRealm:
✔
669
                next_state = NextStateAfterError::inactive;
83✔
670
                delete_file = ShouldBackup::no;
83✔
671
                break;
85✔
672
            case sync::ProtocolErrorInfo::Action::ClientReset:
168✔
673
                [[fallthrough]];
95✔
674
            case sync::ProtocolErrorInfo::Action::ClientResetNoRecovery:
97✔
675
                switch (config(&SyncConfig::client_resync_mode)) {
97✔
676
                    case ClientResyncMode::Manual:
24✔
677
                        next_state = NextStateAfterError::inactive;
51✔
678
                        delete_file = ShouldBackup::yes;
51✔
679
                        break;
57✔
680
                    case ClientResyncMode::DiscardLocal:
84✔
681
                        [[fallthrough]];
112✔
682
                    case ClientResyncMode::RecoverOrDiscard:
118✔
683
                        [[fallthrough]];
118✔
684
                    case ClientResyncMode::Recover:
85✔
685
                        download_fresh_realm(error.server_requests_action);
85✔
686
                        return; // do not propgate the error to the user at this point
85✔
687
                }
21✔
688
                break;
21✔
689
            case sync::ProtocolErrorInfo::Action::MigrateToFLX:
18✔
690
                // Should not receive this error if original sync config is FLX
9✔
691
                REALM_ASSERT(!m_original_sync_config->flx_sync_requested);
18✔
692
                REALM_ASSERT(error.migration_query_string && !error.migration_query_string->empty());
18✔
693
                // Original config was PBS, migrating to FLX
9✔
694
                m_migration_store->migrate_to_flx(*error.migration_query_string,
18✔
695
                                                  m_original_sync_config->partition_value);
18✔
696
                save_sync_config_after_migration_or_rollback();
21✔
697
                download_fresh_realm(error.server_requests_action);
14✔
698
                return;
14✔
699
            case sync::ProtocolErrorInfo::Action::RevertToPBS:
10✔
700
                // If the client was updated to use FLX natively, but the server was rolled back to PBS,
5✔
701
                // the server should be sending switch_to_flx_sync; throw exception if this error is not
702
                // received.
703
                if (m_original_sync_config->flx_sync_requested) {
5✔
704
                    throw LogicError(ErrorCodes::InvalidServerResponse,
×
705
                                     "Received 'RevertToPBS' from server after rollback while client is natively "
5✔
706
                                     "using FLX - expected 'SwitchToPBS'");
5✔
707
                }
5✔
708
                // Original config was PBS, rollback the migration
5✔
709
                m_migration_store->rollback_to_pbs();
10✔
710
                save_sync_config_after_migration_or_rollback();
14✔
711
                download_fresh_realm(error.server_requests_action);
14✔
712
                return;
14✔
713
            case sync::ProtocolErrorInfo::Action::RefreshUser:
18✔
714
                if (auto u = user()) {
18✔
715
                    u->refresh_custom_data(false, handle_refresh(shared_from_this(), false));
9✔
716
                    return;
12✔
717
                }
12✔
718
                break;
3✔
719
            case sync::ProtocolErrorInfo::Action::RefreshLocation:
6✔
720
                if (auto u = user()) {
6✔
721
                    u->refresh_custom_data(true, handle_refresh(shared_from_this(), true));
3✔
722
                    return;
3✔
723
                }
3✔
724
                break;
×
725
            case sync::ProtocolErrorInfo::Action::LogOutUser:
✔
726
                next_state = NextStateAfterError::inactive;
5✔
727
                log_out_user = true;
5✔
728
                break;
5✔
729
        }
10✔
730
    }
10✔
731
    else {
10✔
732
        // Unrecognized error code.
287✔
733
        unrecognized_by_client = true;
292✔
734
    }
68✔
735

63✔
736
    util::CheckedUniqueLock lock(m_state_mutex);
129✔
737
    SyncError sync_error{error.status, error.is_fatal, error.log_url, std::move(error.compensating_writes)};
129✔
738
    // `action` is used over `shouldClientReset` and `isRecoveryModeDisabled`.
63✔
739
    sync_error.server_requests_action = error.server_requests_action;
129✔
740
    sync_error.is_unrecognized_by_client = unrecognized_by_client;
101✔
741

63✔
742
    if (delete_file)
129✔
743
        update_error_and_mark_file_for_deletion(sync_error, *delete_file);
36✔
744

1✔
745
    if (m_state == State::Dying && error.is_fatal) {
67✔
746
        become_inactive(std::move(lock), error.status);
63✔
747
        return;
63✔
748
    }
63✔
749

62✔
750
    // Don't bother invoking m_config.error_handler if the sync is inactive.
751
    // It does not make sense to call the handler when the session is closed.
752
    if (m_state == State::Inactive || m_state == State::Paused) {
127✔
753
        return;
62✔
754
    }
15✔
755

15✔
756
    switch (next_state) {
65✔
757
        case NextStateAfterError::none:
15✔
758
            if (config(&SyncConfig::cancel_waits_on_nonfatal_error)) {
30✔
759
                cancel_pending_waits(std::move(lock), sync_error.status); // unlocks the mutex
35✔
760
            }
35✔
761
            break;
50✔
762
        case NextStateAfterError::inactive: {
35✔
763
            become_inactive(std::move(lock), sync_error.status);
47✔
764
            break;
47✔
765
        }
12✔
766
        case NextStateAfterError::error: {
77✔
767
            cancel_pending_waits(std::move(lock), sync_error.status);
77✔
768
            break;
77✔
769
        }
127✔
770
    }
65✔
771

772
    if (log_out_user) {
65✔
773
        if (auto u = user())
62!
774
            u->log_out();
62✔
775
    }
62✔
776

62✔
777
    if (auto error_handler = config(&SyncConfig::error_handler)) {
127✔
778
        error_handler(shared_from_this(), std::move(sync_error));
65✔
779
    }
65✔
780
}
85✔
781

20✔
782
// Detaches every queued completion handler, releases the state lock passed in by the caller, and then
// reports `error` to subscription-state waiters (if a subscription store exists) and to the detached
// completion handlers.
void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status error)
{
    // Take the handlers out of the session while the lock is still held.
    CompletionCallbacks detached;
    std::swap(detached, m_completion_callbacks);

    {
        // Copy the store pointer under the lock, then drop the lock before notifying.
        auto store = m_flx_subscription_store;
        m_state_mutex.unlock(lock);

        // Inform any waiters on pending subscription states that they were cancelled
        if (store) {
            store->notify_all_state_change_notifications(error);
        }
    }

    // Inform any queued-up completion handlers that they were cancelled.
    for (auto& entry : detached) {
        auto& handler = entry.second.second;
        handler(error);
    }
}
23✔
801

2,676✔
802
void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded,
2,676✔
803
                                         uint64_t uploadable, uint64_t download_version, uint64_t snapshot_version)
2,676✔
804
{
3,758✔
805
    m_progress_notifier.update(downloaded, downloadable, uploaded, uploadable, download_version, snapshot_version);
3,758✔
806
}
3,758✔
807

808
// Builds the client reset section of a sync session configuration.
//
// `base_config`         Realm configuration used as the template for Realms opened by the callbacks.
// `sync_config`         Sync configuration providing the resync mode and the before/after callbacks;
//                       its mode must not be ClientResyncMode::Manual.
// `fresh_copy`          Freshly downloaded Realm; moved into the returned configuration.
// `recovery_is_allowed` Stored verbatim in the returned configuration.
//
// When neither a before- nor an after-reset callback is configured, no callback wrappers are installed.
static sync::Session::Config::ClientReset make_client_reset_config(const RealmConfig& base_config,
                                                                   const std::shared_ptr<SyncConfig>& sync_config,
                                                                   DBRef&& fresh_copy, bool recovery_is_allowed)
{
    REALM_ASSERT(sync_config->client_resync_mode != ClientResyncMode::Manual);

    sync::Session::Config::ClientReset reset_config;
    reset_config.mode = sync_config->client_resync_mode;
    reset_config.fresh_copy = std::move(fresh_copy);
    reset_config.recovery_is_allowed = recovery_is_allowed;

    // The conditions here are asymmetric because if we have *either* a before
    // or after callback we need to make sure to initialize the local schema
    // before the client reset happens.
    const bool has_before_callback = static_cast<bool>(sync_config->notify_before_client_reset);
    const bool has_after_callback = static_cast<bool>(sync_config->notify_after_client_reset);
    if (!(has_before_callback || has_after_callback))
        return reset_config;

    // The callbacks get their own copy of the configuration with a dummy scheduler.
    RealmConfig realm_config = base_config;
    realm_config.sync_config = std::make_shared<SyncConfig>(*sync_config); // deep copy
    realm_config.scheduler = util::Scheduler::make_dummy();

    if (has_after_callback) {
        reset_config.notify_after_client_reset = [realm_config](VersionID previous_version, bool did_recover) {
            auto coordinator = _impl::RealmCoordinator::get_coordinator(realm_config);
            ThreadSafeReference active_after = coordinator->get_unbound_realm();
            SharedRealm frozen_before = coordinator->get_realm(realm_config, previous_version);
            REALM_ASSERT(frozen_before);
            REALM_ASSERT(frozen_before->is_frozen());
            realm_config.sync_config->notify_after_client_reset(std::move(frozen_before), std::move(active_after),
                                                                did_recover);
        };
    }

    // Installed whenever at least one callback exists (see the asymmetry note above).
    reset_config.notify_before_client_reset = [cfg = std::move(realm_config)]() -> VersionID {
        // Opening the Realm live here may make a write if the schema is different
        // than what exists on disk. It is necessary to pass a fully usable Realm
        // to the user here. Note that the schema changes made here will be considered
        // an "offline write" to be recovered if this is recovery mode.
        auto before = Realm::get_shared_realm(cfg);
        before->read_group();
        if (auto& notify_before = cfg.sync_config->notify_before_client_reset) {
            notify_before(cfg.sync_config->freeze_before_reset_realm ? before->freeze() : before);
        }
        // Note that if the SDK requested a live Realm this may be a different
        // version than what we had before calling the callback.
        return before->read_transaction_version();
    };

    return reset_config;
}
828✔
857

763✔
858
void SyncSession::create_sync_session()
859
{
1,700✔
860
    if (m_session)
1,700✔
861
        return;
763✔
862

763✔
863
    util::CheckedLockGuard config_lock(m_config_mutex);
1,700✔
864

763✔
865
    REALM_ASSERT(m_config.sync_config);
1,700✔
866
    SyncConfig& sync_config = *m_config.sync_config;
1,700✔
867
    REALM_ASSERT(sync_config.user);
1,700✔
868

763✔
869
    sync::Session::Config session_config;
1,700✔
870
    session_config.signed_user_token = sync_config.user->access_token();
1,700✔
871
    session_config.user_id = sync_config.user->identity();
1,700✔
872
    session_config.realm_identifier = sync_config.partition_value;
1,700✔
873
    session_config.verify_servers_ssl_certificate = sync_config.client_validate_ssl;
1,700✔
874
    session_config.ssl_trust_certificate_path = sync_config.ssl_trust_certificate_path;
1,700✔
875
    session_config.ssl_verify_callback = sync_config.ssl_verify_callback;
1,700✔
876
    session_config.proxy_config = sync_config.proxy_config;
1,700✔
877
    session_config.simulate_integration_error = sync_config.simulate_integration_error;
1,700✔
878
    session_config.flx_bootstrap_batch_size_bytes = sync_config.flx_bootstrap_batch_size_bytes;
1,700✔
879
    session_config.session_reason =
1,700✔
880
        client_reset::is_fresh_path(m_config.path) ? sync::SessionReason::ClientReset : sync::SessionReason::Sync;
888✔
881

214✔
882
    if (sync_config.on_sync_client_event_hook) {
1,151✔
883
        session_config.on_sync_client_event_hook = [hook = sync_config.on_sync_client_event_hook,
243✔
884
                                                    anchor = weak_from_this()](const SyncClientHookData& data) {
237✔
885
            return hook(anchor, data);
971✔
886
        };
971✔
887
    }
792✔
888

763✔
889
    {
1,700✔
890
        std::string sync_route = m_sync_manager->sync_route();
1,700✔
891

763✔
892
        if (!m_client.decompose_server_url(sync_route, session_config.protocol_envelope,
937✔
893
                                           session_config.server_address, session_config.server_port,
937✔
894
                                           session_config.service_identifier)) {
763✔
895
            throw sync::BadServerUrl(sync_route);
763✔
896
        }
763✔
897
        // FIXME: Java needs the fully resolved URL for proxy support, but we also need it before
763✔
898
        // the session is created. How to resolve this?
763✔
899
        m_server_url = sync_route;
1,700✔
900
    }
937✔
901

902
    if (sync_config.authorization_header_name) {
1,700✔
903
        session_config.authorization_header_name = *sync_config.authorization_header_name;
763✔
904
    }
763✔
905
    session_config.custom_http_headers = sync_config.custom_http_headers;
1,015✔
906

78✔
907
    if (m_server_requests_action != sync::ProtocolErrorInfo::Action::NoAction) {
1,015✔
908
        // Migrations are allowed to recover local data.
78✔
909
        const bool allowed_to_recover = m_server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset ||
156✔
910
                                        m_server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
92✔
911
                                        m_server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS;
83✔
912
        // Use the original sync config, not the updated one from the migration store
78✔
913
        session_config.client_reset_config = make_client_reset_config(
156✔
914
            m_config, m_original_sync_config, std::move(m_client_reset_fresh_copy), allowed_to_recover);
841✔
915
        m_server_requests_action = sync::ProtocolErrorInfo::Action::NoAction;
841✔
916
    }
841✔
917

763✔
918
    m_session = m_client.make_session(m_db, m_flx_subscription_store, m_migration_store, std::move(session_config));
1,700✔
919

763✔
920
    std::weak_ptr<SyncSession> weak_self = weak_from_this();
1,700✔
921

763✔
922
    // Set up the wrapped progress handler callback
2,791✔
923
    m_session->set_progress_handler([weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable,
3,728✔
924
                                                uint_fast64_t uploaded, uint_fast64_t uploadable,
3,613✔
925
                                                uint_fast64_t progress_version, uint_fast64_t snapshot_version) {
6,518✔
926
        if (auto self = weak_self.lock()) {
6,518✔
927
            self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, progress_version,
6,549✔
928
                                         snapshot_version);
4,521✔
929
        }
4,521✔
930
    });
4,605✔
931

763✔
932
    // Sets up the connection state listener. This callback is used for both reporting errors as well as changes to
1,955✔
933
    // the connection state.
1,955✔
934
    m_session->set_connection_state_change_listener(
2,892✔
935
        [weak_self](sync::ConnectionState state, util::Optional<sync::SessionErrorInfo> error) {
4,164✔
936
            // If the OS SyncSession object is destroyed, we ignore any events from the underlying Session as there is
1,955✔
937
            // nothing useful we can do with them.
8✔
938
            auto self = weak_self.lock();
2,217✔
939
            if (!self) {
4,156✔
940
                return;
1,963✔
941
            }
1,963✔
942
            using cs = sync::ConnectionState;
2,453✔
943
            ConnectionState new_state = [&] {
2,453✔
944
                switch (state) {
3,041✔
945
                    case cs::disconnected:
1,096✔
946
                        return ConnectionState::Disconnected;
1,087✔
947
                    case cs::connecting:
1,846✔
948
                        return ConnectionState::Connecting;
1,007✔
949
                    case cs::connected:
938✔
950
                        return ConnectionState::Connected;
938✔
951
                }
1,947✔
952
                REALM_UNREACHABLE();
1,947✔
953
            }();
1,947✔
954
            util::CheckedUniqueLock lock(self->m_connection_state_mutex);
4,140✔
955
            auto old_state = self->m_connection_state;
4,140✔
956
            self->m_connection_state = new_state;
4,140✔
957
            lock.unlock();
4,121✔
958

1,928✔
959
            if (old_state != new_state) {
4,140✔
960
                self->m_connection_change_notifier.invoke_callbacks(old_state, new_state);
4,121✔
961
            }
2,447✔
962

273✔
963
            if (error) {
4,140✔
964
                self->handle_error(std::move(*error));
1,024✔
965
            }
261✔
966
        });
2,193✔
967
}
4,740✔
968

3,803✔
969
// Records the new local `version` with the progress notifier and, when the session is Active or
// WaitingForAccessToken and an underlying session exists, forwards the notification to it.
void SyncSession::nonsync_transact_notify(sync::version_type version)
{
    m_progress_notifier.set_local_version(version);

    util::CheckedUniqueLock lock(m_state_mutex);
    // Dying, Inactive and Paused sessions do not forward the notification.
    const bool forward_to_session = (m_state == State::Active || m_state == State::WaitingForAccessToken);
    if (forward_to_session && m_session) {
        m_session->nonsync_transact_notify(version);
    }
}
5,009✔
987

922✔
988
void SyncSession::revive_if_needed()
922✔
989
{
1,335✔
990
    util::CheckedUniqueLock lock(m_state_mutex);
1,335✔
991
    switch (m_state) {
1,335✔
992
        case State::Active:
467✔
993
        case State::WaitingForAccessToken:
927✔
994
        case State::Paused:
930✔
995
            return;
930✔
996
        case State::Dying:
692✔
997
        case State::Inactive:
1,787✔
998
            do_revive(std::move(lock));
1,787✔
999
            break;
865✔
1000
    }
1,104✔
1001
}
1,104✔
1002

1003
void SyncSession::handle_reconnect()
1004
{
×
1005
    util::CheckedUniqueLock lock(m_state_mutex);
×
1006
    switch (m_state) {
×
1007
        case State::Active:
×
1008
            m_session->cancel_reconnect_delay();
×
1009
            break;
×
1010
        case State::Dying:
×
1011
        case State::Inactive:
×
1012
        case State::WaitingForAccessToken:
×
1013
        case State::Paused:
×
1014
            break;
1015
    }
1016
}
106✔
1017

106✔
1018
void SyncSession::force_close()
106✔
1019
{
229✔
1020
    util::CheckedUniqueLock lock(m_state_mutex);
229✔
1021
    switch (m_state) {
229✔
1022
        case State::Active:
220✔
1023
        case State::Dying:
220✔
1024
        case State::WaitingForAccessToken:
223✔
1025
            become_inactive(std::move(lock));
129✔
1026
            break;
129✔
1027
        case State::Inactive:
111✔
1028
        case State::Paused:
112✔
1029
            break;
6✔
1030
    }
129✔
1031
}
205✔
1032

76✔
1033
void SyncSession::pause()
76✔
1034
{
151✔
1035
    util::CheckedUniqueLock lock(m_state_mutex);
151✔
1036
    switch (m_state) {
151✔
1037
        case State::Active:
149✔
1038
        case State::Dying:
149✔
1039
        case State::WaitingForAccessToken:
149✔
1040
        case State::Inactive:
150✔
1041
            become_paused(std::move(lock));
76✔
1042
            break;
151✔
1043
        case State::Paused:
77✔
1044
            break;
1✔
1045
    }
76✔
1046
}
149✔
1047

73✔
1048
void SyncSession::resume()
73✔
1049
{
73✔
1050
    util::CheckedUniqueLock lock(m_state_mutex);
73✔
1051
    switch (m_state) {
73✔
1052
        case State::Active:
73✔
1053
        case State::WaitingForAccessToken:
73✔
1054
            return;
73✔
1055
        case State::Paused:
146✔
1056
        case State::Dying:
146✔
1057
        case State::Inactive:
146✔
1058
            do_revive(std::move(lock));
146✔
1059
            break;
73✔
1060
    }
73✔
1061
}
837✔
1062

764✔
1063
// Brings the session back to life. `lock` is the caller's lock on `m_state_mutex`; it is released
// before this function returns.
//
// If there is a user whose access token needs refreshing, the session first waits for a new token;
// otherwise it becomes active right away.
void SyncSession::do_revive(util::CheckedUniqueLock&& lock)
{
    auto session_user = user();
    const bool needs_token_refresh = session_user && session_user->access_token_refresh_required();

    if (needs_token_refresh) {
        become_waiting_for_access_token();
        // Release the lock for SDKs with a single threaded
        // networking implementation such as our test suite
        // so that the update can trigger a state change from
        // the completion handler.
        m_state_mutex.unlock(lock);
        initiate_access_token_refresh();
    }
    else {
        become_active();
        m_state_mutex.unlock(lock);
    }
}
23✔
1080

14✔
1081
void SyncSession::close()
14✔
1082
{
100✔
1083
    util::CheckedUniqueLock lock(m_state_mutex);
86✔
1084
    close(std::move(lock));
86✔
1085
}
701✔
1086

615✔
1087
// Closes the session according to its current state. `lock` is the caller's lock on `m_state_mutex`;
// it is released on every path, either directly or by the state transition it is handed to.
void SyncSession::close(util::CheckedUniqueLock lock)
{
    // First deal with every state that does not depend on the stop policy.
    switch (m_state) {
        case State::WaitingForAccessToken:
            // Immediately kill the session.
            become_inactive(std::move(lock));
            return;
        case State::Dying:
            m_state_mutex.unlock(lock);
            return;
        case State::Paused:
        case State::Inactive:
            // We need to unregister from the sync manager if it still exists so that we don't end up
            // holding the DBRef open after the session is closed. Otherwise we can end up preventing
            // the user from deleting the realm when it's in the paused/inactive state.
            if (m_sync_manager) {
                m_sync_manager->unregister_session(m_db->get_path());
            }
            m_state_mutex.unlock(lock);
            return;
        case State::Active:
            break;
    }

    // Active session: the configured stop policy decides what happens.
    switch (config(&SyncConfig::stop_policy)) {
        case SyncSessionStopPolicy::Immediately:
            become_inactive(std::move(lock));
            break;
        case SyncSessionStopPolicy::LiveIndefinitely:
            // Don't do anything; session lives forever.
            m_state_mutex.unlock(lock);
            break;
        case SyncSessionStopPolicy::AfterChangesUploaded:
            // Wait for all pending changes to upload.
            become_dying(std::move(lock));
            break;
    }
}
869✔
1126

10✔
1127
void SyncSession::shutdown_and_wait()
10✔
1128
{
120✔
1129
    {
120✔
1130
        // Transition immediately to `inactive` state. Calling this function must gurantee that any
10✔
1131
        // sync::Session object in SyncSession::m_session that existed prior to the time of invocation
10✔
1132
        // must have been destroyed upon return. This allows the caller to follow up with a call to
10✔
1133
        // sync::Client::wait_for_session_terminations_or_client_stopped() in order to wait for the
10✔
1134
        // Realm file to be closed. This works so long as this SyncSession object remains in the
10✔
1135
        // `inactive` state after the invocation of shutdown_and_wait().
5✔
1136
        util::CheckedUniqueLock lock(m_state_mutex);
115✔
1137
        if (m_state != State::Inactive && m_state != State::Paused) {
120✔
1138
            become_inactive(std::move(lock));
69✔
1139
        }
69✔
1140
    }
110✔
1141
    m_client.wait_for_session_terminations();
110✔
1142
}
117✔
1143

7✔
1144
void SyncSession::update_access_token(const std::string& signed_token)
7✔
1145
{
14✔
1146
    util::CheckedUniqueLock lock(m_state_mutex);
14✔
1147
    // We don't expect there to be a session when waiting for access token, but if there is, refresh its token.
1✔
1148
    // If not, the latest token will be seeded from SyncUser::access_token() on session creation.
1✔
1149
    if (m_session) {
14✔
1150
        m_session->refresh(signed_token);
5✔
1151
    }
5✔
1152
    if (m_state == State::WaitingForAccessToken) {
14✔
1153
        become_active();
4✔
1154
    }
4✔
1155
}
17✔
1156

10✔
1157
void SyncSession::initiate_access_token_refresh()
10✔
1158
{
20✔
1159
    if (auto session_user = user()) {
20✔
1160
        session_user->refresh_custom_data(handle_refresh(shared_from_this(), false));
10✔
1161
    }
10✔
1162
}
10✔
1163

1,013✔
1164
void SyncSession::add_completion_callback(util::UniqueFunction<void(Status)> callback,
1,013✔
1165
                                          _impl::SyncProgressNotifier::NotifierType direction)
1,013✔
1166
{
2,112✔
1167
    bool is_download = (direction == _impl::SyncProgressNotifier::NotifierType::download);
2,112✔
1168

1,013✔
1169
    m_completion_request_counter++;
2,112✔
1170
    m_completion_callbacks.emplace_hint(m_completion_callbacks.end(), m_completion_request_counter,
2,112✔
1171
                                        std::make_pair(direction, std::move(callback)));
2,112✔
1172
    // If the state is inactive then just store the callback and return. The callback will get
91✔
1173
    // re-registered with the underlying session if/when the session ever becomes active again.
91✔
1174
    if (!m_session) {
2,021✔
1175
        return;
1,028✔
1176
    }
1,028✔
1177

922✔
1178
    auto waiter = is_download ? &sync::Session::async_wait_for_download_completion
1,915✔
1179
                              : &sync::Session::async_wait_for_upload_completion;
1,422✔
1180

922✔
1181
    (m_session.get()->*waiter)([weak_self = weak_from_this(), id = m_completion_request_counter](Status status) {
1,017✔
1182
        auto self = weak_self.lock();
1,891✔
1183
        if (!self)
1,891✔
1184
            return;
912✔
1185
        util::CheckedUniqueLock lock(self->m_state_mutex);
1,877✔
1186
        auto callback_node = self->m_completion_callbacks.extract(id);
1,796✔
1187
        lock.unlock();
1,796✔
1188
        if (callback_node) {
1,877✔
1189
            callback_node.mapped().second(std::move(status));
1,808✔
1190
        }
886✔
1191
    });
979✔
1192
}
1,363✔
1193

370✔
1194
// Registers `callback` as an upload-completion handler while holding the state lock.
void SyncSession::wait_for_upload_completion(util::UniqueFunction<void(Status)>&& callback)
{
    util::CheckedUniqueLock state_lock(m_state_mutex);
    add_completion_callback(std::move(callback), ProgressDirection::upload);
}
925✔
1199

484✔
1200
// Registers `callback` as a download-completion handler while holding the state lock.
void SyncSession::wait_for_download_completion(util::UniqueFunction<void(Status)>&& callback)
{
    util::CheckedUniqueLock state_lock(m_state_mutex);
    add_completion_callback(std::move(callback), ProgressDirection::download);
}
484✔
1205

2✔
1206
uint64_t SyncSession::register_progress_notifier(std::function<ProgressNotifierCallback>&& notifier,
2✔
1207
                                                 ProgressDirection direction, bool is_streaming)
2✔
1208
{
2✔
1209
    return m_progress_notifier.register_callback(std::move(notifier), direction, is_streaming);
2✔
1210
}
4✔
1211

2✔
1212
void SyncSession::unregister_progress_notifier(uint64_t token)
2✔
1213
{
2✔
1214
    m_progress_notifier.unregister_callback(token);
2✔
1215
}
5✔
1216

3✔
1217
uint64_t SyncSession::register_connection_change_callback(std::function<ConnectionStateChangeCallback>&& callback)
3✔
1218
{
3✔
1219
    return m_connection_change_notifier.add_callback(std::move(callback));
3✔
1220
}
4✔
1221

1✔
1222
void SyncSession::unregister_connection_change_callback(uint64_t token)
1✔
1223
{
1✔
1224
    m_connection_change_notifier.remove_callback(token);
602✔
1225
}
1✔
1226

1227
// Out-of-line destructor with no custom teardown logic; members are destroyed by their own destructors.
SyncSession::~SyncSession() = default;
15,002✔
1228

14,229✔
1229
// Returns the session state as read under the state lock.
SyncSession::State SyncSession::state() const
{
    util::CheckedUniqueLock state_lock(m_state_mutex);
    State current = m_state;
    return current;
}
2,072✔
1234

1,787✔
1235
// Returns the connection state as read under the connection-state lock.
SyncSession::ConnectionState SyncSession::connection_state() const
{
    util::CheckedUniqueLock connection_lock(m_connection_state_mutex);
    ConnectionState current = m_connection_state;
    return current;
}
1,489✔
1240

673✔
1241
std::string const& SyncSession::path() const
673✔
1242
{
875✔
1243
    return m_db->get_path();
875✔
1244
}
3,040,262✔
1245

3,039,387✔
1246
std::shared_ptr<sync::SubscriptionStore> SyncSession::get_flx_subscription_store()
3,039,387✔
1247
{
3,064,716✔
1248
    util::CheckedLockGuard lock(m_state_mutex);
25,329✔
1249
    return m_flx_subscription_store;
25,329✔
1250
}
25,330✔
1251

1✔
1252
std::shared_ptr<sync::SubscriptionStore> SyncSession::get_subscription_store_base()
1✔
1253
{
2✔
1254
    util::CheckedLockGuard lock(m_state_mutex);
1✔
1255
    return m_subscription_store_base;
1✔
1256
}
69✔
1257

68✔
1258
// Returns the salted file identifier stored in the sync history of the session's database.
// Asserts that the database has a replication object and that it is a sync::ClientReplication.
sync::SaltedFileIdent SyncSession::get_file_ident() const
{
    auto replication = m_db->get_replication();
    REALM_ASSERT(replication);
    REALM_ASSERT(dynamic_cast<sync::ClientReplication*>(replication));
    auto* client_replication = static_cast<sync::ClientReplication*>(replication);

    // get_status() has three output parameters; only the file ident is of interest here.
    sync::SaltedFileIdent file_ident;
    sync::version_type ignored_version;
    sync::SyncProgress ignored_progress;
    client_replication->get_history().get_status(ignored_version, file_ident, ignored_progress);
    return file_ident;
}
78✔
1270

10✔
1271
std::string SyncSession::get_appservices_connection_id() const
10✔
1272
{
10✔
1273
    util::CheckedLockGuard lk(m_state_mutex);
10✔
1274
    if (!m_session) {
20✔
1275
        return {};
10✔
1276
    }
1277
    return m_session->get_appservices_connection_id();
10✔
1278
}
14✔
1279

9✔
1280
// Replaces the session's sync configuration with `new_config`.
//
// The session is first driven to an Inactive or Paused state, then the config
// is swapped under m_config_mutex, and finally revive_if_needed() is called.
// The user of the new config must match the user of the current one (asserted).
void SyncSession::update_configuration(SyncConfig new_config)
{
    for (;;) {
        util::CheckedUniqueLock state_guard(m_state_mutex);
        const bool is_idle = (m_state == State::Inactive || m_state == State::Paused);
        if (!is_idle) {
            // become_inactive() consumes the lock, so the state can change
            // again before we re-lock on the next iteration (a callback may
            // run, or another thread may touch the session). Keep retrying
            // until the session is observed idle under the lock.
            become_inactive(std::move(state_guard));
            continue;
        }

        util::CheckedUniqueLock config_guard(m_config_mutex);
        REALM_ASSERT(m_state == State::Inactive || m_state == State::Paused);
        REALM_ASSERT(!m_session);
        REALM_ASSERT(m_config.sync_config->user == new_config.user);
        // This is only used for testing, so simply overwrite the current sync_config.
        m_config.sync_config = std::make_shared<SyncConfig>(std::move(new_config));
        break;
    }
    revive_if_needed();
}
17✔
1304

13✔
1305
void SyncSession::apply_sync_config_after_migration_or_rollback()
13✔
1306
{
26✔
1307
    // Migration state changed - Update the configuration to
13✔
1308
    // match the new sync mode.
1✔
1309
    util::CheckedLockGuard cfg_lock(m_config_mutex);
25✔
1310
    if (!m_migrated_sync_config)
25✔
1311
        return;
13✔
1312

12✔
1313
    m_config.sync_config = m_migrated_sync_config;
12✔
1314
    m_migrated_sync_config.reset();
12✔
1315
}
26✔
1316

14✔
1317
void SyncSession::save_sync_config_after_migration_or_rollback()
14✔
1318
{
28✔
1319
    util::CheckedLockGuard cfg_lock(m_config_mutex);
14✔
1320
    m_migrated_sync_config = m_migration_store->convert_sync_config(m_original_sync_config);
14✔
1321
}
27✔
1322

13✔
1323
void SyncSession::update_subscription_store(bool flx_sync_requested)
13✔
1324
{
26✔
1325
    util::CheckedUniqueLock lock(m_state_mutex);
26✔
1326

13✔
1327
    // The session should be closed before updating the FLX subscription store
13✔
1328
    REALM_ASSERT(!m_session);
26✔
1329

13✔
1330
    // If the subscription store exists and switching to PBS, then clear the store
4✔
1331
    auto& history = static_cast<sync::ClientReplication&>(*m_db->get_replication());
17✔
1332
    if (!flx_sync_requested) {
17✔
1333
        if (m_flx_subscription_store) {
8✔
1334
            // Empty the subscription store and cancel any pending subscription notification
4✔
1335
            // waiters
4✔
1336
            auto subscription_store = std::move(m_flx_subscription_store);
8✔
1337
            lock.unlock();
8✔
1338
            subscription_store->terminate();
8✔
1339
            auto tr = m_db->start_write();
8✔
1340
            history.set_write_validator_factory(nullptr);
8✔
1341
            tr->rollback();
8✔
1342
        }
13✔
1343
        return;
13✔
1344
    }
5✔
1345

8✔
1346
    if (m_flx_subscription_store)
17✔
1347
        return; // Using FLX and subscription store already exists
9✔
1348

8✔
1349
    // Going from PBS -> FLX (or one doesn't exist yet), create a new subscription store
8✔
1350
    create_subscription_store();
16✔
1351

8✔
1352
    std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr(m_flx_subscription_store);
16✔
1353
    lock.unlock();
16✔
1354

8✔
1355
    // If migrated to FLX, create subscriptions in the local realm to cover the existing data.
8✔
1356
    // This needs to be done before setting the write validator to avoid NoSubscriptionForWrite errors.
8✔
1357
    make_active_subscription_set();
16✔
1358

8✔
1359
    auto tr = m_db->start_write();
16✔
1360
    set_write_validator_factory(weak_sub_mgr);
16✔
1361
    tr->rollback();
16✔
1362
}
16✔
1363

8✔
1364
void SyncSession::create_subscription_store()
1365
{
217✔
1366
    REALM_ASSERT(!m_flx_subscription_store);
435✔
1367

218✔
1368
    // Create the main subscription store instance when this is first called - this will
218✔
1369
    // remain valid afterwards for the life of the SyncSession, but m_flx_subscription_store
218✔
1370
    // will be reset when rolling back to PBS after a client FLX migration
218✔
1371
    if (!m_subscription_store_base) {
435✔
1372
        m_subscription_store_base = sync::SubscriptionStore::create(m_db);
434✔
1373
    }
433✔
1374

217✔
1375
    // m_subscription_store_base is always around for the life of SyncSession, but the
218✔
1376
    // m_flx_subscription_store is set when using FLX.
218✔
1377
    m_flx_subscription_store = m_subscription_store_base;
435✔
1378
}
435✔
1379

218✔
1380
// Installs a write validator factory on the client replication history.
//
// For each transaction the factory asks the subscription store for the tables
// covered by the latest subscription set and returns a validator that throws
// NoSubscriptionForWrite when a top-level table outside that set is written to.
// Tables that are not top-level are never rejected.
//
// The subscription store referenced by `weak_sub_mgr` must still be alive
// whenever the factory runs (release-asserted).
void SyncSession::set_write_validator_factory(std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr)
{
    using ValidatorFn = util::UniqueFunction<sync::SyncReplication::WriteValidator>;

    auto& history = static_cast<sync::ClientReplication&>(*m_db->get_replication());
    history.set_write_validator_factory([store_ref = weak_sub_mgr](Transaction& tr) -> ValidatorFn {
        auto store = store_ref.lock();
        REALM_ASSERT_RELEASE(store);
        auto subscribed_tables = store->get_tables_for_latest(tr);
        return [allowed = std::move(subscribed_tables)](const Table& table) {
            // Only top-level tables are subject to the subscription check.
            if (table.get_table_type() != Table::Type::TopLevel) {
                return;
            }
            auto class_name = Group::table_name_to_class_name(table.get_name());
            if (allowed.find(class_name) != allowed.end()) {
                return;
            }
            throw NoSubscriptionForWrite(util::format(
                "Cannot write to class %1 when no flexible sync subscription has been created.", class_name));
        };
    });
}
2,656✔
1401

218✔
1402
// Represents a reference to the SyncSession from outside of the sync subsystem.
1403
// We attempt to keep the SyncSession in an active state as long as it has an external reference.
1404
class SyncSession::ExternalReference {
1405
public:
1406
    ExternalReference(std::shared_ptr<SyncSession> session)
1407
        : m_session(std::move(session))
1408
    {
773✔
1409
    }
1,374✔
1410

601✔
1411
    ~ExternalReference()
1412
    {
773✔
1413
        m_session->did_drop_external_reference();
1,374✔
1414
    }
1,374✔
1415

601✔
1416
private:
1417
    std::shared_ptr<SyncSession> m_session;
1418
};
1419

1420
std::shared_ptr<SyncSession> SyncSession::external_reference()
1421
{
883✔
1422
    util::CheckedLockGuard lock(m_external_reference_mutex);
1,592✔
1423

709✔
1424
    if (auto external_reference = m_external_reference.lock())
1,592✔
1425
        return std::shared_ptr<SyncSession>(external_reference, this);
819✔
1426

108✔
1427
    auto external_reference = std::make_shared<ExternalReference>(shared_from_this());
1,374✔
1428
    m_external_reference = external_reference;
1,374✔
1429
    return std::shared_ptr<SyncSession>(external_reference, this);
1,374✔
1430
}
1,374✔
1431

601✔
1432
std::shared_ptr<SyncSession> SyncSession::existing_external_reference()
1433
{
1,317✔
1434
    util::CheckedLockGuard lock(m_external_reference_mutex);
2,227✔
1435

910✔
1436
    if (auto external_reference = m_external_reference.lock())
2,227✔
1437
        return std::shared_ptr<SyncSession>(external_reference, this);
1,452✔
1438

309✔
1439
    return nullptr;
1,376✔
1440
}
1,376✔
1441

601✔
1442
void SyncSession::did_drop_external_reference()
1443
{
773✔
1444
    util::CheckedUniqueLock lock1(m_state_mutex);
1,374✔
1445
    {
1,374✔
1446
        util::CheckedLockGuard lock2(m_external_reference_mutex);
1,374✔
1447

601✔
1448
        // If the session is being resurrected we should not close the session.
601✔
1449
        if (!m_external_reference.expired())
1,374✔
1450
            return;
601✔
1451
    }
773✔
1452

601✔
1453
    close(std::move(lock1));
1,374✔
1454
}
1,374✔
1455

601✔
1456
uint64_t SyncProgressNotifier::register_callback(std::function<ProgressNotifierCallback> notifier,
1457
                                                 NotifierType direction, bool is_streaming)
1458
{
24✔
1459
    util::UniqueFunction<void()> invocation;
48✔
1460
    uint64_t token_value = 0;
48✔
1461
    {
48✔
1462
        std::lock_guard<std::mutex> lock(m_mutex);
48✔
1463
        token_value = m_progress_notifier_token++;
48✔
1464
        NotifierPackage package{std::move(notifier), util::none, m_local_transaction_version, is_streaming,
48✔
1465
                                direction == NotifierType::download};
48✔
1466
        if (!m_current_progress) {
48✔
1467
            // Simply register the package, since we have no data yet.
24✔
1468
            m_packages.emplace(token_value, std::move(package));
12✔
1469
            return token_value;
12✔
1470
        }
12✔
1471
        bool skip_registration = false;
24✔
1472
        invocation = package.create_invocation(*m_current_progress, skip_registration);
36✔
1473
        if (skip_registration) {
36✔
1474
            token_value = 0;
23✔
1475
        }
10✔
1476
        else {
18✔
1477
            m_packages.emplace(token_value, std::move(package));
26✔
1478
        }
26✔
1479
    }
31✔
1480
    invocation();
36✔
1481
    return token_value;
36✔
1482
}
36✔
1483

18✔
1484
void SyncProgressNotifier::unregister_callback(uint64_t token)
1485
{
4✔
1486
    std::lock_guard<std::mutex> lock(m_mutex);
8✔
1487
    m_packages.erase(token);
8✔
1488
}
8✔
1489

4✔
1490
void SyncProgressNotifier::update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
1491
                                  uint64_t download_version, uint64_t snapshot_version)
1492
{
3,806✔
1493
    // Ignore progress messages from before we first receive a DOWNLOAD message
2,724✔
1494
    if (download_version == 0)
6,530✔
1495
        return;
4,445✔
1496

1,101✔
1497
    std::vector<util::UniqueFunction<void()>> invocations;
3,708✔
1498
    {
3,708✔
1499
        std::lock_guard<std::mutex> lock(m_mutex);
3,708✔
1500
        m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded, snapshot_version};
3,708✔
1501

1,623✔
1502
        for (auto it = m_packages.begin(); it != m_packages.end();) {
3,741✔
1503
            bool should_delete = false;
1,689✔
1504
            invocations.emplace_back(it->second.create_invocation(*m_current_progress, should_delete));
66✔
1505
            it = should_delete ? m_packages.erase(it) : std::next(it);
57✔
1506
        }
66✔
1507
    }
2,118✔
1508
    // Run the notifiers only after we've released the lock.
1,623✔
1509
    for (auto& invocation : invocations)
3,708✔
1510
        invocation();
1,656✔
1511
}
2,118✔
1512

1,623✔
1513
void SyncProgressNotifier::set_local_version(uint64_t snapshot_version)
1514
{
4,091✔
1515
    std::lock_guard<std::mutex> lock(m_mutex);
7,898✔
1516
    m_local_transaction_version = snapshot_version;
7,898✔
1517
}
7,898✔
1518

3,807✔
1519
// Builds a closure that reports the given progress to this package's notifier.
//
// `is_expired` is set to true when a non-streaming notifier has transferred at
// least as much as its captured transferrable amount, and to false otherwise.
// On the early-return path (upload notifier whose snapshot has not yet been
// processed) a no-op closure is returned and `is_expired` is left untouched.
util::UniqueFunction<void()>
SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired)
{
    uint64_t transferred;
    uint64_t transferrable;
    if (is_download) {
        transferred = current_progress.downloaded;
        transferrable = current_progress.downloadable;
    }
    else {
        transferred = current_progress.uploaded;
        transferrable = current_progress.uploadable;
    }

    if (is_streaming) {
        // Streaming notifiers never expire.
        is_expired = false;
    }
    else {
        // Until the sync client has processed all of the local transactions,
        // the uploadable figure is wrong, so the callback must not be invoked.
        const bool upload_not_ready = !is_download && snapshot_version > current_progress.snapshot_version;
        if (upload_not_ready)
            return [] {};

        // The server initially reports the uncompacted download size, so a
        // download can finish before that many bytes arrive. When
        // transferrable drops below the captured value, adopt the new value.
        if (!captured_transferrable || *captured_transferrable > transferrable)
            captured_transferrable = transferrable;
        transferrable = *captured_transferrable;

        // Expired once at least as many bytes have been transferred as were
        // originally considered transferrable.
        is_expired = transferred >= transferrable;
    }

    return [callback = notifier, transferred, transferrable] {
        callback(transferred, transferrable);
    };
}
101✔
1547

51✔
1548
uint64_t SyncSession::ConnectionChangeNotifier::add_callback(std::function<ConnectionStateChangeCallback> callback)
1549
{
3✔
1550
    std::lock_guard<std::mutex> lock(m_callback_mutex);
6✔
1551
    auto token = m_next_token++;
6✔
1552
    m_callbacks.push_back({std::move(callback), token});
6✔
1553
    return token;
6✔
1554
}
6✔
1555

3✔
1556
void SyncSession::ConnectionChangeNotifier::remove_callback(uint64_t token)
1557
{
1✔
1558
    Callback old;
2✔
1559
    {
2✔
1560
        std::lock_guard<std::mutex> lock(m_callback_mutex);
2✔
1561
        auto it = std::find_if(begin(m_callbacks), end(m_callbacks), [=](const auto& c) {
2✔
1562
            return c.token == token;
2✔
1563
        });
2✔
1564
        if (it == end(m_callbacks)) {
2✔
1565
            return;
1✔
1566
        }
×
1567

1568
        size_t idx = distance(begin(m_callbacks), it);
2✔
1569
        if (m_callback_index != npos) {
2✔
1570
            if (m_callback_index >= idx)
1!
1571
                --m_callback_index;
×
1572
        }
×
1573
        --m_callback_count;
1✔
1574

1✔
1575
        old = std::move(*it);
2✔
1576
        m_callbacks.erase(it);
2✔
1577
    }
2✔
1578
}
2✔
1579

1✔
1580
// Calls every registered callback with the old and new connection state.
//
// m_callback_mutex is released around each individual call. The iteration
// position lives in m_callback_index / m_callback_count so that
// remove_callback() can adjust it while a callback is running.
void SyncSession::ConnectionChangeNotifier::invoke_callbacks(ConnectionState old_state, ConnectionState new_state)
{
    std::unique_lock lock(m_callback_mutex);
    m_callback_count = m_callbacks.size();

    ++m_callback_index;
    while (m_callback_index < m_callback_count) {
        // Take a local copy of the callback so that it stays valid even if the
        // callback removes itself from the list while it is running.
        auto current = m_callbacks[m_callback_index].fn;
        lock.unlock();
        current(old_state, new_state);
        lock.lock();
        ++m_callback_index;
    }

    // Mark the iteration as finished.
    m_callback_index = npos;
}
5,443✔
1594

2,513✔
1595
// Forwards a test command to the underlying sync session and returns its
// future. When there is no underlying session, a RuntimeError status is
// returned instead. m_state_mutex is held for the duration of the call.
util::Future<std::string> SyncSession::send_test_command(std::string body)
{
    util::CheckedLockGuard state_guard(m_state_mutex);
    if (m_session) {
        return m_session->send_test_command(std::move(body));
    }

    return Status{ErrorCodes::RuntimeError, "Session doesn't exist to send test command on"};
}
26✔
1604

13✔
1605
void SyncSession::make_active_subscription_set()
1606
{
8✔
1607
    util::CheckedUniqueLock lock(m_state_mutex);
8✔
1608

1609
    if (!m_active_subscriptions_after_migration)
8✔
1610
        return;
1611

1612
    REALM_ASSERT(m_flx_subscription_store);
8✔
1613

1614
    // Create subscription set from the subscriptions used to download the fresh realm after migration.
1615
    auto active_mut_sub = m_flx_subscription_store->get_active().make_mutable_copy();
8✔
1616
    active_mut_sub.import(*m_active_subscriptions_after_migration);
8✔
1617
    active_mut_sub.update_state(sync::SubscriptionSet::State::Complete);
8✔
1618
    active_mut_sub.commit();
8✔
1619

1620
    m_active_subscriptions_after_migration.reset();
8✔
1621
}
8✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc