• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / michael.wilkersonbarker_1363

27 Aug 2024 09:44PM UTC coverage: 91.095% (-0.05%) from 91.143%
michael.wilkersonbarker_1363

Pull #7994

Evergreen

michael-wb
Merge branch 'master' of github.com:realm/realm-core into mwb/remove-308-tests
Pull Request #7994: RCORE-2222 Remove 308 redirect tests

102826 of 181514 branches covered (56.65%)

157 of 161 new or added lines in 1 file covered. (97.52%)

134 existing lines in 21 files now uncovered.

217172 of 238402 relevant lines covered (91.09%)

5702800.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.84
/src/realm/object-store/sync/sync_session.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2016 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/sync/sync_session.hpp>
20

21
#include <realm/object-store/thread_safe_reference.hpp>
22
#include <realm/object-store/impl/realm_coordinator.hpp>
23
#include <realm/object-store/sync/app.hpp>
24
#include <realm/object-store/sync/impl/sync_client.hpp>
25
#include <realm/object-store/sync/impl/sync_file.hpp>
26
#include <realm/object-store/sync/impl/app_metadata.hpp>
27
#include <realm/object-store/sync/sync_manager.hpp>
28
#include <realm/object-store/sync/sync_user.hpp>
29
#include <realm/object-store/util/scheduler.hpp>
30

31
#include <realm/db_options.hpp>
32
#include <realm/sync/client.hpp>
33
#include <realm/sync/config.hpp>
34
#include <realm/sync/network/http.hpp>
35
#include <realm/sync/network/websocket_error.hpp>
36
#include <realm/sync/noinst/client_history_impl.hpp>
37
#include <realm/sync/noinst/client_reset_operation.hpp>
38
#include <realm/sync/noinst/migration_store.hpp>
39
#include <realm/sync/noinst/sync_schema_migration.hpp>
40
#include <realm/sync/protocol.hpp>
41

42
using namespace realm;
43
using namespace realm::_impl;
44

45
using SessionWaiterPointer = void (sync::Session::*)(util::UniqueFunction<void(std::error_code)>);
46

47
constexpr const char SyncError::c_original_file_path_key[];
48
constexpr const char SyncError::c_recovery_file_path_key[];
49

50
/// STATES:
51
///
52
/// WAITING_FOR_ACCESS_TOKEN: a request has been initiated to ask
53
/// for an updated access token and the session is waiting for a response.
54
/// From: INACTIVE, DYING
55
/// To:
56
///    * ACTIVE: when the SDK successfully refreshes the token
57
///    * INACTIVE: if asked to log out, or if asked to close
58
///
59
/// ACTIVE: the session is connected to the Sync Server and is actively
60
/// transferring data.
61
/// From: INACTIVE, DYING, WAITING_FOR_ACCESS_TOKEN
62
/// To:
63
///    * INACTIVE: if asked to log out, or if asked to close and the stop policy
64
///                is Immediate.
65
///    * DYING: if asked to close and the stop policy is AfterChangesUploaded
66
///
67
/// DYING: the session is performing clean-up work in preparation to be destroyed.
68
/// From: ACTIVE
69
/// To:
70
///    * INACTIVE: when the clean-up work completes, if the session wasn't
71
///                revived, or if explicitly asked to log out before the
72
///                clean-up work begins
73
///    * ACTIVE: if the session is revived
74
///    * WAITING_FOR_ACCESS_TOKEN: if the session tried to enter ACTIVE,
75
///                                but the token is invalid or expired.
76
///
77
/// INACTIVE: the user owning this session has logged out, the `sync::Session`
78
/// owned by this session is destroyed, and the session is quiescent.
79
/// Note that a session briefly enters this state before being destroyed, but
80
/// it can also enter this state and stay there if the user has been logged out.
81
/// From: initial, ACTIVE, DYING, WAITING_FOR_ACCESS_TOKEN
82
/// To:
83
///    * ACTIVE: if the session is revived
84
///    * WAITING_FOR_ACCESS_TOKEN: if the session tried to enter ACTIVE,
85
///                                but the token is invalid or expired.
86

87
void SyncSession::become_active()
88
{
2,197✔
89
    REALM_ASSERT(m_state != State::Active);
2,197✔
90
    m_state = State::Active;
2,197✔
91

92
    // First time the session becomes active, register a notification on the sentinel subscription set to restart the
93
    // session and update to native FLX.
94
    if (m_migration_sentinel_query_version) {
2,197✔
95
        m_flx_subscription_store->get_by_version(*m_migration_sentinel_query_version)
2✔
96
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
97
            .get_async([=, weak_self = weak_from_this()](StatusWith<sync::SubscriptionSet::State> s) {
2✔
98
                if (!s.is_ok()) {
2✔
99
                    return;
×
100
                }
×
101
                REALM_ASSERT(s.get_value() == sync::SubscriptionSet::State::Complete);
2✔
102
                if (auto strong_self = weak_self.lock()) {
2✔
103
                    strong_self->m_migration_store->cancel_migration();
2✔
104
                    strong_self->restart_session();
2✔
105
                }
2✔
106
            });
2✔
107
        m_migration_sentinel_query_version.reset();
2✔
108
    }
2✔
109

110
    // when entering from the Dying state the session will still be bound
111
    create_sync_session();
2,197✔
112

113
    // Register all the pending wait-for-completion blocks. This can
114
    // potentially add a redundant callback if we're coming from the Dying
115
    // state, but that's okay (we won't call the user callbacks twice).
116
    SyncSession::CompletionCallbacks callbacks_to_register;
2,197✔
117
    std::swap(m_completion_callbacks, callbacks_to_register);
2,197✔
118

119
    for (auto& [id, callback_tuple] : callbacks_to_register) {
2,197✔
120
        add_completion_callback(std::move(callback_tuple.second), callback_tuple.first);
504✔
121
    }
504✔
122
}
2,197✔
123

124
void SyncSession::become_dying(util::CheckedUniqueLock lock)
125
{
84✔
126
    REALM_ASSERT(m_state != State::Dying);
84✔
127
    m_state = State::Dying;
84✔
128

129
    // If we have no session, we cannot possibly upload anything.
130
    if (!m_session) {
84✔
131
        become_inactive(std::move(lock));
×
132
        return;
×
133
    }
×
134

135
    size_t current_death_count = ++m_death_count;
84✔
136
    m_session->async_wait_for_upload_completion([weak_session = weak_from_this(), current_death_count](Status) {
84✔
137
        if (auto session = weak_session.lock()) {
84✔
138
            util::CheckedUniqueLock lock(session->m_state_mutex);
82✔
139
            if (session->m_state == State::Dying && session->m_death_count == current_death_count) {
82✔
140
                session->become_inactive(std::move(lock));
25✔
141
            }
25✔
142
        }
82✔
143
    });
84✔
144
    m_state_mutex.unlock(lock);
84✔
145
}
84✔
146

147
void SyncSession::become_inactive(util::CheckedUniqueLock lock, Status status, bool cancel_subscription_notifications)
148
{
1,958✔
149
    REALM_ASSERT(m_state != State::Inactive);
1,958✔
150
    m_state = State::Inactive;
1,958✔
151

152
    do_become_inactive(std::move(lock), status, cancel_subscription_notifications);
1,958✔
153
}
1,958✔
154

155
void SyncSession::become_paused(util::CheckedUniqueLock lock)
156
{
204✔
157
    REALM_ASSERT(m_state != State::Paused);
204✔
158
    auto old_state = m_state;
204✔
159
    m_state = State::Paused;
204✔
160

161
    // Nothing to do if we're already inactive besides update the state.
162
    if (old_state == State::Inactive) {
204✔
163
        m_state_mutex.unlock(lock);
2✔
164
        return;
2✔
165
    }
2✔
166

167
    do_become_inactive(std::move(lock), Status::OK(), true);
202✔
168
}
202✔
169

170
void SyncSession::restart_session()
171
{
41✔
172
    util::CheckedUniqueLock lock(m_state_mutex);
41✔
173
    switch (m_state) {
41✔
174
        case State::Active:
41✔
175
            do_restart_session(std::move(lock));
41✔
176
            break;
41✔
177
        case State::WaitingForAccessToken:
✔
178
        case State::Paused:
✔
179
        case State::Dying:
✔
180
        case State::Inactive:
✔
181
            return;
×
182
    }
41✔
183
}
41✔
184

185
void SyncSession::do_restart_session(util::CheckedUniqueLock)
186
{
41✔
187
    // Go straight to inactive so the progress completion waiters will
188
    // continue to wait until the session restarts and completes the
189
    // upload/download sync
190
    m_state = State::Inactive;
41✔
191

192
    if (m_session) {
41✔
193
        m_session.reset();
41✔
194
    }
41✔
195

196
    // Create a new session and re-register the completion callbacks
197
    // The latest server path will be retrieved from sync_manager when
198
    // the new session is created by create_sync_session() in become
199
    // active.
200
    become_active();
41✔
201
}
41✔
202

203
void SyncSession::do_become_inactive(util::CheckedUniqueLock lock, Status status,
204
                                     bool cancel_subscription_notifications)
205
{
2,160✔
206
    // Manually set the disconnected state. Sync would also do this, but
207
    // since the underlying SyncSession object already have been destroyed,
208
    // we are not able to get the callback.
209
    util::CheckedUniqueLock connection_state_lock(m_connection_state_mutex);
2,160✔
210
    auto old_state = m_connection_state;
2,160✔
211
    auto new_state = m_connection_state = SyncSession::ConnectionState::Disconnected;
2,160✔
212
    connection_state_lock.unlock();
2,160✔
213

214
    SyncSession::CompletionCallbacks waits;
2,160✔
215
    std::swap(waits, m_completion_callbacks);
2,160✔
216

217
    m_session = nullptr;
2,160✔
218
    if (m_sync_manager) {
2,160✔
219
        m_sync_manager->unregister_session(m_db->get_path());
2,160✔
220
    }
2,160✔
221

222
    auto subscription_store = m_flx_subscription_store;
2,160✔
223
    m_state_mutex.unlock(lock);
2,160✔
224

225
    // Send notifications after releasing the lock to prevent deadlocks in the callback.
226
    if (old_state != new_state) {
2,160✔
227
        m_connection_change_notifier.invoke_callbacks(old_state, connection_state());
1,696✔
228
    }
1,696✔
229

230
    if (status.is_ok())
2,160✔
231
        status = Status(ErrorCodes::OperationAborted, "Sync session became inactive");
2,043✔
232

233
    if (subscription_store && cancel_subscription_notifications) {
2,160✔
234
        subscription_store->notify_all_state_change_notifications(status);
793✔
235
    }
793✔
236

237
    // Inform any queued-up completion handlers that they were cancelled.
238
    for (auto& [id, callback] : waits)
2,160✔
239
        callback.second(status);
84✔
240
}
2,160✔
241

242
void SyncSession::become_waiting_for_access_token()
243
{
20✔
244
    REALM_ASSERT(m_state != State::WaitingForAccessToken);
20✔
245
    m_state = State::WaitingForAccessToken;
20✔
246
}
20✔
247

248
void SyncSession::handle_bad_auth(const std::shared_ptr<SyncUser>& user, Status status)
249
{
22✔
250
    // TODO: ideally this would write to the logs as well in case users didn't set up their error handler.
251
    {
22✔
252
        util::CheckedUniqueLock lock(m_state_mutex);
22✔
253
        cancel_pending_waits(std::move(lock), status);
22✔
254
    }
22✔
255
    if (user) {
22✔
256
        user->request_log_out();
22✔
257
    }
22✔
258

259
    if (auto error_handler = config(&SyncConfig::error_handler)) {
22✔
260
        auto user_facing_error = SyncError({ErrorCodes::AuthError, status.reason()}, true);
22✔
261
        error_handler(shared_from_this(), std::move(user_facing_error));
22✔
262
    }
22✔
263
}
22✔
264

265
static bool check_for_auth_failure(const app::AppError& error)
266
{
62✔
267
    using namespace realm::sync;
62✔
268
    // Auth failure is returned as a 401 (unauthorized) or 403 (forbidden) response
269
    if (error.additional_status_code) {
62✔
270
        auto status_code = HTTPStatus(*error.additional_status_code);
62✔
271
        if (status_code == HTTPStatus::Unauthorized || status_code == HTTPStatus::Forbidden)
62✔
272
            return true;
22✔
273
    }
62✔
274

275
    return false;
40✔
276
}
62✔
277

278
static bool check_for_redirect_response(const app::AppError& error)
279
{
40✔
280
    using namespace realm::sync;
40✔
281
    // Check for unhandled 301/308 permanent redirect response
282
    if (error.additional_status_code) {
40✔
283
        auto status_code = HTTPStatus(*error.additional_status_code);
40✔
284
        if (status_code == HTTPStatus::MovedPermanently || status_code == HTTPStatus::PermanentRedirect)
40✔
285
            return true;
×
286
    }
40✔
287

288
    return false;
40✔
289
}
40✔
290

291
util::UniqueFunction<void(std::optional<app::AppError>)>
292
SyncSession::handle_refresh(const std::shared_ptr<SyncSession>& session, bool restart_session)
293
{
89✔
294
    return [session, restart_session](std::optional<app::AppError> error) {
89✔
295
        auto session_user = session->user();
89✔
296
        if (!session_user) {
89✔
297
            util::CheckedUniqueLock lock(session->m_state_mutex);
×
298
            auto refresh_error = error ? error->to_status() : Status::OK();
×
299
            session->cancel_pending_waits(std::move(lock), refresh_error);
×
300
        }
×
301
        else if (error) {
89✔
302
            if (ErrorCodes::error_categories(error->code()).test(ErrorCategory::client_error)) {
62✔
303
                // any other client errors other than app_deallocated are considered fatal because
304
                // there was a problem locally before even sending the request to the server
305
                // eg. ClientErrorCode::user_not_found, ClientErrorCode::user_not_logged_in,
306
                // ClientErrorCode::too_many_redirects
UNCOV
307
                session->handle_bad_auth(session_user, error->to_status());
×
UNCOV
308
            }
×
309
            else if (check_for_auth_failure(*error)) {
62✔
310
                // A 401 response on a refresh request means that the token cannot be refreshed and we should not
311
                // retry. This can be because an admin has revoked this user's sessions, the user has been disabled,
312
                // or the refresh token has expired according to the server's clock.
313
                session->handle_bad_auth(
22✔
314
                    session_user,
22✔
315
                    {error->code(), util::format("Unable to refresh the user access token: %1", error->reason())});
22✔
316
            }
22✔
317
            else if (check_for_redirect_response(*error)) {
40✔
318
                // A 301 or 308 response is an unhandled permanent redirect response (which should not happen) - if
319
                // this is received, fail the request with an appropriate error message.
320
                // Temporary redirect responses (302, 307) are not supported
321
                session->handle_bad_auth(
×
322
                    session_user,
×
323
                    {error->code(), util::format("Unhandled redirect response when trying to reach the server: %1",
×
324
                                                 error->reason())});
×
325
            }
×
326
            else {
40✔
327
                // A refresh request has failed. This is an unexpected non-fatal error and we would
328
                // like to retry but we shouldn't do this immediately in order to not swamp the
329
                // server with requests. Consider two scenarios:
330
                // 1) If this request was spawned from the proactive token check, or a user
331
                // initiated request, the token may actually be valid. Just advance to Active
332
                // from WaitingForAccessToken if needed and let the sync server tell us if the
333
                // token is valid or not. If this also fails we will end up in case 2 below.
334
                // 2) If the sync connection initiated the request because the server is
335
                // unavailable or the connection otherwise encounters an unexpected error, we want
336
                // to let the sync client attempt to reinitialize the connection using its own
337
                // internal backoff timer which will happen automatically so nothing needs to
338
                // happen here.
339
                util::CheckedUniqueLock lock(session->m_state_mutex);
40✔
340
                // If updating access token while opening realm, just become active at this point
341
                // and try to use the current access token.
342
                if (session->m_state == State::WaitingForAccessToken) {
40✔
343
                    session->become_active();
2✔
344
                }
2✔
345
                // If `cancel_waits_on_nonfatal_error` is true, then cancel the waiters and pass along the error
346
                else if (session->config(&SyncConfig::cancel_waits_on_nonfatal_error)) {
38✔
347
                    session->cancel_pending_waits(std::move(lock), error->to_status()); // unlocks the mutex
14✔
348
                }
14✔
349
            }
40✔
350
        }
62✔
351
        else {
27✔
352
            // If the session needs to be restarted, then restart the session now
353
            // The latest access token and server url will be pulled from the sync
354
            // manager when the new session is started.
355
            if (restart_session) {
27✔
356
                session->restart_session();
15✔
357
            }
15✔
358
            // Otherwise, update the access token and reconnect
359
            else {
12✔
360
                session->update_access_token(session_user->access_token());
12✔
361
            }
12✔
362
        }
27✔
363
    };
89✔
364
}
89✔
365

366
SyncSession::SyncSession(Private, SyncClient& client, std::shared_ptr<DB> db, const RealmConfig& config,
367
                         SyncManager* sync_manager)
368
    : m_config{config}
775✔
369
    , m_db{std::move(db)}
775✔
370
    , m_original_sync_config{m_config.sync_config}
775✔
371
    , m_migration_store{sync::MigrationStore::create(m_db)}
775✔
372
    , m_client(client)
775✔
373
    , m_sync_manager(sync_manager)
775✔
374
{
1,721✔
375
    REALM_ASSERT(m_config.sync_config);
1,721✔
376
    // we don't want the following configs enabled during a client reset
377
    m_config.scheduler = nullptr;
1,721✔
378
    m_config.audit_config = nullptr;
1,721✔
379

380
    // Adjust the sync_config if using PBS sync and already in the migrated or rollback state
381
    if (m_migration_store->is_migrated() || m_migration_store->is_rollback_in_progress()) {
1,721✔
382
        m_config.sync_config = sync::MigrationStore::convert_sync_config_to_flx(m_original_sync_config);
8✔
383
    }
8✔
384

385
    // If using FLX, set up m_flx_subscription_store and the history_write_validator
386
    if (m_config.sync_config->flx_sync_requested) {
1,721✔
387
        create_subscription_store();
689✔
388
        std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr(m_flx_subscription_store);
689✔
389
        set_write_validator_factory(weak_sub_mgr);
689✔
390
    }
689✔
391

392
    // After a migration to FLX, if the user opens the realm with a flexible sync configuration, we need to first
393
    // upload any unsynced changes before updating to native FLX.
394
    // A subscription set is used as sentinel so we know when to stop uploading.
395
    // Note: Currently, a sentinel subscription set is always created even if there is nothing to upload.
396
    if (m_migration_store->is_migrated() && m_original_sync_config->flx_sync_requested) {
1,721✔
397
        m_migration_store->create_sentinel_subscription_set(*m_flx_subscription_store);
2✔
398
        m_migration_sentinel_query_version = m_migration_store->get_sentinel_subscription_set_version();
2✔
399
        REALM_ASSERT(m_migration_sentinel_query_version);
2✔
400
    }
2✔
401
}
1,721✔
402

403
void SyncSession::detach_from_sync_manager()
404
{
2✔
405
    shutdown_and_wait();
2✔
406
    util::CheckedLockGuard lk(m_state_mutex);
2✔
407
    m_sync_manager = nullptr;
2✔
408
}
2✔
409

410
void SyncSession::update_error_and_mark_file_for_deletion(SyncError& error, ShouldBackup should_backup)
411
{
74✔
412
    util::CheckedLockGuard config_lock(m_config_mutex);
74✔
413
    // Add a SyncFileActionMetadata marking the Realm as needing to be deleted.
414
    auto original_path = path();
74✔
415
    error.user_info[SyncError::c_original_file_path_key] = original_path;
74✔
416
    using Action = SyncFileAction;
74✔
417
    auto action = should_backup == ShouldBackup::yes ? Action::BackUpThenDeleteRealm : Action::DeleteRealm;
74✔
418
    std::string recovery_path = m_config.sync_config->user->create_file_action(
74✔
419
        action, original_path, m_config.sync_config->recovery_directory);
74✔
420
    if (should_backup == ShouldBackup::yes) {
74✔
421
        error.user_info[SyncError::c_recovery_file_path_key] = recovery_path;
74✔
422
    }
74✔
423
}
74✔
424

425
void SyncSession::download_fresh_realm(const sync::SessionErrorInfo& error_info)
426
{
218✔
427
    // first check that recovery will not be prevented
428
    if (error_info.server_requests_action == sync::ProtocolErrorInfo::Action::ClientResetNoRecovery) {
218✔
429
        auto mode = config(&SyncConfig::client_resync_mode);
4✔
430
        if (mode == ClientResyncMode::Recover) {
4✔
431
            handle_fresh_realm_downloaded(
2✔
432
                nullptr,
2✔
433
                {ErrorCodes::RuntimeError,
2✔
434
                 "A client reset is required but the server does not permit recovery for this client"},
2✔
435
                error_info);
2✔
436
            return;
2✔
437
        }
2✔
438
    }
4✔
439

440
    std::vector<char> encryption_key;
216✔
441
    {
216✔
442
        util::CheckedLockGuard lock(m_config_mutex);
216✔
443
        encryption_key = m_config.encryption_key;
216✔
444
    }
216✔
445

446
    DBOptions options;
216✔
447
    options.allow_file_format_upgrade = false;
216✔
448
    options.enable_async_writes = false;
216✔
449
    if (!encryption_key.empty())
216✔
450
        options.encryption_key = encryption_key.data();
2✔
451

452
    DBRef db;
216✔
453
    auto fresh_path = client_reset::get_fresh_path_for(m_db->get_path());
216✔
454
    try {
216✔
455
        // We want to attempt to use a pre-existing file to reduce the chance of
456
        // downloading the first part of the file only to then delete it over
457
        // and over, but if we fail to open it then we should just start over.
458
        try {
216✔
459
            db = DB::create(sync::make_client_replication(), fresh_path, options);
216✔
460
        }
216✔
461
        catch (...) {
216✔
462
            util::File::try_remove(fresh_path);
10✔
463
        }
10✔
464

465
        if (!db) {
216✔
466
            db = DB::create(sync::make_client_replication(), fresh_path, options);
2✔
467
        }
2✔
468
    }
208✔
469
    catch (...) {
216✔
470
        // Failed to open the fresh path after attempting to delete it, so we
471
        // just can't do automatic recovery.
472
        handle_fresh_realm_downloaded(nullptr, exception_to_status(), error_info);
8✔
473
        return;
8✔
474
    }
8✔
475

476
    util::CheckedLockGuard state_lock(m_state_mutex);
208✔
477
    if (m_state != State::Active) {
208✔
478
        return;
×
479
    }
×
480

481
    RealmConfig fresh_config;
208✔
482
    {
208✔
483
        util::CheckedLockGuard config_lock(m_config_mutex);
208✔
484
        fresh_config = m_config;
208✔
485
        fresh_config.path = fresh_path;
208✔
486
        // in case of migrations use the migrated config
487
        auto fresh_sync_config = m_migrated_sync_config ? *m_migrated_sync_config : *m_config.sync_config;
208✔
488
        // deep copy the sync config so we don't modify the live session's config
489
        fresh_config.sync_config = std::make_shared<SyncConfig>(fresh_sync_config);
208✔
490
        fresh_config.sync_config->client_resync_mode = ClientResyncMode::Manual;
208✔
491
        fresh_config.schema_version = m_previous_schema_version.value_or(m_config.schema_version);
208✔
492
    }
208✔
493

494
    auto fresh_sync_session = m_sync_manager->get_session(db, fresh_config);
208✔
495
    auto& history = static_cast<sync::ClientReplication&>(*db->get_replication());
208✔
496
    // the fresh Realm may apply writes to this db after it has outlived its sync session
497
    // the writes are used to generate a changeset for recovery, but are never committed
498
    history.set_write_validator_factory({});
208✔
499

500
    fresh_sync_session->assert_mutex_unlocked();
208✔
501
    // The fresh realm uses flexible sync.
502
    if (auto fresh_sub_store = fresh_sync_session->get_flx_subscription_store()) {
208✔
503
        auto fresh_sub = fresh_sub_store->get_latest();
90✔
504
        // The local realm uses flexible sync as well so copy the active subscription set to the fresh realm.
505
        if (auto local_subs_store = m_flx_subscription_store) {
90✔
506
            auto fresh_mut_sub = fresh_sub.make_mutable_copy();
72✔
507
            fresh_mut_sub.import(local_subs_store->get_active());
72✔
508
            fresh_sub = fresh_mut_sub.commit();
72✔
509
        }
72✔
510

511
        auto self = shared_from_this();
90✔
512
        using SubscriptionState = sync::SubscriptionSet::State;
90✔
513
        fresh_sub.get_state_change_notification(SubscriptionState::Complete)
90✔
514
            .then([=](SubscriptionState) -> util::Future<sync::SubscriptionSet> {
90✔
515
                if (error_info.server_requests_action != sync::ProtocolErrorInfo::Action::MigrateToFLX) {
88✔
516
                    return fresh_sub;
70✔
517
                }
70✔
518
                if (!self->m_migration_store->is_migration_in_progress()) {
18✔
519
                    return fresh_sub;
×
520
                }
×
521

522
                // fresh_sync_session is using a new realm file that doesn't have the migration_store info
523
                // so the query string from the local migration store will need to be provided
524
                auto query_string = self->m_migration_store->get_query_string();
18✔
525
                REALM_ASSERT(query_string);
18✔
526
                // Create subscriptions in the fresh realm based on the schema instructions received in the bootstrap
527
                // message.
528
                fresh_sync_session->m_migration_store->create_subscriptions(*fresh_sub_store, *query_string);
18✔
529
                return fresh_sub_store->get_latest()
18✔
530
                    .get_state_change_notification(SubscriptionState::Complete)
18✔
531
                    .then([fresh_sub_store](SubscriptionState) {
18✔
532
                        return fresh_sub_store->get_latest();
18✔
533
                    });
18✔
534
            })
18✔
535
            .get_async([=](StatusWith<sync::SubscriptionSet>&& subs) {
90✔
536
                // Keep the sync session alive while it's downloading, but then close
537
                // it immediately
538
                fresh_sync_session->force_close();
90✔
539
                if (subs.is_ok()) {
90✔
540
                    self->handle_fresh_realm_downloaded(db, Status::OK(), error_info, std::move(subs.get_value()));
88✔
541
                }
88✔
542
                else {
2✔
543
                    self->handle_fresh_realm_downloaded(nullptr, std::move(subs.get_status()), error_info);
2✔
544
                }
2✔
545
            });
90✔
546
    }
90✔
547
    else { // pbs
118✔
548
        fresh_sync_session->wait_for_download_completion([=, weak_self = weak_from_this()](Status status) {
118✔
549
            // Keep the sync session alive while it's downloading, but then close
550
            // it immediately
551
            fresh_sync_session->force_close();
118✔
552
            if (auto strong_self = weak_self.lock()) {
118✔
553
                if (status.is_ok()) {
118✔
554
                    strong_self->handle_fresh_realm_downloaded(db, Status::OK(), error_info);
110✔
555
                }
110✔
556
                else {
8✔
557
                    strong_self->handle_fresh_realm_downloaded(nullptr, std::move(status), error_info);
8✔
558
                }
8✔
559
            }
118✔
560
        });
118✔
561
    }
118✔
562
    fresh_sync_session->revive_if_needed();
208✔
563
}
208✔
564

565
void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status result, const sync::SessionErrorInfo& cr_error_info,
566
                                                std::optional<sync::SubscriptionSet> new_subs)
567
{
218✔
568
    util::CheckedUniqueLock lock(m_state_mutex);
218✔
569
    if (m_state != State::Active) {
218✔
570
        return;
×
571
    }
×
572
    // The download can fail for many reasons. For example:
573
    // - unable to write the fresh copy to the file system
574
    // - during download of the fresh copy, the fresh copy itself is reset
575
    // - in FLX mode there was a problem fulfilling the previously active subscription
576
    if (!result.is_ok()) {
218✔
577
        if (result == ErrorCodes::OperationAborted) {
20✔
578
            return;
8✔
579
        }
8✔
580
        lock.unlock();
12✔
581

582
        sync::SessionErrorInfo synthetic(
12✔
583
            Status{ErrorCodes::AutoClientResetFailed,
12✔
584
                   util::format("A fatal error occurred during '%1' client reset for %2: '%3'",
12✔
585
                                cr_error_info.server_requests_action, cr_error_info.status, result)},
12✔
586
            sync::IsFatal{true});
12✔
587
        handle_error(synthetic);
12✔
588
        return;
12✔
589
    }
20✔
590

591
    // Performing a client reset requires tearing down our current
592
    // sync session and creating a new one with the relevant client reset config. This
593
    // will result in session completion handlers firing
594
    // when the old session is torn down, which we don't want as this
595
    // is supposed to be transparent to the user.
596
    //
597
    // To avoid this, we need to move the completion handlers aside temporarily so
598
    // that moving to the inactive state doesn't clear them - they will be
599
    // re-registered when the session becomes active again.
600
    {
198✔
601
        m_client_reset_fresh_copy = db;
198✔
602
        CompletionCallbacks callbacks;
198✔
603
        // Save the client reset error for when the original sync session is revived
604
        m_client_reset_error = cr_error_info;
198✔
605

606
        std::swap(m_completion_callbacks, callbacks);
198✔
607
        // always swap back, even if advance_state throws
608
        auto guard = util::make_scope_exit([&]() noexcept {
198✔
609
            util::CheckedUniqueLock lock(m_state_mutex);
198✔
610
            if (m_completion_callbacks.empty())
198✔
611
                std::swap(callbacks, m_completion_callbacks);
198✔
612
            else
×
613
                m_completion_callbacks.merge(std::move(callbacks));
×
614
        });
198✔
615
        // Do not cancel the notifications on subscriptions.
616
        bool cancel_subscription_notifications = false;
198✔
617
        bool is_migration =
198✔
618
            m_client_reset_error->server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
198✔
619
            m_client_reset_error->server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS;
198✔
620
        become_inactive(std::move(lock), Status::OK(), cancel_subscription_notifications); // unlocks the lock
198✔
621

622
        // Once the session is inactive, update sync config and subscription store after migration.
623
        if (is_migration) {
198✔
624
            apply_sync_config_after_migration_or_rollback();
26✔
625
            auto flx_sync_requested = config(&SyncConfig::flx_sync_requested);
26✔
626
            update_subscription_store(flx_sync_requested, std::move(new_subs));
26✔
627
        }
26✔
628
    }
198✔
629
    revive_if_needed();
198✔
630
}
198✔
631

632
util::Future<void> SyncSession::pause_async()
633
{
28✔
634
    {
28✔
635
        util::CheckedUniqueLock lock(m_state_mutex);
28✔
636
        // Nothing to wait for if the session is already paused or inactive.
637
        if (m_state == SyncSession::State::Paused || m_state == SyncSession::State::Inactive) {
28✔
638
            return util::Future<void>::make_ready();
×
639
        }
×
640
    }
28✔
641
    // Transition immediately to `paused` state. Calling this function must guarantee that any
642
    // sync::Session object in SyncSession::m_session that existed prior to the time of invocation
643
    // must have been destroyed upon return. This allows the caller to follow up with a call to
644
    // sync::Client::notify_session_terminated() in order to be notified when the Realm file is closed. This works
645
    // so long as this SyncSession object remains in the `paused` state after the invocation of shutdown().
646
    pause();
28✔
647
    return m_client.notify_session_terminated();
28✔
648
}
28✔
649

650
void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::SessionErrorInfo&& error)
651
{
16✔
652
    session.handle_error(std::move(error));
16✔
653
}
16✔
654

655
util::Future<void> SyncSession::OnlyForTesting::pause_async(SyncSession& session)
656
{
2✔
657
    return session.pause_async();
2✔
658
}
2✔
659

660
// This method should only be called from within the error handler callback registered upon the underlying
661
// `m_session`.
662
void SyncSession::handle_error(sync::SessionErrorInfo error)
663
{
604✔
664
    enum class NextStateAfterError { none, inactive, error };
604✔
665
    auto next_state = error.is_fatal ? NextStateAfterError::error : NextStateAfterError::none;
604✔
666
    std::optional<ShouldBackup> delete_file;
604✔
667
    bool log_out_user = false;
604✔
668
    bool unrecognized_by_client = false;
604✔
669

670
    if (error.status == ErrorCodes::AutoClientResetFailed) {
604✔
671
        // At this point, automatic recovery has been attempted but it failed.
672
        // Fallback to a manual reset and let the user try to handle it.
673
        next_state = NextStateAfterError::inactive;
50✔
674
        delete_file = ShouldBackup::yes;
50✔
675
    }
50✔
676
    else if (error.server_requests_action != sync::ProtocolErrorInfo::Action::NoAction) {
554✔
677
        switch (error.server_requests_action) {
538✔
678
            case sync::ProtocolErrorInfo::Action::NoAction:
✔
679
                REALM_UNREACHABLE(); // This is not sent by the MongoDB server
680
            case sync::ProtocolErrorInfo::Action::ApplicationBug:
37✔
681
                [[fallthrough]];
37✔
682
            case sync::ProtocolErrorInfo::Action::ProtocolViolation:
43✔
683
                next_state = NextStateAfterError::inactive;
43✔
684
                break;
43✔
685
            case sync::ProtocolErrorInfo::Action::Warning:
36✔
686
                break; // not fatal, but should be bubbled up to the user below.
36✔
687
            case sync::ProtocolErrorInfo::Action::Transient:
118✔
688
                // Not real errors, don't need to be reported to the binding.
689
                return;
118✔
690
            case sync::ProtocolErrorInfo::Action::DeleteRealm:
✔
691
                next_state = NextStateAfterError::inactive;
×
692
                delete_file = ShouldBackup::no;
×
693
                break;
×
694
            case sync::ProtocolErrorInfo::Action::ClientReset:
210✔
695
                [[fallthrough]];
210✔
696
            case sync::ProtocolErrorInfo::Action::ClientResetNoRecovery:
214✔
697
                switch (config(&SyncConfig::client_resync_mode)) {
214✔
698
                    case ClientResyncMode::Manual:
24✔
699
                        next_state = NextStateAfterError::inactive;
24✔
700
                        delete_file = ShouldBackup::yes;
24✔
701
                        break;
24✔
702
                    case ClientResyncMode::DiscardLocal:
82✔
703
                        [[fallthrough]];
82✔
704
                    case ClientResyncMode::RecoverOrDiscard:
96✔
705
                        [[fallthrough]];
96✔
706
                    case ClientResyncMode::Recover:
190✔
707
                        download_fresh_realm(error);
190✔
708
                        return; // do not propagate the error to the user at this point
190✔
709
                }
214✔
710
                break;
24✔
711
            case sync::ProtocolErrorInfo::Action::MigrateToFLX:
24✔
712
                // Should not receive this error if original sync config is FLX
713
                REALM_ASSERT(!m_original_sync_config->flx_sync_requested);
18✔
714
                REALM_ASSERT(error.migration_query_string && !error.migration_query_string->empty());
18✔
715
                // Original config was PBS, migrating to FLX
716
                m_migration_store->migrate_to_flx(*error.migration_query_string,
18✔
717
                                                  m_original_sync_config->partition_value);
18✔
718
                save_sync_config_after_migration_or_rollback();
18✔
719
                download_fresh_realm(error);
18✔
720
                return;
18✔
721
            case sync::ProtocolErrorInfo::Action::RevertToPBS:
10✔
722
                // If the client was updated to use FLX natively, but the server was rolled back to PBS,
723
                // the server should be sending switch_to_flx_sync; throw exception if this error is not
724
                // received.
725
                if (m_original_sync_config->flx_sync_requested) {
10✔
726
                    throw LogicError(ErrorCodes::InvalidServerResponse,
×
727
                                     "Received 'RevertToPBS' from server after rollback while client is natively "
×
728
                                     "using FLX - expected 'SwitchToPBS'");
×
729
                }
×
730
                // Original config was PBS, rollback the migration
731
                m_migration_store->rollback_to_pbs();
10✔
732
                save_sync_config_after_migration_or_rollback();
10✔
733
                download_fresh_realm(error);
10✔
734
                return;
10✔
735
            case sync::ProtocolErrorInfo::Action::RefreshUser:
22✔
736
                if (auto u = user()) {
22✔
737
                    u->request_access_token(handle_refresh(shared_from_this(), false));
22✔
738
                }
22✔
739
                return;
22✔
740
            case sync::ProtocolErrorInfo::Action::RefreshLocation:
47✔
741
                if (auto u = user()) {
47✔
742
                    u->request_refresh_location(handle_refresh(shared_from_this(), true));
47✔
743
                }
47✔
744
                return;
47✔
745
            case sync::ProtocolErrorInfo::Action::LogOutUser:
✔
746
                next_state = NextStateAfterError::inactive;
×
747
                log_out_user = true;
×
748
                break;
×
749
            case sync::ProtocolErrorInfo::Action::MigrateSchema:
30✔
750
                util::CheckedUniqueLock lock(m_state_mutex);
30✔
751
                // Should only be received for FLX sync.
752
                REALM_ASSERT(m_original_sync_config->flx_sync_requested);
30✔
753
                m_previous_schema_version = error.previous_schema_version;
30✔
754
                return; // do not propagate the error to the user at this point
30✔
755
        }
538✔
756
    }
538✔
757
    else {
16✔
758
        // Unrecognized error code.
759
        unrecognized_by_client = true;
16✔
760
    }
16✔
761

762
    util::CheckedUniqueLock lock(m_state_mutex);
169✔
763
    SyncError sync_error{error.status, error.is_fatal, error.log_url, std::move(error.compensating_writes)};
169✔
764
    // `action` is used over `shouldClientReset` and `isRecoveryModeDisabled`.
765
    sync_error.server_requests_action = error.server_requests_action;
169✔
766
    sync_error.is_unrecognized_by_client = unrecognized_by_client;
169✔
767

768
    if (delete_file)
169✔
769
        update_error_and_mark_file_for_deletion(sync_error, *delete_file);
74✔
770

771
    if (m_state == State::Dying && error.is_fatal) {
169✔
772
        become_inactive(std::move(lock), error.status);
2✔
773
        return;
2✔
774
    }
2✔
775

776
    // Don't bother invoking m_config.error_handler if the sync is inactive.
777
    // It does not make sense to call the handler when the session is closed.
778
    if (m_state == State::Inactive || m_state == State::Paused) {
167✔
779
        return;
×
780
    }
×
781

782
    switch (next_state) {
167✔
783
        case NextStateAfterError::none:
52✔
784
            if (config(&SyncConfig::cancel_waits_on_nonfatal_error)) {
52✔
785
                cancel_pending_waits(std::move(lock), sync_error.status); // unlocks the mutex
×
786
            }
×
787
            break;
52✔
788
        case NextStateAfterError::inactive: {
115✔
789
            become_inactive(std::move(lock), sync_error.status);
115✔
790
            break;
115✔
791
        }
×
792
        case NextStateAfterError::error: {
✔
793
            cancel_pending_waits(std::move(lock), sync_error.status);
×
794
            break;
×
795
        }
×
796
    }
167✔
797

798
    if (log_out_user) {
167✔
799
        if (auto u = user())
×
800
            u->request_log_out();
×
801
    }
×
802

803
    if (auto error_handler = config(&SyncConfig::error_handler)) {
167✔
804
        error_handler(shared_from_this(), std::move(sync_error));
167✔
805
    }
167✔
806
}
167✔
807

808
void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status error)
809
{
36✔
810
    CompletionCallbacks callbacks;
36✔
811
    std::swap(callbacks, m_completion_callbacks);
36✔
812

813
    // Inform any waiters on pending subscription states that they were cancelled
814
    if (m_flx_subscription_store) {
36✔
815
        auto subscription_store = m_flx_subscription_store;
×
816
        m_state_mutex.unlock(lock);
×
817
        subscription_store->notify_all_state_change_notifications(error);
×
818
    }
×
819
    else {
36✔
820
        m_state_mutex.unlock(lock);
36✔
821
    }
36✔
822

823
    // Inform any queued-up completion handlers that they were cancelled.
824
    for (auto& [id, callback] : callbacks)
36✔
825
        callback.second(error);
32✔
826
}
36✔
827

828
void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded,
829
                                         uint64_t uploadable, uint64_t snapshot_version, double download_estimate,
830
                                         double upload_estimate, int64_t query_version)
831
{
4,424✔
832
    m_progress_notifier.update(downloaded, downloadable, uploaded, uploadable, snapshot_version, download_estimate,
4,424✔
833
                               upload_estimate, query_version);
4,424✔
834
}
4,424✔
835

836

837
static sync::Session::Config::ClientReset
838
make_client_reset_config(const RealmConfig& base_config, const std::shared_ptr<SyncConfig>& sync_config,
839
                         DBRef&& fresh_copy, sync::SessionErrorInfo&& error_info, bool schema_migration_detected)
840
{
198✔
841
    REALM_ASSERT(sync_config->client_resync_mode != ClientResyncMode::Manual);
198✔
842

843
    sync::Session::Config::ClientReset config{sync_config->client_resync_mode, std::move(fresh_copy),
198✔
844
                                              std::move(error_info.status), error_info.server_requests_action};
198✔
845

846
    // The conditions here are asymmetric because if we have *either* a before
847
    // or after callback we need to make sure to initialize the local schema
848
    // before the client reset happens.
849
    if (!sync_config->notify_before_client_reset && !sync_config->notify_after_client_reset)
198✔
850
        return config;
30✔
851

852
    // We cannot initialize the local schema in case of a sync schema migration.
853
    // Currently, a schema migration involves breaking changes so opening the realm
854
    // with the new schema results in a crash.
855
    if (schema_migration_detected)
168✔
856
        return config;
4✔
857

858
    RealmConfig realm_config = base_config;
164✔
859
    realm_config.sync_config = std::make_shared<SyncConfig>(*sync_config); // deep copy
164✔
860
    realm_config.scheduler = util::Scheduler::make_dummy();
164✔
861

862
    if (sync_config->notify_after_client_reset) {
164✔
863
        config.notify_after_client_reset = [realm_config](VersionID previous_version, bool did_recover) {
160✔
864
            auto coordinator = _impl::RealmCoordinator::get_coordinator(realm_config);
126✔
865
            ThreadSafeReference active_after = coordinator->get_unbound_realm();
126✔
866
            SharedRealm frozen_before = coordinator->get_realm(realm_config, previous_version);
126✔
867
            REALM_ASSERT(frozen_before);
126✔
868
            REALM_ASSERT(frozen_before->is_frozen());
126✔
869
            realm_config.sync_config->notify_after_client_reset(std::move(frozen_before), std::move(active_after),
126✔
870
                                                                did_recover);
126✔
871
        };
126✔
872
    }
160✔
873
    config.notify_before_client_reset = [config = std::move(realm_config)]() -> VersionID {
164✔
874
        // Opening the Realm live here may make a write if the schema is different
875
        // than what exists on disk. It is necessary to pass a fully usable Realm
876
        // to the user here. Note that the schema changes made here will be considered
877
        // an "offline write" to be recovered if this is recovery mode.
878
        auto before = Realm::get_shared_realm(config);
164✔
879
        if (auto& notify_before = config.sync_config->notify_before_client_reset) {
164✔
880
            notify_before(config.sync_config->freeze_before_reset_realm ? before->freeze() : before);
158✔
881
        }
158✔
882
        // Note that if the SDK wrote to the Realm (hopefully by requesting a
883
        // live instance and not opening a secondary one), this may be a
884
        // different version than what we had before calling the callback.
885
        before->refresh();
164✔
886
        return before->read_transaction_version();
164✔
887
    };
164✔
888

889
    return config;
164✔
890
}
168✔
891

892
void SyncSession::create_sync_session()
893
{
2,197✔
894
    if (m_session)
2,197✔
895
        return;
2✔
896

897
    util::CheckedLockGuard config_lock(m_config_mutex);
2,195✔
898

899
    REALM_ASSERT(m_config.sync_config);
2,195✔
900
    SyncConfig& sync_config = *m_config.sync_config;
2,195✔
901
    REALM_ASSERT(sync_config.user);
2,195✔
902

903
    std::weak_ptr<SyncSession> weak_self = weak_from_this();
2,195✔
904

905
    sync::Session::Config session_config;
2,195✔
906
    session_config.signed_user_token = sync_config.user->access_token();
2,195✔
907
    session_config.user_id = sync_config.user->user_id();
2,195✔
908
    session_config.realm_identifier = sync_config.partition_value;
2,195✔
909
    session_config.verify_servers_ssl_certificate = sync_config.client_validate_ssl;
2,195✔
910
    session_config.ssl_trust_certificate_path = sync_config.ssl_trust_certificate_path;
2,195✔
911
    session_config.ssl_verify_callback = sync_config.ssl_verify_callback;
2,195✔
912
    session_config.proxy_config = sync_config.proxy_config;
2,195✔
913
    session_config.simulate_integration_error = sync_config.simulate_integration_error;
2,195✔
914
    session_config.flx_bootstrap_batch_size_bytes = sync_config.flx_bootstrap_batch_size_bytes;
2,195✔
915
    session_config.fresh_realm_download = client_reset::is_fresh_path(m_config.path);
2,195✔
916
    session_config.schema_version = m_config.schema_version;
2,195✔
917

918
    if (sync_config.on_sync_client_event_hook) {
2,195✔
919
        session_config.on_sync_client_event_hook = [hook = sync_config.on_sync_client_event_hook,
454✔
920
                                                    weak_self](const SyncClientHookData& data) {
7,819✔
921
            return hook(weak_self, data);
7,819✔
922
        };
7,819✔
923
    }
454✔
924

925
    {
2,195✔
926
        // At this point the sync route was either updated when the first App request was performed, or
927
        // was populated by a generated value that will be used for first contact. If the generated sync
928
        // route is not correct, either a redirection will be received or the connection will fail,
929
        // resulting in an update to both the access token and the location.
930
        auto [sync_route, verified] = m_sync_manager->sync_route();
2,195✔
931
        REALM_ASSERT_EX(!sync_route.empty(), "Server URL cannot be empty");
2,195✔
932

933
        if (!m_client.decompose_server_url(sync_route, session_config.protocol_envelope,
2,195✔
934
                                           session_config.server_address, session_config.server_port,
2,195✔
935
                                           session_config.service_identifier)) {
2,195✔
936
            throw sync::BadServerUrl(sync_route);
×
937
        }
×
938
        session_config.server_verified = verified;
2,195✔
939

940
        m_server_url = sync_route;
2,195✔
941
        m_server_url_verified = verified;
2,195✔
942
    }
2,195✔
943

944
    if (sync_config.authorization_header_name) {
2,195✔
945
        session_config.authorization_header_name = *sync_config.authorization_header_name;
×
946
    }
×
947
    session_config.custom_http_headers = sync_config.custom_http_headers;
2,195✔
948

949
    if (m_client_reset_error) {
2,195✔
950
        auto client_reset_error = std::exchange(m_client_reset_error, std::nullopt);
198✔
951
        if (client_reset_error->server_requests_action != sync::ProtocolErrorInfo::Action::NoAction) {
198✔
952
            // Use the original sync config, not the updated one from the migration store
953
            session_config.client_reset_config =
198✔
954
                make_client_reset_config(m_config, m_original_sync_config, std::move(m_client_reset_fresh_copy),
198✔
955
                                         std::move(*client_reset_error), m_previous_schema_version.has_value());
198✔
956
            session_config.schema_version = m_previous_schema_version.value_or(m_config.schema_version);
198✔
957
        }
198✔
958
    }
198✔
959

960
    session_config.progress_handler = [weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable,
2,195✔
961
                                                  uint_fast64_t uploaded, uint_fast64_t uploadable,
2,195✔
962
                                                  uint_fast64_t snapshot_version, double download_estimate,
2,195✔
963
                                                  double upload_estimate, int64_t query_version) {
4,472✔
964
        if (auto self = weak_self.lock()) {
4,472✔
965
            self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, snapshot_version,
4,424✔
966
                                         download_estimate, upload_estimate, query_version);
4,424✔
967
        }
4,424✔
968
    };
4,472✔
969

970
    session_config.connection_state_change_listener = [weak_self](sync::ConnectionState state,
2,195✔
971
                                                                  std::optional<sync::SessionErrorInfo> error) {
4,938✔
972
        using cs = sync::ConnectionState;
4,938✔
973
        ConnectionState new_state = [&] {
4,938✔
974
            switch (state) {
4,938✔
975
                case cs::disconnected:
511✔
976
                    return ConnectionState::Disconnected;
511✔
977
                case cs::connecting:
2,249✔
978
                    return ConnectionState::Connecting;
2,249✔
979
                case cs::connected:
2,178✔
980
                    return ConnectionState::Connected;
2,178✔
981
            }
4,938✔
982
            REALM_UNREACHABLE();
983
        }();
×
984
        // If the OS SyncSession object is destroyed, we ignore any events from the underlying Session as there is
985
        // nothing useful we can do with them.
986
        if (auto self = weak_self.lock()) {
4,938✔
987
            self->update_connection_state(new_state);
4,908✔
988
            if (error) {
4,908✔
989
                self->handle_error(std::move(*error));
576✔
990
            }
576✔
991
        }
4,908✔
992
    };
4,938✔
993

994
    m_session = m_client.make_session(m_db, m_flx_subscription_store, m_migration_store, std::move(session_config));
2,195✔
995
}
2,195✔
996

997
void SyncSession::update_connection_state(ConnectionState new_state)
998
{
4,908✔
999
    if (new_state == ConnectionState::Connected) {
4,908✔
1000
        util::CheckedLockGuard lock(m_config_mutex);
2,163✔
1001
        m_server_url_verified = true;
2,163✔
1002
    }
2,163✔
1003

1004
    ConnectionState old_state;
4,908✔
1005
    {
4,908✔
1006
        util::CheckedLockGuard lock(m_connection_state_mutex);
4,908✔
1007
        old_state = m_connection_state;
4,908✔
1008
        m_connection_state = new_state;
4,908✔
1009
    }
4,908✔
1010

1011
    // Notify any registered connection callbacks of the state transition
1012
    if (old_state != new_state) {
4,908✔
1013
        m_connection_change_notifier.invoke_callbacks(old_state, new_state);
4,834✔
1014
    }
4,834✔
1015
}
4,908✔
1016

1017
void SyncSession::nonsync_transact_notify(sync::version_type version)
1018
{
10,522✔
1019
    m_progress_notifier.set_local_version(version);
10,522✔
1020

1021
    util::CheckedUniqueLock lock(m_state_mutex);
10,522✔
1022
    switch (m_state) {
10,522✔
1023
        case State::Active:
10,109✔
1024
        case State::WaitingForAccessToken:
10,111✔
1025
            if (m_session) {
10,111✔
1026
                m_session->nonsync_transact_notify(version);
10,109✔
1027
            }
10,109✔
1028
            break;
10,111✔
1029
        case State::Dying:
✔
1030
        case State::Inactive:
71✔
1031
        case State::Paused:
411✔
1032
            break;
411✔
1033
    }
10,522✔
1034
}
10,522✔
1035

1036
void SyncSession::revive_if_needed()
1037
{
2,637✔
1038
    util::CheckedUniqueLock lock(m_state_mutex);
2,637✔
1039
    switch (m_state) {
2,637✔
1040
        case State::Active:
699✔
1041
        case State::WaitingForAccessToken:
699✔
1042
        case State::Paused:
705✔
1043
            return;
705✔
1044
        case State::Dying:
2✔
1045
        case State::Inactive:
1,932✔
1046
            do_revive(std::move(lock));
1,932✔
1047
            break;
1,932✔
1048
    }
2,637✔
1049
}
2,637✔
1050

1051
void SyncSession::handle_reconnect()
1052
{
10✔
1053
    util::CheckedUniqueLock lock(m_state_mutex);
10✔
1054
    switch (m_state) {
10✔
1055
        case State::Active:
10✔
1056
            m_session->cancel_reconnect_delay();
10✔
1057
            break;
10✔
1058
        case State::Dying:
✔
1059
        case State::Inactive:
✔
1060
        case State::WaitingForAccessToken:
✔
1061
        case State::Paused:
✔
1062
            break;
×
1063
    }
10✔
1064
}
10✔
1065

1066
void SyncSession::force_close()
1067
{
358✔
1068
    util::CheckedUniqueLock lock(m_state_mutex);
358✔
1069
    switch (m_state) {
358✔
1070
        case State::Active:
279✔
1071
        case State::Dying:
334✔
1072
        case State::WaitingForAccessToken:
338✔
1073
            become_inactive(std::move(lock));
338✔
1074
            break;
338✔
1075
        case State::Inactive:
18✔
1076
        case State::Paused:
20✔
1077
            break;
20✔
1078
    }
358✔
1079
}
358✔
1080

1081
void SyncSession::pause()
1082
{
206✔
1083
    util::CheckedUniqueLock lock(m_state_mutex);
206✔
1084
    switch (m_state) {
206✔
1085
        case State::Active:
202✔
1086
        case State::Dying:
202✔
1087
        case State::WaitingForAccessToken:
202✔
1088
        case State::Inactive:
204✔
1089
            become_paused(std::move(lock));
204✔
1090
            break;
204✔
1091
        case State::Paused:
2✔
1092
            break;
2✔
1093
    }
206✔
1094
}
206✔
1095

1096
void SyncSession::resume()
1097
{
218✔
1098
    util::CheckedUniqueLock lock(m_state_mutex);
218✔
1099
    switch (m_state) {
218✔
1100
        case State::Active:
✔
1101
        case State::WaitingForAccessToken:
✔
1102
            return;
×
1103
        case State::Paused:
198✔
1104
        case State::Dying:
198✔
1105
        case State::Inactive:
218✔
1106
            do_revive(std::move(lock));
218✔
1107
            break;
218✔
1108
    }
218✔
1109
}
218✔
1110

1111
void SyncSession::do_revive(util::CheckedUniqueLock&& lock)
1112
{
2,166✔
1113
    auto u = user();
2,166✔
1114
    // If the sync manager has a valid route and the user and it's access token
1115
    // are valid, then revive the session.
1116
    if (!u || !u->access_token_refresh_required()) {
2,166✔
1117
        become_active();
2,146✔
1118
        m_state_mutex.unlock(lock);
2,146✔
1119
        return;
2,146✔
1120
    }
2,146✔
1121

1122
    // Otherwise, either the access token has expired or the location info hasn't
1123
    // been requested since the app was started - request a new access token to
1124
    // refresh both.
1125
    become_waiting_for_access_token();
20✔
1126
    // Release the lock for SDKs with a single threaded
1127
    // networking implementation such as our test suite
1128
    // so that the update can trigger a state change from
1129
    // the completion handler.
1130
    m_state_mutex.unlock(lock);
20✔
1131
    initiate_access_token_refresh();
20✔
1132
}
20✔
1133

1134
void SyncSession::close()
1135
{
100✔
1136
    util::CheckedUniqueLock lock(m_state_mutex);
100✔
1137
    close(std::move(lock));
100✔
1138
}
100✔
1139

1140
void SyncSession::close(util::CheckedUniqueLock lock)
1141
{
1,821✔
1142
    switch (m_state) {
1,821✔
1143
        case State::Active: {
1,243✔
1144
            switch (config(&SyncConfig::stop_policy)) {
1,243✔
1145
                case SyncSessionStopPolicy::Immediately:
1,159✔
1146
                    become_inactive(std::move(lock));
1,159✔
1147
                    break;
1,159✔
1148
                case SyncSessionStopPolicy::LiveIndefinitely:
✔
1149
                    // Don't do anything; session lives forever.
1150
                    m_state_mutex.unlock(lock);
×
1151
                    break;
×
1152
                case SyncSessionStopPolicy::AfterChangesUploaded:
84✔
1153
                    // Wait for all pending changes to upload.
1154
                    become_dying(std::move(lock));
84✔
1155
                    break;
84✔
1156
            }
1,243✔
1157
            break;
1,243✔
1158
        }
1,243✔
1159
        case State::Dying:
1,243✔
1160
            m_state_mutex.unlock(lock);
18✔
1161
            break;
18✔
1162
        case State::Paused:
6✔
1163
        case State::Inactive: {
554✔
1164
            // We need to unregister from the sync manager if it still exists so that we don't end up
1165
            // holding the DBRef open after the session is closed. Otherwise we can end up preventing
1166
            // the user from deleting the realm when it's in the paused/inactive state.
1167
            if (m_sync_manager) {
554✔
1168
                m_sync_manager->unregister_session(m_db->get_path());
550✔
1169
            }
550✔
1170
            m_state_mutex.unlock(lock);
554✔
1171
            break;
554✔
1172
        }
6✔
1173
        case State::WaitingForAccessToken:
6✔
1174
            // Immediately kill the session.
1175
            become_inactive(std::move(lock));
6✔
1176
            break;
6✔
1177
    }
1,821✔
1178
}
1,821✔
1179

1180
void SyncSession::shutdown_and_wait()
1181
{
161✔
1182
    {
161✔
1183
        // Transition immediately to `inactive` state. Calling this function must guarantee that any
1184
        // sync::Session object in SyncSession::m_session that existed prior to the time of invocation
1185
        // must have been destroyed upon return. This allows the caller to follow up with a call to
1186
        // sync::Client::wait_for_session_terminations_or_client_stopped() in order to wait for the
1187
        // Realm file to be closed. This works so long as this SyncSession object remains in the
1188
        // `inactive` state after the invocation of shutdown_and_wait().
1189
        util::CheckedUniqueLock lock(m_state_mutex);
161✔
1190
        if (m_state != State::Inactive && m_state != State::Paused) {
161✔
1191
            become_inactive(std::move(lock));
105✔
1192
        }
105✔
1193
    }
161✔
1194
    m_client.wait_for_session_terminations();
161✔
1195
}
161✔
1196

1197
void SyncSession::update_access_token(std::string_view signed_token)
1198
{
31✔
1199
    util::CheckedUniqueLock lock(m_state_mutex);
31✔
1200
    switch (m_state) {
31✔
1201
        case State::Active:
7✔
1202
            m_session->refresh(signed_token);
7✔
1203
            break;
7✔
1204
        case State::WaitingForAccessToken:
8✔
1205
            become_active();
8✔
1206
            break;
8✔
1207
        case State::Paused:
✔
1208
            // token will be pulled from user when the session is unpaused
1209
            return;
×
1210
        case State::Dying:
✔
1211
        case State::Inactive:
16✔
1212
            do_revive(std::move(lock));
16✔
1213
            break;
16✔
1214
    }
31✔
1215
}
31✔
1216

1217
void SyncSession::initiate_access_token_refresh()
1218
{
20✔
1219
    if (auto session_user = user()) {
20✔
1220
        session_user->request_access_token(handle_refresh(shared_from_this(), false));
20✔
1221
    }
20✔
1222
}
20✔
1223

1224
void SyncSession::add_completion_callback(util::UniqueFunction<void(Status)> callback, ProgressDirection direction)
1225
{
2,936✔
1226
    bool is_download = (direction == ProgressDirection::download);
2,936✔
1227

1228
    m_completion_request_counter++;
2,936✔
1229
    m_completion_callbacks.emplace_hint(m_completion_callbacks.end(), m_completion_request_counter,
2,936✔
1230
                                        std::make_pair(direction, std::move(callback)));
2,936✔
1231
    // If the state is inactive then just store the callback and return. The callback will get
1232
    // re-registered with the underlying session if/when the session ever becomes active again.
1233
    if (!m_session) {
2,936✔
1234
        return;
339✔
1235
    }
339✔
1236

1237
    auto waiter = is_download ? &sync::Session::async_wait_for_download_completion
2,597✔
1238
                              : &sync::Session::async_wait_for_upload_completion;
2,597✔
1239

1240
    (m_session.get()->*waiter)([weak_self = weak_from_this(), id = m_completion_request_counter](Status status) {
2,597✔
1241
        auto self = weak_self.lock();
2,597✔
1242
        if (!self)
2,597✔
1243
            return;
58✔
1244
        util::CheckedUniqueLock lock(self->m_state_mutex);
2,539✔
1245
        auto callback_node = self->m_completion_callbacks.extract(id);
2,539✔
1246
        lock.unlock();
2,539✔
1247
        if (callback_node) {
2,539✔
1248
            callback_node.mapped().second(std::move(status));
2,316✔
1249
        }
2,316✔
1250
    });
2,539✔
1251
}
2,597✔
1252

1253
void SyncSession::wait_for_upload_completion(util::UniqueFunction<void(Status)>&& callback)
1254
{
1,046✔
1255
    util::CheckedUniqueLock lock(m_state_mutex);
1,046✔
1256
    add_completion_callback(std::move(callback), ProgressDirection::upload);
1,046✔
1257
}
1,046✔
1258

1259
void SyncSession::wait_for_download_completion(util::UniqueFunction<void(Status)>&& callback)
1260
{
1,384✔
1261
    util::CheckedUniqueLock lock(m_state_mutex);
1,384✔
1262
    add_completion_callback(std::move(callback), ProgressDirection::download);
1,384✔
1263
}
1,384✔
1264

1265
uint64_t SyncSession::register_progress_notifier(std::function<ProgressNotifierCallback>&& notifier,
1266
                                                 ProgressDirection direction, bool is_streaming)
1267
{
48✔
1268
    int64_t pending_query_version = 0;
48✔
1269
    if (auto sub_store = get_flx_subscription_store()) {
48✔
1270
        pending_query_version = sub_store->get_version_info().latest;
28✔
1271
    }
28✔
1272
    return m_progress_notifier.register_callback(std::move(notifier), direction, is_streaming, pending_query_version);
48✔
1273
}
48✔
1274

1275
void SyncSession::unregister_progress_notifier(uint64_t token)
1276
{
16✔
1277
    m_progress_notifier.unregister_callback(token);
16✔
1278
}
16✔
1279

1280
uint64_t SyncSession::register_connection_change_callback(std::function<ConnectionStateChangeCallback>&& callback)
1281
{
10✔
1282
    return m_connection_change_notifier.add_callback(std::move(callback));
10✔
1283
}
10✔
1284

1285
void SyncSession::unregister_connection_change_callback(uint64_t token)
1286
{
4✔
1287
    m_connection_change_notifier.remove_callback(token);
4✔
1288
}
4✔
1289

1290
SyncSession::~SyncSession() {}
1,721✔
1291

1292
SyncSession::State SyncSession::state() const
1293
{
193,326✔
1294
    util::CheckedUniqueLock lock(m_state_mutex);
193,326✔
1295
    return m_state;
193,326✔
1296
}
193,326✔
1297

1298
SyncSession::ConnectionState SyncSession::connection_state() const
1299
{
6,078✔
1300
    util::CheckedUniqueLock lock(m_connection_state_mutex);
6,078✔
1301
    return m_connection_state;
6,078✔
1302
}
6,078✔
1303

1304
std::string const& SyncSession::path() const
1305
{
2,399✔
1306
    return m_db->get_path();
2,399✔
1307
}
2,399✔
1308

1309
std::shared_ptr<sync::SubscriptionStore> SyncSession::get_flx_subscription_store()
1310
{
1,251✔
1311
    util::CheckedLockGuard lock(m_state_mutex);
1,251✔
1312
    return m_flx_subscription_store;
1,251✔
1313
}
1,251✔
1314

1315
std::shared_ptr<sync::SubscriptionStore> SyncSession::get_subscription_store_base()
1316
{
2✔
1317
    util::CheckedLockGuard lock(m_state_mutex);
2✔
1318
    return m_subscription_store_base;
2✔
1319
}
2✔
1320

1321
sync::SaltedFileIdent SyncSession::get_file_ident() const
1322
{
184✔
1323
    auto repl = m_db->get_replication();
184✔
1324
    REALM_ASSERT(repl);
184✔
1325
    REALM_ASSERT(dynamic_cast<sync::ClientReplication*>(repl));
184✔
1326

1327
    sync::SaltedFileIdent ret;
184✔
1328
    sync::version_type unused_version;
184✔
1329
    sync::SyncProgress unused_progress;
184✔
1330
    static_cast<sync::ClientReplication*>(repl)->get_history().get_status(unused_version, ret, unused_progress);
184✔
1331
    return ret;
184✔
1332
}
184✔
1333

1334
std::string SyncSession::get_appservices_connection_id() const
1335
{
22✔
1336
    util::CheckedLockGuard lk(m_state_mutex);
22✔
1337
    if (!m_session) {
22✔
1338
        return {};
×
1339
    }
×
1340
    return m_session->get_appservices_connection_id();
22✔
1341
}
22✔
1342

1343
void SyncSession::update_configuration(SyncConfig new_config)
1344
{
8✔
1345
    while (true) {
18✔
1346
        util::CheckedUniqueLock state_lock(m_state_mutex);
18✔
1347
        if (m_state != State::Inactive && m_state != State::Paused) {
18✔
1348
            // Changing the state releases the lock, which means that by the
1349
            // time we reacquire the lock the state may have changed again
1350
            // (either due to one of the callbacks being invoked or another
1351
            // thread coincidentally doing something). We just attempt to keep
1352
            // switching it to inactive until it stays there.
1353
            become_inactive(std::move(state_lock));
10✔
1354
            continue;
10✔
1355
        }
10✔
1356

1357
        util::CheckedUniqueLock config_lock(m_config_mutex);
8✔
1358
        REALM_ASSERT(m_state == State::Inactive || m_state == State::Paused);
8✔
1359
        REALM_ASSERT(!m_session);
8✔
1360
        REALM_ASSERT(m_config.sync_config->user == new_config.user);
8✔
1361
        // Since this is used for testing purposes only, just update the current sync_config
1362
        m_config.sync_config = std::make_shared<SyncConfig>(std::move(new_config));
8✔
1363
        break;
8✔
1364
    }
18✔
1365
    revive_if_needed();
8✔
1366
}
8✔
1367

1368
void SyncSession::apply_sync_config_after_migration_or_rollback()
1369
{
26✔
1370
    // Migration state changed - Update the configuration to
1371
    // match the new sync mode.
1372
    util::CheckedLockGuard cfg_lock(m_config_mutex);
26✔
1373
    if (!m_migrated_sync_config)
26✔
1374
        return;
2✔
1375

1376
    m_config.sync_config = m_migrated_sync_config;
24✔
1377
    m_migrated_sync_config.reset();
24✔
1378
}
24✔
1379

1380
void SyncSession::save_sync_config_after_migration_or_rollback()
1381
{
28✔
1382
    util::CheckedLockGuard cfg_lock(m_config_mutex);
28✔
1383
    m_migrated_sync_config = m_migration_store->convert_sync_config(m_original_sync_config);
28✔
1384
}
28✔
1385

1386
void SyncSession::update_subscription_store(bool flx_sync_requested, std::optional<sync::SubscriptionSet> new_subs)
1387
{
52✔
1388
    util::CheckedUniqueLock lock(m_state_mutex);
52✔
1389

1390
    // The session should be closed before updating the FLX subscription store
1391
    REALM_ASSERT(!m_session);
52✔
1392

1393
    // If the subscription store exists and switching to PBS, then clear the store
1394
    auto& history = static_cast<sync::ClientReplication&>(*m_db->get_replication());
52✔
1395
    if (!flx_sync_requested) {
52✔
1396
        if (m_flx_subscription_store) {
8✔
1397
            // Empty the subscription store and cancel any pending subscription notification
1398
            // waiters
1399
            auto subscription_store = std::move(m_flx_subscription_store);
8✔
1400
            lock.unlock();
8✔
1401
            auto tr = m_db->start_write();
8✔
1402
            subscription_store->reset(*tr);
8✔
1403
            history.set_write_validator_factory(nullptr);
8✔
1404
            tr->commit();
8✔
1405
        }
8✔
1406
        return;
8✔
1407
    }
8✔
1408

1409
    if (m_flx_subscription_store)
44✔
1410
        return; // Using FLX and subscription store already exists
2✔
1411

1412
    // Going from PBS -> FLX (or one doesn't exist yet), create a new subscription store
1413
    create_subscription_store();
42✔
1414

1415
    std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr(m_flx_subscription_store);
42✔
1416

1417
    // If migrated to FLX, create subscriptions in the local realm to cover the existing data.
1418
    // This needs to be done before setting the write validator to avoid NoSubscriptionForWrite errors.
1419
    if (new_subs) {
42✔
1420
        auto active_mut_sub = m_flx_subscription_store->get_active().make_mutable_copy();
16✔
1421
        active_mut_sub.import(std::move(*new_subs));
16✔
1422
        active_mut_sub.set_state(sync::SubscriptionSet::State::Complete);
16✔
1423
        active_mut_sub.commit();
16✔
1424
    }
16✔
1425

1426
    auto tr = m_db->start_write();
42✔
1427
    set_write_validator_factory(weak_sub_mgr);
42✔
1428
    tr->rollback();
42✔
1429
}
42✔
1430

1431
void SyncSession::create_subscription_store()
1432
{
731✔
1433
    REALM_ASSERT(!m_flx_subscription_store);
731✔
1434

1435
    // Create the main subscription store instance when this is first called - this will
1436
    // remain valid afterwards for the life of the SyncSession, but m_flx_subscription_store
1437
    // will be reset when rolling back to PBS after a client FLX migration
1438
    if (!m_subscription_store_base) {
731✔
1439
        m_subscription_store_base = sync::SubscriptionStore::create(m_db);
729✔
1440
    }
729✔
1441

1442
    // m_subscription_store_base is always around for the life of SyncSession, but the
1443
    // m_flx_subscription_store is set when using FLX.
1444
    m_flx_subscription_store = m_subscription_store_base;
731✔
1445
}
731✔
1446

1447
void SyncSession::set_write_validator_factory(std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr)
1448
{
731✔
1449
    auto& history = static_cast<sync::ClientReplication&>(*m_db->get_replication());
731✔
1450
    history.set_write_validator_factory(
731✔
1451
        [weak_sub_mgr](Transaction& tr) -> util::UniqueFunction<sync::SyncReplication::WriteValidator> {
7,707✔
1452
            auto sub_mgr = weak_sub_mgr.lock();
7,707✔
1453
            REALM_ASSERT_RELEASE(sub_mgr);
7,707✔
1454
            auto latest_sub_tables = sub_mgr->get_tables_for_latest(tr);
7,707✔
1455
            return [tables = std::move(latest_sub_tables)](const Table& table) {
7,707✔
1456
                if (table.get_table_type() != Table::Type::TopLevel) {
2,067✔
1457
                    return;
430✔
1458
                }
430✔
1459
                auto object_class_name = Group::table_name_to_class_name(table.get_name());
1,637✔
1460
                if (tables.find(object_class_name) == tables.end()) {
1,637✔
1461
                    throw NoSubscriptionForWrite(
2✔
1462
                        util::format("Cannot write to class %1 when no flexible sync subscription has been created.",
2✔
1463
                                     object_class_name));
2✔
1464
                }
2✔
1465
            };
1,637✔
1466
        });
7,707✔
1467
}
731✔
1468

1469
// Represents a reference to the SyncSession from outside of the sync subsystem.
1470
// We attempt to keep the SyncSession in an active state as long as it has an external reference.
1471
class SyncSession::ExternalReference {
1472
public:
1473
    ExternalReference(std::shared_ptr<SyncSession> session)
1474
        : m_session(std::move(session))
775✔
1475
    {
1,721✔
1476
    }
1,721✔
1477

1478
    ~ExternalReference()
1479
    {
1,721✔
1480
        m_session->did_drop_external_reference();
1,721✔
1481
    }
1,721✔
1482

1483
private:
1484
    std::shared_ptr<SyncSession> m_session;
1485
};
1486

1487
std::shared_ptr<SyncSession> SyncSession::external_reference()
1488
{
1,962✔
1489
    util::CheckedLockGuard lock(m_external_reference_mutex);
1,962✔
1490

1491
    if (auto external_reference = m_external_reference.lock())
1,962✔
1492
        return std::shared_ptr<SyncSession>(external_reference, this);
241✔
1493

1494
    auto external_reference = std::make_shared<ExternalReference>(shared_from_this());
1,721✔
1495
    m_external_reference = external_reference;
1,721✔
1496
    return std::shared_ptr<SyncSession>(external_reference, this);
1,721✔
1497
}
1,962✔
1498

1499
std::shared_ptr<SyncSession> SyncSession::existing_external_reference()
1500
{
2,819✔
1501
    util::CheckedLockGuard lock(m_external_reference_mutex);
2,819✔
1502

1503
    if (auto external_reference = m_external_reference.lock())
2,819✔
1504
        return std::shared_ptr<SyncSession>(external_reference, this);
1,176✔
1505

1506
    return nullptr;
1,643✔
1507
}
2,819✔
1508

1509
void SyncSession::did_drop_external_reference()
1510
{
1,721✔
1511
    util::CheckedUniqueLock lock1(m_state_mutex);
1,721✔
1512
    {
1,721✔
1513
        util::CheckedLockGuard lock2(m_external_reference_mutex);
1,721✔
1514

1515
        // If the session is being resurrected we should not close the session.
1516
        if (!m_external_reference.expired())
1,721✔
1517
            return;
×
1518
    }
1,721✔
1519

1520
    close(std::move(lock1));
1,721✔
1521
}
1,721✔
1522

1523
uint64_t SyncProgressNotifier::register_callback(std::function<ProgressNotifierCallback> notifier,
1524
                                                 NotifierType direction, bool is_streaming,
1525
                                                 int64_t pending_query_version)
1526
{
106✔
1527
    util::UniqueFunction<void()> invocation;
106✔
1528
    uint64_t token_value = 0;
106✔
1529
    {
106✔
1530
        std::lock_guard<std::mutex> lock(m_mutex);
106✔
1531
        token_value = m_progress_notifier_token++;
106✔
1532
        NotifierPackage package{std::move(notifier), m_local_transaction_version, is_streaming,
106✔
1533
                                direction == NotifierType::download, pending_query_version};
106✔
1534
        if (!m_current_progress) {
106✔
1535
            // Simply register the package, since we have no data yet.
1536
            m_packages.emplace(token_value, std::move(package));
42✔
1537
            return token_value;
42✔
1538
        }
42✔
1539
        bool skip_registration = false;
64✔
1540
        invocation = package.create_invocation(*m_current_progress, skip_registration);
64✔
1541
        if (skip_registration) {
64✔
1542
            token_value = 0;
22✔
1543
        }
22✔
1544
        else {
42✔
1545
            m_packages.emplace(token_value, std::move(package));
42✔
1546
        }
42✔
1547
    }
64✔
1548
    invocation();
×
1549
    return token_value;
64✔
1550
}
106✔
1551

1552
void SyncProgressNotifier::unregister_callback(uint64_t token)
1553
{
20✔
1554
    std::lock_guard<std::mutex> lock(m_mutex);
20✔
1555
    m_packages.erase(token);
20✔
1556
}
20✔
1557

1558
void SyncProgressNotifier::update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
1559
                                  uint64_t snapshot_version, double download_estimate, double upload_estimate,
1560
                                  int64_t query_version)
1561
{
4,598✔
1562
    std::vector<util::UniqueFunction<void()>> invocations;
4,598✔
1563
    {
4,598✔
1564
        std::lock_guard<std::mutex> lock(m_mutex);
4,598✔
1565
        m_current_progress = Progress{uploadable,      downloadable,      uploaded,         downloaded,
4,598✔
1566
                                      upload_estimate, download_estimate, snapshot_version, query_version};
4,598✔
1567

1568
        for (auto it = m_packages.begin(); it != m_packages.end();) {
4,938✔
1569
            bool should_delete = false;
340✔
1570
            invocations.emplace_back(it->second.create_invocation(*m_current_progress, should_delete));
340✔
1571
            it = should_delete ? m_packages.erase(it) : std::next(it);
340✔
1572
        }
340✔
1573
    }
4,598✔
1574
    // Run the notifiers only after we've released the lock.
1575
    for (auto& invocation : invocations)
4,598✔
1576
        invocation();
340✔
1577
}
4,598✔
1578

1579
void SyncProgressNotifier::set_local_version(uint64_t snapshot_version)
1580
{
10,530✔
1581
    std::lock_guard<std::mutex> lock(m_mutex);
10,530✔
1582
    m_local_transaction_version = snapshot_version;
10,530✔
1583
}
10,530✔
1584

1585
util::UniqueFunction<void()>
1586
SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired)
1587
{
404✔
1588
    uint64_t transfered = is_download ? current_progress.downloaded : current_progress.uploaded;
404✔
1589
    uint64_t transferable = is_download ? current_progress.downloadable : current_progress.uploadable;
404✔
1590
    double estimate = is_download ? current_progress.download_estimate : current_progress.upload_estimate;
404✔
1591

1592
    if (!is_streaming) {
404✔
1593
        // If the sync client has not yet processed all of the local
1594
        // transactions then the uploadable data is incorrect and we should
1595
        // not invoke the callback
1596
        if (!is_download && snapshot_version > current_progress.snapshot_version)
167✔
1597
            return [] {};
4✔
1598

1599
        // If this is a non-streaming download progress update and this notifier was
1600
        // created for a later query version (e.g. we're currently downloading
1601
        // subscription set version zero, but subscription set version 1 existed
1602
        // when the notifier was registered), then we want to skip this callback.
1603
        if (is_download && current_progress.query_version < pending_query_version) {
163✔
1604
            return [] {};
8✔
1605
        }
8✔
1606

1607
        // The initial download size we get from the server is the uncompacted
1608
        // size, and so the download may complete before we actually receive
1609
        // that much data. When that happens, transferrable will drop and we
1610
        // need to use the new value instead of the captured one.
1611
        if (!captured_transferable || *captured_transferable > transferable)
155✔
1612
            captured_transferable = transferable;
58✔
1613
        transferable = *captured_transferable;
155✔
1614

1615
        // Since we can adjust the transferrable downwards the estimate for uploads
1616
        // won't be correct since the sync client's view of the estimate is based on
1617
        // the total number of uploadable bytes available rather than the number of
1618
        // bytes this NotifierPackage was waiting to upload.
1619
        if (!is_download) {
155✔
1620
            estimate = transferable > 0 ? std::min(transfered / double(transferable), 1.0) : 0.0;
89✔
1621
        }
89✔
1622
    }
155✔
1623

1624
    // A notifier is expired if at least as many bytes have been transferred
1625
    // as were originally considered transferrable.
1626
    is_expired =
392✔
1627
        !is_streaming && (transfered >= transferable && (!is_download || !pending_query_version || estimate >= 1.0));
392✔
1628
    return [=, notifier = notifier] {
392✔
1629
        notifier(transfered, transferable, estimate);
392✔
1630
    };
392✔
1631
}
404✔
1632

1633
uint64_t SyncSession::ConnectionChangeNotifier::add_callback(std::function<ConnectionStateChangeCallback> callback)
1634
{
10✔
1635
    std::lock_guard<std::mutex> lock(m_callback_mutex);
10✔
1636
    auto token = m_next_token++;
10✔
1637
    m_callbacks.push_back({std::move(callback), token});
10✔
1638
    return token;
10✔
1639
}
10✔
1640

1641
void SyncSession::ConnectionChangeNotifier::remove_callback(uint64_t token)
1642
{
4✔
1643
    Callback old;
4✔
1644
    {
4✔
1645
        std::lock_guard<std::mutex> lock(m_callback_mutex);
4✔
1646
        auto it = std::find_if(begin(m_callbacks), end(m_callbacks), [=](const auto& c) {
4✔
1647
            return c.token == token;
4✔
1648
        });
4✔
1649
        if (it == end(m_callbacks)) {
4✔
1650
            return;
×
1651
        }
×
1652

1653
        size_t idx = distance(begin(m_callbacks), it);
4✔
1654
        if (m_callback_index != npos) {
4✔
1655
            if (m_callback_index >= idx)
2✔
1656
                --m_callback_index;
2✔
1657
        }
2✔
1658
        --m_callback_count;
4✔
1659

1660
        old = std::move(*it);
4✔
1661
        m_callbacks.erase(it);
4✔
1662
    }
4✔
1663
}
4✔
1664

1665
void SyncSession::ConnectionChangeNotifier::invoke_callbacks(ConnectionState old_state, ConnectionState new_state)
1666
{
6,530✔
1667
    std::unique_lock lock(m_callback_mutex);
6,530✔
1668
    m_callback_count = m_callbacks.size();
6,530✔
1669
    for (++m_callback_index; m_callback_index < m_callback_count; ++m_callback_index) {
6,538✔
1670
        // acquire a local reference to the callback so that removing the
1671
        // callback from within it can't result in a dangling pointer
1672
        auto cb = m_callbacks[m_callback_index].fn;
8✔
1673
        lock.unlock();
8✔
1674
        cb(old_state, new_state);
8✔
1675
        lock.lock();
8✔
1676
    }
8✔
1677
    m_callback_index = npos;
6,530✔
1678
}
6,530✔
1679

1680
util::Future<std::string> SyncSession::send_test_command(std::string body)
1681
{
36✔
1682
    util::CheckedLockGuard lk(m_state_mutex);
36✔
1683
    if (!m_session) {
36✔
1684
        return Status{ErrorCodes::RuntimeError, "Session doesn't exist to send test command on"};
×
1685
    }
×
1686

1687
    return m_session->send_test_command(std::move(body));
36✔
1688
}
36✔
1689

1690
void SyncSession::migrate_schema(util::UniqueFunction<void(Status)>&& callback)
1691
{
28✔
1692
    util::CheckedUniqueLock lock(m_state_mutex);
28✔
1693
    // If the schema migration is already in progress, just wait to complete.
1694
    if (m_schema_migration_in_progress) {
28✔
1695
        add_completion_callback(std::move(callback), ProgressDirection::download);
2✔
1696
        return;
2✔
1697
    }
2✔
1698
    m_schema_migration_in_progress = true;
26✔
1699

1700
    // Perform the migration:
1701
    //  1. Pause the sync session
1702
    //  2. Once the sync client releases the realm file:
1703
    //      a. Delete all tables (private and public)
1704
    //      b. Reset the subscription store
1705
    //      d. Empty the sync history and adjust cursors
1706
    //      e. Reset file ident (the server flags the old ident as in the case of a client reset)
1707
    // 3. Resume the session (the client asks for a new file ident)
1708
    // See `sync_schema_migration::perform_schema_migration` for more details.
1709

1710
    CompletionCallbacks callbacks;
26✔
1711
    std::swap(m_completion_callbacks, callbacks);
26✔
1712
    auto guard = util::make_scope_exit([&]() noexcept {
26✔
1713
        util::CheckedUniqueLock lock(m_state_mutex);
26✔
1714
        if (m_completion_callbacks.empty())
26✔
1715
            std::swap(callbacks, m_completion_callbacks);
26✔
1716
        else
×
1717
            m_completion_callbacks.merge(std::move(callbacks));
×
1718
    });
26✔
1719
    m_state_mutex.unlock(lock);
26✔
1720

1721
    auto future = pause_async();
26✔
1722
    std::move(future).get_async(
26✔
1723
        [callback = std::move(callback), weak_session = weak_from_this()](Status status) mutable {
26✔
1724
            if (!status.is_ok())
26✔
1725
                return callback(status);
×
1726

1727
            auto session = weak_session.lock();
26✔
1728
            if (!session) {
26✔
1729
                status = Status(ErrorCodes::InvalidSession, "Sync session was destroyed during schema migration");
×
1730
                return callback(status);
×
1731
            }
×
1732
            sync_schema_migration::perform_schema_migration(*session->m_db);
26✔
1733
            {
26✔
1734
                util::CheckedUniqueLock lock(session->m_state_mutex);
26✔
1735
                session->m_previous_schema_version.reset();
26✔
1736
                session->m_schema_migration_in_progress = false;
26✔
1737
                session->m_subscription_store_base.reset();
26✔
1738
                session->m_flx_subscription_store.reset();
26✔
1739
            }
26✔
1740
            session->update_subscription_store(true, {});
26✔
1741
            session->wait_for_download_completion(std::move(callback));
26✔
1742
            session->resume();
26✔
1743
        });
26✔
1744
}
26✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc