• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / jorgen.edelbo_138

13 Mar 2024 08:41AM UTC coverage: 91.77% (-0.3%) from 92.078%
jorgen.edelbo_138

Pull #7356

Evergreen

jedelbo
Add ability to get path to modified collections in object notifications
Pull Request #7356: Add ability to get path to modified collections in object notifications

94532 of 174642 branches covered (54.13%)

118 of 163 new or added lines in 16 files covered. (72.39%)

765 existing lines in 41 files now uncovered.

242808 of 264584 relevant lines covered (91.77%)

5878961.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.65
/src/realm/object-store/sync/sync_session.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2016 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/sync/sync_session.hpp>
20

21
#include <realm/object-store/thread_safe_reference.hpp>
22
#include <realm/object-store/impl/realm_coordinator.hpp>
23
#include <realm/object-store/sync/app.hpp>
24
#include <realm/object-store/sync/impl/sync_client.hpp>
25
#include <realm/object-store/sync/impl/sync_file.hpp>
26
#include <realm/object-store/sync/impl/sync_metadata.hpp>
27
#include <realm/object-store/sync/sync_manager.hpp>
28
#include <realm/object-store/sync/sync_user.hpp>
29
#include <realm/object-store/util/scheduler.hpp>
30

31
#include <realm/db_options.hpp>
32
#include <realm/sync/client.hpp>
33
#include <realm/sync/config.hpp>
34
#include <realm/sync/network/http.hpp>
35
#include <realm/sync/network/websocket_error.hpp>
36
#include <realm/sync/noinst/client_history_impl.hpp>
37
#include <realm/sync/noinst/client_reset_operation.hpp>
38
#include <realm/sync/noinst/migration_store.hpp>
39
#include <realm/sync/noinst/sync_schema_migration.hpp>
40
#include <realm/sync/protocol.hpp>
41

42
using namespace realm;
43
using namespace realm::_impl;
44

45
using SessionWaiterPointer = void (sync::Session::*)(util::UniqueFunction<void(std::error_code)>);
46

47
constexpr const char SyncError::c_original_file_path_key[];
48
constexpr const char SyncError::c_recovery_file_path_key[];
49

50
/// STATES:
51
///
52
/// WAITING_FOR_ACCESS_TOKEN: a request has been initiated to ask
53
/// for an updated access token and the session is waiting for a response.
54
/// From: INACTIVE, DYING
55
/// To:
56
///    * ACTIVE: when the SDK successfully refreshes the token
57
///    * INACTIVE: if asked to log out, or if asked to close
58
///
59
/// ACTIVE: the session is connected to the Sync Server and is actively
60
/// transferring data.
61
/// From: INACTIVE, DYING, WAITING_FOR_ACCESS_TOKEN
62
/// To:
63
///    * INACTIVE: if asked to log out, or if asked to close and the stop policy
64
///                is Immediate.
65
///    * DYING: if asked to close and the stop policy is AfterChangesUploaded
66
///
67
/// DYING: the session is performing clean-up work in preparation to be destroyed.
68
/// From: ACTIVE
69
/// To:
70
///    * INACTIVE: when the clean-up work completes, if the session wasn't
71
///                revived, or if explicitly asked to log out before the
72
///                clean-up work begins
73
///    * ACTIVE: if the session is revived
74
///    * WAITING_FOR_ACCESS_TOKEN: if the session tried to enter ACTIVE,
75
///                                but the token is invalid or expired.
76
///
77
/// INACTIVE: the user owning this session has logged out, the `sync::Session`
78
/// owned by this session is destroyed, and the session is quiescent.
79
/// Note that a session briefly enters this state before being destroyed, but
80
/// it can also enter this state and stay there if the user has been logged out.
81
/// From: initial, ACTIVE, DYING, WAITING_FOR_ACCESS_TOKEN
82
/// To:
83
///    * ACTIVE: if the session is revived
84
///    * WAITING_FOR_ACCESS_TOKEN: if the session tried to enter ACTIVE,
85
///                                but the token is invalid or expired.
86

87
void SyncSession::become_active()
88
{
1,889✔
89
    REALM_ASSERT(m_state != State::Active);
1,889✔
90
    m_state = State::Active;
1,889✔
91

857✔
92
    // First time the session becomes active, register a notification on the sentinel subscription set to restart the
857✔
93
    // session and update to native FLX.
857✔
94
    if (m_migration_sentinel_query_version) {
1,889✔
95
        m_flx_subscription_store->get_by_version(*m_migration_sentinel_query_version)
2✔
96
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
97
            .get_async([=, weak_self = weak_from_this()](StatusWith<sync::SubscriptionSet::State> s) {
2✔
98
                if (!s.is_ok()) {
2✔
99
                    return;
×
100
                }
×
101
                REALM_ASSERT(s.get_value() == sync::SubscriptionSet::State::Complete);
2✔
102
                if (auto strong_self = weak_self.lock()) {
2✔
103
                    strong_self->m_migration_store->cancel_migration();
2✔
104
                    strong_self->restart_session();
2✔
105
                }
2✔
106
            });
2✔
107
        m_migration_sentinel_query_version.reset();
2✔
108
    }
2✔
109

857✔
110
    // when entering from the Dying state the session will still be bound
857✔
111
    if (!m_session) {
1,889✔
112
        create_sync_session();
1,887✔
113
        m_session->bind();
1,887✔
114
    }
1,887✔
115

857✔
116
    // Register all the pending wait-for-completion blocks. This can
857✔
117
    // potentially add a redundant callback if we're coming from the Dying
857✔
118
    // state, but that's okay (we won't call the user callbacks twice).
857✔
119
    SyncSession::CompletionCallbacks callbacks_to_register;
1,889✔
120
    std::swap(m_completion_callbacks, callbacks_to_register);
1,889✔
121

857✔
122
    for (auto& [id, callback_tuple] : callbacks_to_register) {
1,069✔
123
        add_completion_callback(std::move(callback_tuple.second), callback_tuple.first);
409✔
124
    }
409✔
125
}
1,889✔
126

127
void SyncSession::restart_session()
128
{
10✔
129
    util::CheckedUniqueLock lock(m_state_mutex);
10✔
130
    do_restart_session(std::move(lock));
10✔
131
}
10✔
132

133
void SyncSession::become_dying(util::CheckedUniqueLock lock)
134
{
80✔
135
    REALM_ASSERT(m_state != State::Dying);
80✔
136
    m_state = State::Dying;
80✔
137

32✔
138
    // If we have no session, we cannot possibly upload anything.
32✔
139
    if (!m_session) {
80✔
140
        become_inactive(std::move(lock));
×
141
        return;
×
142
    }
×
143

32✔
144
    size_t current_death_count = ++m_death_count;
80✔
145
    m_session->async_wait_for_upload_completion([weak_session = weak_from_this(), current_death_count](Status) {
80✔
146
        if (auto session = weak_session.lock()) {
80✔
147
            util::CheckedUniqueLock lock(session->m_state_mutex);
78✔
148
            if (session->m_state == State::Dying && session->m_death_count == current_death_count) {
78✔
149
                session->become_inactive(std::move(lock));
32✔
150
            }
32✔
151
        }
78✔
152
    });
80✔
153
    m_state_mutex.unlock(lock);
80✔
154
}
80✔
155

156
void SyncSession::become_inactive(util::CheckedUniqueLock lock, Status status, bool cancel_subscription_notifications)
157
{
1,705✔
158
    REALM_ASSERT(m_state != State::Inactive);
1,705✔
159
    m_state = State::Inactive;
1,705✔
160

765✔
161
    do_become_inactive(std::move(lock), status, cancel_subscription_notifications);
1,705✔
162
}
1,705✔
163

164
void SyncSession::become_paused(util::CheckedUniqueLock lock)
165
{
188✔
166
    REALM_ASSERT(m_state != State::Paused);
188✔
167
    auto old_state = m_state;
188✔
168
    m_state = State::Paused;
188✔
169

94✔
170
    // Nothing to do if we're already inactive besides update the state.
94✔
171
    if (old_state == State::Inactive) {
188✔
172
        m_state_mutex.unlock(lock);
2✔
173
        return;
2✔
174
    }
2✔
175

93✔
176
    do_become_inactive(std::move(lock), Status::OK(), true);
186✔
177
}
186✔
178

179
void SyncSession::do_restart_session(util::CheckedUniqueLock)
180
{
10✔
181
    // Nothing to do if the sync session is currently paused
5✔
182
    // It will be resumed when resume() is called
5✔
183
    if (m_state == State::Paused)
10✔
184
        return;
×
185

5✔
186
    // Go straight to inactive so the progress completion waiters will
5✔
187
    // continue to wait until the session restarts and completes the
5✔
188
    // upload/download sync
5✔
189
    m_state = State::Inactive;
10✔
190

5✔
191
    if (m_session) {
10✔
192
        m_session.reset();
10✔
193
    }
10✔
194

5✔
195
    // Create a new session and re-register the completion callbacks
5✔
196
    // The latest server path will be retrieved from sync_manager when
5✔
197
    // the new session is created by create_sync_session() in become
5✔
198
    // active.
5✔
199
    become_active();
10✔
200
}
10✔
201

202
void SyncSession::do_become_inactive(util::CheckedUniqueLock lock, Status status,
203
                                     bool cancel_subscription_notifications)
204
{
1,891✔
205
    // Manually set the disconnected state. Sync would also do this, but
858✔
206
    // since the underlying SyncSession object already have been destroyed,
858✔
207
    // we are not able to get the callback.
858✔
208
    util::CheckedUniqueLock connection_state_lock(m_connection_state_mutex);
1,891✔
209
    auto old_state = m_connection_state;
1,891✔
210
    auto new_state = m_connection_state = SyncSession::ConnectionState::Disconnected;
1,891✔
211
    connection_state_lock.unlock();
1,891✔
212

858✔
213
    SyncSession::CompletionCallbacks waits;
1,891✔
214
    std::swap(waits, m_completion_callbacks);
1,891✔
215

858✔
216
    m_session = nullptr;
1,891✔
217
    if (m_sync_manager) {
1,891✔
218
        m_sync_manager->unregister_session(m_db->get_path());
1,891✔
219
    }
1,891✔
220

858✔
221
    auto subscription_store = m_flx_subscription_store;
1,891✔
222
    m_state_mutex.unlock(lock);
1,891✔
223

858✔
224
    // Send notifications after releasing the lock to prevent deadlocks in the callback.
858✔
225
    if (old_state != new_state) {
1,891✔
226
        m_connection_change_notifier.invoke_callbacks(old_state, connection_state());
1,477✔
227
    }
1,477✔
228

858✔
229
    if (status.is_ok())
1,891✔
230
        status = Status(ErrorCodes::OperationAborted, "Sync session became inactive");
1,766✔
231

858✔
232
    if (subscription_store && cancel_subscription_notifications) {
1,891✔
233
        subscription_store->notify_all_state_change_notifications(status);
616✔
234
    }
616✔
235

858✔
236
    // Inform any queued-up completion handlers that they were cancelled.
858✔
237
    for (auto& [id, callback] : waits)
1,891✔
238
        callback.second(status);
82✔
239
}
1,891✔
240

241
void SyncSession::become_waiting_for_access_token()
242
{
32✔
243
    REALM_ASSERT(m_state != State::WaitingForAccessToken);
32✔
244
    m_state = State::WaitingForAccessToken;
32✔
245
}
32✔
246

247
void SyncSession::handle_location_update_failed(Status status)
248
{
4✔
249
    // TODO: ideally this would write to the logs as well in case users didn't set up their error handler.
2✔
250
    {
4✔
251
        util::CheckedUniqueLock lock(m_state_mutex);
4✔
252
        // Should only be called while waiting to update the access token
2✔
253
        REALM_ASSERT(m_state == State::WaitingForAccessToken);
4✔
254
        // Close the session, since there's nothing more to do at this point, it can be resumed
2✔
255
        // after resolving the location update failure
2✔
256
        become_inactive(std::move(lock), status);
4✔
257
    }
4✔
258
    if (auto error_handler = config(&SyncConfig::error_handler)) {
4✔
259
        auto user_facing_error = SyncError({ErrorCodes::SyncConnectFailed, status.reason()}, true);
4✔
260
        error_handler(shared_from_this(), std::move(user_facing_error));
4✔
261
    }
4✔
262
}
4✔
263

264
void SyncSession::handle_bad_auth(const std::shared_ptr<SyncUser>& user, Status status)
265
{
16✔
266
    // TODO: ideally this would write to the logs as well in case users didn't set up their error handler.
8✔
267
    {
16✔
268
        util::CheckedUniqueLock lock(m_state_mutex);
16✔
269
        cancel_pending_waits(std::move(lock), status);
16✔
270
    }
16✔
271
    if (user) {
16✔
272
        user->log_out();
16✔
273
    }
16✔
274

8✔
275
    if (auto error_handler = config(&SyncConfig::error_handler)) {
16✔
276
        auto user_facing_error = SyncError({ErrorCodes::AuthError, status.reason()}, true);
16✔
277
        error_handler(shared_from_this(), std::move(user_facing_error));
16✔
278
    }
16✔
279
}
16✔
280

281
static bool check_for_auth_failure(const app::AppError& error)
282
{
24✔
283
    using namespace realm::sync;
24✔
284
    // Auth failure is returned as a 401 (unauthorized) or 403 (forbidden) response
12✔
285
    if (error.additional_status_code) {
24✔
286
        auto status_code = HTTPStatus(*error.additional_status_code);
24✔
287
        if (status_code == HTTPStatus::Unauthorized || status_code == HTTPStatus::Forbidden)
24✔
288
            return true;
14✔
289
    }
10✔
290

5✔
291
    return false;
10✔
292
}
10✔
293

294
static bool check_for_redirect_response(const app::AppError& error)
295
{
10✔
296
    using namespace realm::sync;
10✔
297
    // Check for unhandled 301/308 permanent redirect response
5✔
298
    if (error.additional_status_code) {
10✔
299
        auto status_code = HTTPStatus(*error.additional_status_code);
10✔
300
        if (status_code == HTTPStatus::MovedPermanently || status_code == HTTPStatus::PermanentRedirect)
10✔
301
            return true;
×
302
    }
10✔
303

5✔
304
    return false;
10✔
305
}
10✔
306

307
util::UniqueFunction<void(util::Optional<app::AppError>)>
308
SyncSession::handle_refresh(const std::shared_ptr<SyncSession>& session, bool restart_session)
309
{
56✔
310
    return [session, restart_session](util::Optional<app::AppError> error) {
56✔
311
        auto session_user = session->user();
56✔
312
        if (!session_user) {
56✔
313
            util::CheckedUniqueLock lock(session->m_state_mutex);
×
314
            auto refresh_error = error ? error->to_status() : Status::OK();
×
315
            session->cancel_pending_waits(std::move(lock), refresh_error);
×
316
        }
×
317
        else if (error) {
56✔
318
            if (error->code() == ErrorCodes::ClientAppDeallocated) {
32✔
319
                return; // this response came in after the app shut down, ignore it
2✔
320
            }
2✔
321
            else if (!session->get_sync_route()) {
30✔
322
                // If the sync route is empty at this point, it means the forced location update
2✔
323
                // failed while trying to start a sync session with a cached user and no other
2✔
324
                // AppServices HTTP requests have been performed since the App was created.
2✔
325
                // Since a valid websocket host url is not available, fail the SyncSession start
2✔
326
                // and pass the error to the user.
2✔
327
                // This function will not log out the user, since it is not known at this point
2✔
328
                // whether or not the user is valid.
2✔
329
                session->handle_location_update_failed(
4✔
330
                    {error->code(), util::format("Unable to reach the server: %1", error->reason())});
4✔
331
            }
4✔
332
            else if (ErrorCodes::error_categories(error->code()).test(ErrorCategory::client_error)) {
26✔
333
                // any other client errors other than app_deallocated are considered fatal because
1✔
334
                // there was a problem locally before even sending the request to the server
1✔
335
                // eg. ClientErrorCode::user_not_found, ClientErrorCode::user_not_logged_in,
1✔
336
                // ClientErrorCode::too_many_redirects
1✔
337
                session->handle_bad_auth(session_user, error->to_status());
2✔
338
            }
2✔
339
            else if (check_for_auth_failure(*error)) {
24✔
340
                // A 401 response on a refresh request means that the token cannot be refreshed and we should not
7✔
341
                // retry. This can be because an admin has revoked this user's sessions, the user has been disabled,
7✔
342
                // or the refresh token has expired according to the server's clock.
7✔
343
                session->handle_bad_auth(
14✔
344
                    session_user,
14✔
345
                    {error->code(), util::format("Unable to refresh the user access token: %1", error->reason())});
14✔
346
            }
14✔
347
            else if (check_for_redirect_response(*error)) {
10✔
348
                // A 301 or 308 response is an unhandled permanent redirect response (which should not happen) - if
349
                // this is received, fail the request with an appropriate error message.
350
                // Temporary redirect responses (302, 307) are not supported
351
                session->handle_bad_auth(
×
352
                    session_user,
×
353
                    {error->code(), util::format("Unhandled redirect response when trying to reach the server: %1",
×
354
                                                 error->reason())});
×
355
            }
×
356
            else {
10✔
357
                // A refresh request has failed. This is an unexpected non-fatal error and we would
5✔
358
                // like to retry but we shouldn't do this immediately in order to not swamp the
5✔
359
                // server with requests. Consider two scenarios:
5✔
360
                // 1) If this request was spawned from the proactive token check, or a user
5✔
361
                // initiated request, the token may actually be valid. Just advance to Active
5✔
362
                // from WaitingForAccessToken if needed and let the sync server tell us if the
5✔
363
                // token is valid or not. If this also fails we will end up in case 2 below.
5✔
364
                // 2) If the sync connection initiated the request because the server is
5✔
365
                // unavailable or the connection otherwise encounters an unexpected error, we want
5✔
366
                // to let the sync client attempt to reinitialize the connection using its own
5✔
367
                // internal backoff timer which will happen automatically so nothing needs to
5✔
368
                // happen here.
5✔
369
                util::CheckedUniqueLock lock(session->m_state_mutex);
10✔
370
                if (session->m_state == State::WaitingForAccessToken) {
10✔
371
                    session->become_active();
2✔
372
                }
2✔
373
            }
10✔
374
        }
32✔
375
        else {
24✔
376
            // If the session needs to be restarted, then restart the session now
12✔
377
            // The latest access token and server url will be pulled from the sync
12✔
378
            // manager when the new session is started.
12✔
379
            if (restart_session) {
24✔
380
                session->restart_session();
2✔
381
            }
2✔
382
            // Otherwise, update the access token and reconnect
11✔
383
            else {
22✔
384
                session->update_access_token(session_user->access_token());
22✔
385
            }
22✔
386
        }
24✔
387
    };
56✔
388
}
56✔
389

390
SyncSession::SyncSession(Private, SyncClient& client, std::shared_ptr<DB> db, const RealmConfig& config,
391
                         SyncManager* sync_manager)
392
    : m_config{config}
393
    , m_db{std::move(db)}
394
    , m_original_sync_config{m_config.sync_config}
395
    , m_migration_store{sync::MigrationStore::create(m_db)}
396
    , m_client(client)
397
    , m_sync_manager(sync_manager)
398
    , m_previous_schema_version(_impl::sync_schema_migration::has_pending_migration(*m_db->start_read()))
399
{
1,540✔
400
    REALM_ASSERT(m_config.sync_config);
1,540✔
401
    // we don't want the following configs enabled during a client reset
684✔
402
    m_config.scheduler = nullptr;
1,540✔
403
    m_config.audit_config = nullptr;
1,540✔
404

684✔
405
    // Adjust the sync_config if using PBS sync and already in the migrated or rollback state
684✔
406
    if (m_migration_store->is_migrated() || m_migration_store->is_rollback_in_progress()) {
1,540✔
407
        m_config.sync_config = sync::MigrationStore::convert_sync_config_to_flx(m_original_sync_config);
8✔
408
    }
8✔
409

684✔
410
    // If using FLX, set up m_flx_subscription_store and the history_write_validator
684✔
411
    if (m_config.sync_config->flx_sync_requested) {
1,540✔
412
        create_subscription_store();
562✔
413
        std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr(m_flx_subscription_store);
562✔
414
        set_write_validator_factory(weak_sub_mgr);
562✔
415
    }
562✔
416

684✔
417
    // After a migration to FLX, if the user opens the realm with a flexible sync configuration, we need to first
684✔
418
    // upload any unsynced changes before updating to native FLX.
684✔
419
    // A subscription set is used as sentinel so we know when to stop uploading.
684✔
420
    // Note: Currently, a sentinel subscription set is always created even if there is nothing to upload.
684✔
421
    if (m_migration_store->is_migrated() && m_original_sync_config->flx_sync_requested) {
1,540✔
422
        m_migration_store->create_sentinel_subscription_set(*m_flx_subscription_store);
2✔
423
        m_migration_sentinel_query_version = m_migration_store->get_sentinel_subscription_set_version();
2✔
424
        REALM_ASSERT(m_migration_sentinel_query_version);
2✔
425
    }
2✔
426
}
1,540✔
427

428
std::shared_ptr<SyncManager> SyncSession::sync_manager() const
429
{
×
430
    util::CheckedLockGuard lk(m_state_mutex);
×
431
    REALM_ASSERT(m_sync_manager);
×
432
    return m_sync_manager->shared_from_this();
×
433
}
×
434

435
void SyncSession::detach_from_sync_manager()
436
{
10✔
437
    shutdown_and_wait();
10✔
438
    util::CheckedLockGuard lk(m_state_mutex);
10✔
439
    m_sync_manager = nullptr;
10✔
440
}
10✔
441

442
void SyncSession::update_error_and_mark_file_for_deletion(SyncError& error, ShouldBackup should_backup)
443
{
78✔
444
    util::CheckedLockGuard config_lock(m_config_mutex);
78✔
445
    // Add a SyncFileActionMetadata marking the Realm as needing to be deleted.
39✔
446
    std::string recovery_path;
78✔
447
    auto original_path = path();
78✔
448
    error.user_info[SyncError::c_original_file_path_key] = original_path;
78✔
449
    if (should_backup == ShouldBackup::yes) {
78✔
450
        recovery_path = util::reserve_unique_file_name(
78✔
451
            m_sync_manager->recovery_directory_path(m_config.sync_config->recovery_directory),
78✔
452
            util::create_timestamped_template("recovered_realm"));
78✔
453
        error.user_info[SyncError::c_recovery_file_path_key] = recovery_path;
78✔
454
    }
78✔
455
    using Action = SyncFileActionMetadata::Action;
78✔
456
    auto action = should_backup == ShouldBackup::yes ? Action::BackUpThenDeleteRealm : Action::DeleteRealm;
78✔
457
    m_sync_manager->perform_metadata_update([action, original_path = std::move(original_path),
78✔
458
                                             recovery_path = std::move(recovery_path)](const auto& manager) {
75✔
459
        manager.make_file_action_metadata(original_path, action, recovery_path);
72✔
460
    });
72✔
461
}
78✔
462

463
void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_requests_action)
464
{
186✔
465
    // first check that recovery will not be prevented
93✔
466
    if (server_requests_action == sync::ProtocolErrorInfo::Action::ClientResetNoRecovery) {
186✔
467
        auto mode = config(&SyncConfig::client_resync_mode);
4✔
468
        if (mode == ClientResyncMode::Recover) {
4✔
469
            handle_fresh_realm_downloaded(
2✔
470
                nullptr,
2✔
471
                {ErrorCodes::RuntimeError,
2✔
472
                 "A client reset is required but the server does not permit recovery for this client"},
2✔
473
                server_requests_action);
2✔
474
            return;
2✔
475
        }
2✔
476
    }
184✔
477

92✔
478
    std::vector<char> encryption_key;
184✔
479
    {
184✔
480
        util::CheckedLockGuard lock(m_config_mutex);
184✔
481
        encryption_key = m_config.encryption_key;
184✔
482
    }
184✔
483

92✔
484
    DBOptions options;
184✔
485
    options.allow_file_format_upgrade = false;
184✔
486
    options.enable_async_writes = false;
184✔
487
    if (!encryption_key.empty())
184✔
488
        options.encryption_key = encryption_key.data();
2✔
489

92✔
490
    DBRef db;
184✔
491
    auto fresh_path = client_reset::get_fresh_path_for(m_db->get_path());
184✔
492
    try {
184✔
493
        // We want to attempt to use a pre-existing file to reduce the chance of
92✔
494
        // downloading the first part of the file only to then delete it over
92✔
495
        // and over, but if we fail to open it then we should just start over.
92✔
496
        try {
184✔
497
            db = DB::create(sync::make_client_replication(), fresh_path, options);
184✔
498
        }
184✔
499
        catch (...) {
97✔
500
            util::File::try_remove(fresh_path);
10✔
501
        }
10✔
502

92✔
503
        if (!db) {
180✔
504
            db = DB::create(sync::make_client_replication(), fresh_path, options);
2✔
505
        }
2✔
506
    }
176✔
507
    catch (...) {
96✔
508
        // Failed to open the fresh path after attempting to delete it, so we
4✔
509
        // just can't do automatic recovery.
4✔
510
        handle_fresh_realm_downloaded(nullptr, exception_to_status(), server_requests_action);
8✔
511
        return;
8✔
512
    }
8✔
513

88✔
514
    util::CheckedLockGuard state_lock(m_state_mutex);
176✔
515
    if (m_state != State::Active) {
176✔
516
        return;
×
517
    }
×
518
    std::shared_ptr<SyncSession> fresh_sync_session;
176✔
519
    {
176✔
520
        util::CheckedLockGuard config_lock(m_config_mutex);
176✔
521
        RealmConfig config = m_config;
176✔
522
        config.path = fresh_path;
176✔
523
        // in case of migrations use the migrated config
88✔
524
        auto fresh_config = m_migrated_sync_config ? *m_migrated_sync_config : *m_config.sync_config;
162✔
525
        // deep copy the sync config so we don't modify the live session's config
88✔
526
        config.sync_config = std::make_shared<SyncConfig>(fresh_config);
176✔
527
        config.sync_config->client_resync_mode = ClientResyncMode::Manual;
176✔
528
        config.schema_version = m_previous_schema_version.value_or(m_config.schema_version);
176✔
529
        fresh_sync_session = m_sync_manager->get_session(db, config);
176✔
530
        auto& history = static_cast<sync::ClientReplication&>(*db->get_replication());
176✔
531
        // the fresh Realm may apply writes to this db after it has outlived its sync session
88✔
532
        // the writes are used to generate a changeset for recovery, but are never committed
88✔
533
        history.set_write_validator_factory({});
176✔
534
    }
176✔
535

88✔
536
    fresh_sync_session->assert_mutex_unlocked();
176✔
537
    // The fresh realm uses flexible sync.
88✔
538
    if (auto fresh_sub_store = fresh_sync_session->get_flx_subscription_store()) {
176✔
539
        auto fresh_sub = fresh_sub_store->get_latest();
68✔
540
        // The local realm uses flexible sync as well so copy the active subscription set to the fresh realm.
34✔
541
        if (auto local_subs_store = m_flx_subscription_store) {
68✔
542
            auto fresh_mut_sub = fresh_sub.make_mutable_copy();
50✔
543
            fresh_mut_sub.import(local_subs_store->get_active());
50✔
544
            fresh_sub = fresh_mut_sub.commit();
50✔
545
        }
50✔
546

34✔
547
        auto self = shared_from_this();
68✔
548
        using SubscriptionState = sync::SubscriptionSet::State;
68✔
549
        fresh_sub.get_state_change_notification(SubscriptionState::Complete)
68✔
550
            .then([=](SubscriptionState) -> util::Future<sync::SubscriptionSet> {
67✔
551
                if (server_requests_action != sync::ProtocolErrorInfo::Action::MigrateToFLX) {
66✔
552
                    return fresh_sub;
48✔
553
                }
48✔
554
                if (!self->m_migration_store->is_migration_in_progress()) {
18✔
555
                    return fresh_sub;
×
556
                }
×
557

9✔
558
                // fresh_sync_session is using a new realm file that doesn't have the migration_store info
9✔
559
                // so the query string from the local migration store will need to be provided
9✔
560
                auto query_string = self->m_migration_store->get_query_string();
18✔
561
                REALM_ASSERT(query_string);
18✔
562
                // Create subscriptions in the fresh realm based on the schema instructions received in the bootstrap
9✔
563
                // message.
9✔
564
                fresh_sync_session->m_migration_store->create_subscriptions(*fresh_sub_store, *query_string);
18✔
565
                return fresh_sub_store->get_latest()
18✔
566
                    .get_state_change_notification(SubscriptionState::Complete)
18✔
567
                    .then([=](SubscriptionState) {
18✔
568
                        return fresh_sub_store->get_latest();
18✔
569
                    });
18✔
570
            })
18✔
571
            .get_async([=](StatusWith<sync::SubscriptionSet>&& subs) {
68✔
572
                // Keep the sync session alive while it's downloading, but then close
34✔
573
                // it immediately
34✔
574
                fresh_sync_session->force_close();
68✔
575
                if (subs.is_ok()) {
68✔
576
                    self->handle_fresh_realm_downloaded(db, Status::OK(), server_requests_action,
66✔
577
                                                        std::move(subs.get_value()));
66✔
578
                }
66✔
579
                else {
2✔
580
                    self->handle_fresh_realm_downloaded(nullptr, subs.get_status(), server_requests_action);
2✔
581
                }
2✔
582
            });
68✔
583
    }
68✔
584
    else { // pbs
108✔
585
        fresh_sync_session->wait_for_download_completion([=, weak_self = weak_from_this()](Status s) {
108✔
586
            // Keep the sync session alive while it's downloading, but then close
54✔
587
            // it immediately
54✔
588
            fresh_sync_session->force_close();
108✔
589
            if (auto strong_self = weak_self.lock()) {
108✔
590
                strong_self->handle_fresh_realm_downloaded(db, s, server_requests_action);
108✔
591
            }
108✔
592
        });
108✔
593
    }
108✔
594
    fresh_sync_session->revive_if_needed();
176✔
595
}
176✔
596

597
void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
598
                                                sync::ProtocolErrorInfo::Action server_requests_action,
599
                                                std::optional<sync::SubscriptionSet> new_subs)
600
{
186✔
601
    util::CheckedUniqueLock lock(m_state_mutex);
186✔
602
    if (m_state != State::Active) {
186✔
603
        return;
×
604
    }
×
605
    // The download can fail for many reasons. For example:
93✔
606
    // - unable to write the fresh copy to the file system
93✔
607
    // - during download of the fresh copy, the fresh copy itself is reset
93✔
608
    // - in FLX mode there was a problem fulfilling the previously active subscription
93✔
609
    if (!status.is_ok()) {
186✔
610
        if (status == ErrorCodes::OperationAborted) {
18✔
611
            return;
6✔
612
        }
6✔
613
        lock.unlock();
12✔
614

6✔
615
        sync::SessionErrorInfo synthetic(
12✔
616
            Status{ErrorCodes::AutoClientResetFailed,
12✔
617
                   util::format("A fatal error occurred during client reset: '%1'", status.reason())},
12✔
618
            sync::IsFatal{true});
12✔
619
        handle_error(synthetic);
12✔
620
        return;
12✔
621
    }
12✔
622

84✔
623
    // Performing a client reset requires tearing down our current
84✔
624
    // sync session and creating a new one with the relevant client reset config. This
84✔
625
    // will result in session completion handlers firing
84✔
626
    // when the old session is torn down, which we don't want as this
84✔
627
    // is supposed to be transparent to the user.
84✔
628
    //
84✔
629
    // To avoid this, we need to move the completion handlers aside temporarily so
84✔
630
    // that moving to the inactive state doesn't clear them - they will be
84✔
631
    // re-registered when the session becomes active again.
84✔
632
    {
168✔
633
        m_server_requests_action = server_requests_action;
168✔
634
        m_client_reset_fresh_copy = db;
168✔
635
        CompletionCallbacks callbacks;
168✔
636
        std::swap(m_completion_callbacks, callbacks);
168✔
637
        // always swap back, even if advance_state throws
84✔
638
        auto guard = util::make_scope_exit([&]() noexcept {
168✔
639
            util::CheckedUniqueLock lock(m_state_mutex);
168✔
640
            if (m_completion_callbacks.empty())
168✔
641
                std::swap(callbacks, m_completion_callbacks);
168✔
642
            else
×
643
                m_completion_callbacks.merge(std::move(callbacks));
×
644
        });
168✔
645
        // Do not cancel the notifications on subscriptions.
84✔
646
        bool cancel_subscription_notifications = false;
168✔
647
        become_inactive(std::move(lock), Status::OK(), cancel_subscription_notifications); // unlocks the lock
168✔
648

84✔
649
        // Once the session is inactive, update sync config and subscription store after migration.
84✔
650
        if (server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
168✔
651
            server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS) {
159✔
652
            apply_sync_config_after_migration_or_rollback();
26✔
653
            auto flx_sync_requested = config(&SyncConfig::flx_sync_requested);
26✔
654
            update_subscription_store(flx_sync_requested, std::move(new_subs));
26✔
655
        }
26✔
656
    }
168✔
657
    revive_if_needed();
168✔
658
}
168✔
659

660
util::Future<void> SyncSession::Internal::pause_async(SyncSession& session)
661
{
22✔
662
    {
22✔
663
        util::CheckedUniqueLock lock(session.m_state_mutex);
22✔
664
        // Nothing to wait for if the session is already paused or inactive.
11✔
665
        if (session.m_state == SyncSession::State::Paused || session.m_state == SyncSession::State::Inactive) {
22✔
666
            return util::Future<void>::make_ready();
×
667
        }
×
668
    }
22✔
669
    // Transition immediately to `paused` state. Calling this function must guarantee that any
11✔
670
    // sync::Session object in SyncSession::m_session that existed prior to the time of invocation
11✔
671
    // must have been destroyed upon return. This allows the caller to follow up with a call to
11✔
672
    // sync::Client::notify_session_terminated() in order to be notified when the Realm file is closed. This works
11✔
673
    // so long as this SyncSession object remains in the `paused` state after the invocation of shutdown().
11✔
674
    session.pause();
22✔
675
    return session.m_client.notify_session_terminated();
22✔
676
}
22✔
677

678
void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::SessionErrorInfo&& error)
679
{
16✔
680
    session.handle_error(std::move(error));
16✔
681
}
16✔
682

683
util::Future<void> SyncSession::OnlyForTesting::pause_async(SyncSession& session)
684
{
2✔
685
    return SyncSession::Internal::pause_async(session);
2✔
686
}
2✔
687

688
// This method should only be called from within the error handler callback registered upon the underlying
689
// `m_session`.
690
void SyncSession::handle_error(sync::SessionErrorInfo error)
691
{
467✔
692
    enum class NextStateAfterError { none, inactive, error };
467✔
693
    auto next_state = error.is_fatal ? NextStateAfterError::error : NextStateAfterError::none;
386✔
694
    util::Optional<ShouldBackup> delete_file;
467✔
695
    bool log_out_user = false;
467✔
696
    bool unrecognized_by_client = false;
467✔
697

231✔
698
    if (error.status == ErrorCodes::AutoClientResetFailed) {
467✔
699
        // At this point, automatic recovery has been attempted but it failed.
25✔
700
        // Fallback to a manual reset and let the user try to handle it.
25✔
701
        next_state = NextStateAfterError::inactive;
50✔
702
        delete_file = ShouldBackup::yes;
50✔
703
    }
50✔
704
    else if (error.server_requests_action != sync::ProtocolErrorInfo::Action::NoAction) {
417✔
705
        switch (error.server_requests_action) {
405✔
706
            case sync::ProtocolErrorInfo::Action::NoAction:
✔
707
                REALM_UNREACHABLE(); // This is not sent by the MongoDB server
708
            case sync::ProtocolErrorInfo::Action::ApplicationBug:
37✔
709
                [[fallthrough]];
37✔
710
            case sync::ProtocolErrorInfo::Action::ProtocolViolation:
43✔
711
                next_state = NextStateAfterError::inactive;
43✔
712
                break;
43✔
713
            case sync::ProtocolErrorInfo::Action::Warning:
34✔
714
                break; // not fatal, but should be bubbled up to the user below.
34✔
715
            case sync::ProtocolErrorInfo::Action::Transient:
65✔
716
                // Not real errors, don't need to be reported to the binding.
32✔
717
                return;
65✔
718
            case sync::ProtocolErrorInfo::Action::DeleteRealm:
17✔
719
                next_state = NextStateAfterError::inactive;
×
720
                delete_file = ShouldBackup::no;
×
721
                break;
×
722
            case sync::ProtocolErrorInfo::Action::ClientReset:
182✔
723
                [[fallthrough]];
182✔
724
            case sync::ProtocolErrorInfo::Action::ClientResetNoRecovery:
186✔
725
                switch (config(&SyncConfig::client_resync_mode)) {
186✔
726
                    case ClientResyncMode::Manual:
28✔
727
                        next_state = NextStateAfterError::inactive;
28✔
728
                        delete_file = ShouldBackup::yes;
28✔
729
                        break;
28✔
730
                    case ClientResyncMode::DiscardLocal:
82✔
731
                        [[fallthrough]];
82✔
732
                    case ClientResyncMode::RecoverOrDiscard:
96✔
733
                        [[fallthrough]];
96✔
734
                    case ClientResyncMode::Recover:
158✔
735
                        download_fresh_realm(error.server_requests_action);
158✔
736
                        return; // do not propagate the error to the user at this point
158✔
737
                }
28✔
738
                break;
28✔
739
            case sync::ProtocolErrorInfo::Action::MigrateToFLX:
23✔
740
                // Should not receive this error if original sync config is FLX
9✔
741
                REALM_ASSERT(!m_original_sync_config->flx_sync_requested);
18✔
742
                REALM_ASSERT(error.migration_query_string && !error.migration_query_string->empty());
18✔
743
                // Original config was PBS, migrating to FLX
9✔
744
                m_migration_store->migrate_to_flx(*error.migration_query_string,
18✔
745
                                                  m_original_sync_config->partition_value);
18✔
746
                save_sync_config_after_migration_or_rollback();
18✔
747
                download_fresh_realm(error.server_requests_action);
18✔
748
                return;
18✔
749
            case sync::ProtocolErrorInfo::Action::RevertToPBS:
19✔
750
                // If the client was updated to use FLX natively, but the server was rolled back to PBS,
5✔
751
                // the server should be sending switch_to_flx_sync; throw exception if this error is not
5✔
752
                // received.
5✔
753
                if (m_original_sync_config->flx_sync_requested) {
10✔
754
                    throw LogicError(ErrorCodes::InvalidServerResponse,
×
755
                                     "Received 'RevertToPBS' from server after rollback while client is natively "
×
756
                                     "using FLX - expected 'SwitchToPBS'");
×
757
                }
×
758
                // Original config was PBS, rollback the migration
5✔
759
                m_migration_store->rollback_to_pbs();
10✔
760
                save_sync_config_after_migration_or_rollback();
10✔
761
                download_fresh_realm(error.server_requests_action);
10✔
762
                return;
10✔
763
            case sync::ProtocolErrorInfo::Action::RefreshUser:
18✔
764
                if (auto u = user()) {
18✔
765
                    u->refresh_custom_data(false, handle_refresh(shared_from_this(), false));
18✔
766
                    return;
18✔
767
                }
18✔
768
                break;
×
769
            case sync::ProtocolErrorInfo::Action::RefreshLocation:
6✔
770
                if (auto u = user()) {
6✔
771
                    u->refresh_custom_data(true, handle_refresh(shared_from_this(), true));
6✔
772
                    return;
6✔
773
                }
6✔
774
                break;
×
775
            case sync::ProtocolErrorInfo::Action::LogOutUser:
✔
776
                next_state = NextStateAfterError::inactive;
×
777
                log_out_user = true;
×
778
                break;
×
779
            case sync::ProtocolErrorInfo::Action::MigrateSchema:
25✔
780
                util::CheckedUniqueLock lock(m_state_mutex);
25✔
781
                // Should only be received for FLX sync.
12✔
782
                REALM_ASSERT(m_original_sync_config->flx_sync_requested);
25✔
783
                m_previous_schema_version = error.previous_schema_version;
25✔
784
                return; // do not propagate the error to the user at this point
25✔
785
        }
12✔
786
    }
12✔
787
    else {
12✔
788
        // Unrecognized error code.
6✔
789
        unrecognized_by_client = true;
12✔
790
    }
12✔
791

231✔
792
    util::CheckedUniqueLock lock(m_state_mutex);
316✔
793
    SyncError sync_error{error.status, error.is_fatal, error.log_url, std::move(error.compensating_writes)};
167✔
794
    // `action` is used over `shouldClientReset` and `isRecoveryModeDisabled`.
82✔
795
    sync_error.server_requests_action = error.server_requests_action;
167✔
796
    sync_error.is_unrecognized_by_client = unrecognized_by_client;
167✔
797

82✔
798
    if (delete_file)
167✔
799
        update_error_and_mark_file_for_deletion(sync_error, *delete_file);
78✔
800

82✔
801
    if (m_state == State::Dying && error.is_fatal) {
167✔
802
        become_inactive(std::move(lock), error.status);
2✔
803
        return;
2✔
804
    }
2✔
805

81✔
806
    // Don't bother invoking m_config.error_handler if the sync is inactive.
81✔
807
    // It does not make sense to call the handler when the session is closed.
81✔
808
    if (m_state == State::Inactive || m_state == State::Paused) {
165✔
UNCOV
809
        return;
×
UNCOV
810
    }
×
811

81✔
812
    switch (next_state) {
165✔
813
        case NextStateAfterError::none:
46✔
814
            if (config(&SyncConfig::cancel_waits_on_nonfatal_error)) {
46✔
815
                cancel_pending_waits(std::move(lock), sync_error.status); // unlocks the mutex
×
816
            }
×
817
            break;
46✔
818
        case NextStateAfterError::inactive: {
119✔
819
            become_inactive(std::move(lock), sync_error.status);
119✔
820
            break;
119✔
821
        }
×
822
        case NextStateAfterError::error: {
✔
823
            cancel_pending_waits(std::move(lock), sync_error.status);
×
824
            break;
×
825
        }
165✔
826
    }
165✔
827

81✔
828
    if (log_out_user) {
165✔
829
        if (auto u = user())
×
830
            u->log_out();
×
831
    }
×
832

81✔
833
    if (auto error_handler = config(&SyncConfig::error_handler)) {
165✔
834
        error_handler(shared_from_this(), std::move(sync_error));
157✔
835
    }
157✔
836
}
165✔
837

838
void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status error)
839
{
16✔
840
    CompletionCallbacks callbacks;
16✔
841
    std::swap(callbacks, m_completion_callbacks);
16✔
842

8✔
843
    // Inform any waiters on pending subscription states that they were cancelled
8✔
844
    if (m_flx_subscription_store) {
16✔
845
        auto subscription_store = m_flx_subscription_store;
×
846
        m_state_mutex.unlock(lock);
×
847
        subscription_store->notify_all_state_change_notifications(error);
×
848
    }
×
849
    else {
16✔
850
        m_state_mutex.unlock(lock);
16✔
851
    }
16✔
852

8✔
853
    // Inform any queued-up completion handlers that they were cancelled.
8✔
854
    for (auto& [id, callback] : callbacks)
16✔
855
        callback.second(error);
14✔
856
}
16✔
857

858
void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded,
859
                                         uint64_t uploadable, uint64_t download_version, uint64_t snapshot_version)
860
{
6,955✔
861
    m_progress_notifier.update(downloaded, downloadable, uploaded, uploadable, download_version, snapshot_version);
6,955✔
862
}
6,955✔
863

864
static sync::Session::Config::ClientReset make_client_reset_config(const RealmConfig& base_config,
865
                                                                   const std::shared_ptr<SyncConfig>& sync_config,
866
                                                                   DBRef&& fresh_copy, bool recovery_is_allowed,
867
                                                                   bool schema_migration_detected)
868
{
168✔
869
    REALM_ASSERT(sync_config->client_resync_mode != ClientResyncMode::Manual);
168✔
870

84✔
871
    sync::Session::Config::ClientReset config;
168✔
872
    config.mode = sync_config->client_resync_mode;
168✔
873
    config.fresh_copy = std::move(fresh_copy);
168✔
874
    config.recovery_is_allowed = recovery_is_allowed;
168✔
875

84✔
876
    // The conditions here are asymmetric because if we have *either* a before
84✔
877
    // or after callback we need to make sure to initialize the local schema
84✔
878
    // before the client reset happens.
84✔
879
    if (!sync_config->notify_before_client_reset && !sync_config->notify_after_client_reset)
168✔
880
        return config;
28✔
881

70✔
882
    // We cannot initialize the local schema in case of a sync schema migration.
70✔
883
    // Currently, a schema migration involves breaking changes so opening the realm
70✔
884
    // with the new schema results in a crash.
70✔
885
    if (schema_migration_detected)
140✔
886
        return config;
2✔
887

69✔
888
    RealmConfig realm_config = base_config;
138✔
889
    realm_config.sync_config = std::make_shared<SyncConfig>(*sync_config); // deep copy
138✔
890
    realm_config.scheduler = util::Scheduler::make_dummy();
138✔
891

69✔
892
    if (sync_config->notify_after_client_reset) {
138✔
893
        config.notify_after_client_reset = [realm_config](VersionID previous_version, bool did_recover) {
117✔
894
            auto coordinator = _impl::RealmCoordinator::get_coordinator(realm_config);
100✔
895
            ThreadSafeReference active_after = coordinator->get_unbound_realm();
100✔
896
            SharedRealm frozen_before = coordinator->get_realm(realm_config, previous_version);
100✔
897
            REALM_ASSERT(frozen_before);
100✔
898
            REALM_ASSERT(frozen_before->is_frozen());
100✔
899
            realm_config.sync_config->notify_after_client_reset(std::move(frozen_before), std::move(active_after),
100✔
900
                                                                did_recover);
100✔
901
        };
100✔
902
    }
134✔
903
    config.notify_before_client_reset = [config = std::move(realm_config)]() -> VersionID {
138✔
904
        // Opening the Realm live here may make a write if the schema is different
69✔
905
        // than what exists on disk. It is necessary to pass a fully usable Realm
69✔
906
        // to the user here. Note that the schema changes made here will be considered
69✔
907
        // an "offline write" to be recovered if this is recovery mode.
69✔
908
        auto before = Realm::get_shared_realm(config);
138✔
909
        if (auto& notify_before = config.sync_config->notify_before_client_reset) {
138✔
910
            notify_before(config.sync_config->freeze_before_reset_realm ? before->freeze() : before);
134✔
911
        }
136✔
912
        // Note that if the SDK wrote to the Realm (hopefully by requesting a
69✔
913
        // live instance and not opening a secondary one), this may be a
69✔
914
        // different version than what we had before calling the callback.
69✔
915
        before->refresh();
138✔
916
        return before->read_transaction_version();
138✔
917
    };
138✔
918

69✔
919
    return config;
138✔
920
}
138✔
921

922
void SyncSession::create_sync_session()
923
{
1,887✔
924
    if (m_session)
1,887✔
925
        return;
×
926

856✔
927
    util::CheckedLockGuard config_lock(m_config_mutex);
1,887✔
928

856✔
929
    REALM_ASSERT(m_config.sync_config);
1,887✔
930
    SyncConfig& sync_config = *m_config.sync_config;
1,887✔
931
    REALM_ASSERT(sync_config.user);
1,887✔
932

856✔
933
    sync::Session::Config session_config;
1,887✔
934
    session_config.signed_user_token = sync_config.user->access_token();
1,887✔
935
    session_config.user_id = sync_config.user->identity();
1,887✔
936
    session_config.realm_identifier = sync_config.partition_value;
1,887✔
937
    session_config.verify_servers_ssl_certificate = sync_config.client_validate_ssl;
1,887✔
938
    session_config.ssl_trust_certificate_path = sync_config.ssl_trust_certificate_path;
1,887✔
939
    session_config.ssl_verify_callback = sync_config.ssl_verify_callback;
1,887✔
940
    session_config.proxy_config = sync_config.proxy_config;
1,887✔
941
    session_config.simulate_integration_error = sync_config.simulate_integration_error;
1,887✔
942
    session_config.flx_bootstrap_batch_size_bytes = sync_config.flx_bootstrap_batch_size_bytes;
1,887✔
943
    session_config.session_reason =
1,887✔
944
        client_reset::is_fresh_path(m_config.path) ? sync::SessionReason::ClientReset : sync::SessionReason::Sync;
1,803✔
945
    session_config.schema_version = m_config.schema_version;
1,887✔
946

856✔
947
    if (sync_config.on_sync_client_event_hook) {
1,887✔
948
        session_config.on_sync_client_event_hook = [hook = sync_config.on_sync_client_event_hook,
100✔
949
                                                    anchor = weak_from_this()](const SyncClientHookData& data) {
987✔
950
            return hook(anchor, data);
987✔
951
        };
987✔
952
    }
100✔
953

856✔
954
    {
1,887✔
955
        // At this point, the sync_route should be valid, since the location should have been updated by
856✔
956
        // an AppServices http request or by updating the access token before starting the sync session.
856✔
957
        auto sync_route = m_sync_manager->sync_route();
1,887✔
958
        REALM_ASSERT_EX(sync_route && !sync_route->empty(), "Location was not updated prior to sync session start");
1,887✔
959

856✔
960
        if (!m_client.decompose_server_url(*sync_route, session_config.protocol_envelope,
1,887✔
961
                                           session_config.server_address, session_config.server_port,
1,887✔
962
                                           session_config.service_identifier)) {
856✔
963
            throw sync::BadServerUrl(*sync_route);
×
964
        }
×
965

856✔
966
        m_server_url = *sync_route;
1,887✔
967
    }
1,887✔
968

856✔
969
    if (sync_config.authorization_header_name) {
1,887✔
970
        session_config.authorization_header_name = *sync_config.authorization_header_name;
×
971
    }
×
972
    session_config.custom_http_headers = sync_config.custom_http_headers;
1,887✔
973

856✔
974
    if (m_server_requests_action != sync::ProtocolErrorInfo::Action::NoAction) {
1,887✔
975
        // Migrations are allowed to recover local data.
84✔
976
        const bool allowed_to_recover = m_server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset ||
168✔
977
                                        m_server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
98✔
978
                                        m_server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS;
89✔
979
        // Use the original sync config, not the updated one from the migration store
84✔
980
        session_config.client_reset_config =
168✔
981
            make_client_reset_config(m_config, m_original_sync_config, std::move(m_client_reset_fresh_copy),
168✔
982
                                     allowed_to_recover, m_previous_schema_version.has_value());
168✔
983
        session_config.schema_version = m_previous_schema_version.value_or(m_config.schema_version);
168✔
984
        m_server_requests_action = sync::ProtocolErrorInfo::Action::NoAction;
168✔
985
    }
168✔
986

856✔
987
    m_session = m_client.make_session(m_db, m_flx_subscription_store, m_migration_store, std::move(session_config));
1,887✔
988

856✔
989
    std::weak_ptr<SyncSession> weak_self = weak_from_this();
1,887✔
990

856✔
991
    // Set up the wrapped progress handler callback
856✔
992
    m_session->set_progress_handler([weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable,
1,887✔
993
                                                uint_fast64_t uploaded, uint_fast64_t uploadable,
1,887✔
994
                                                uint_fast64_t progress_version, uint_fast64_t snapshot_version) {
7,151✔
995
        if (auto self = weak_self.lock()) {
7,151✔
996
            self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, progress_version,
6,955✔
997
                                         snapshot_version);
6,955✔
998
        }
6,955✔
999
    });
7,151✔
1000

856✔
1001
    // Sets up the connection state listener. This callback is used for both reporting errors as well as changes to
856✔
1002
    // the connection state.
856✔
1003
    m_session->set_connection_state_change_listener(
1,887✔
1004
        [weak_self](sync::ConnectionState state, util::Optional<sync::SessionErrorInfo> error) {
4,067✔
1005
            // If the OS SyncSession object is destroyed, we ignore any events from the underlying Session as there is
1,875✔
1006
            // nothing useful we can do with them.
1,875✔
1007
            auto self = weak_self.lock();
4,067✔
1008
            if (!self) {
4,067✔
1009
                return;
30✔
1010
            }
30✔
1011
            using cs = sync::ConnectionState;
4,037✔
1012
            ConnectionState new_state = [&] {
4,037✔
1013
                switch (state) {
4,037✔
1014
                    case cs::disconnected:
380✔
1015
                        return ConnectionState::Disconnected;
380✔
1016
                    case cs::connecting:
1,859✔
1017
                        return ConnectionState::Connecting;
1,859✔
1018
                    case cs::connected:
1,798✔
1019
                        return ConnectionState::Connected;
1,798✔
1020
                }
×
1021
                REALM_UNREACHABLE();
1022
            }();
×
1023
            util::CheckedUniqueLock lock(self->m_connection_state_mutex);
4,037✔
1024
            auto old_state = self->m_connection_state;
4,037✔
1025
            self->m_connection_state = new_state;
4,037✔
1026
            lock.unlock();
4,037✔
1027

1,866✔
1028
            if (old_state != new_state) {
4,037✔
1029
                self->m_connection_change_notifier.invoke_callbacks(old_state, new_state);
3,970✔
1030
            }
3,970✔
1031

1,866✔
1032
            if (error) {
4,037✔
1033
                self->handle_error(std::move(*error));
439✔
1034
            }
439✔
1035
        });
4,037✔
1036
}
1,887✔
1037

1038
void SyncSession::nonsync_transact_notify(sync::version_type version)
1039
{
8,353✔
1040
    m_progress_notifier.set_local_version(version);
8,353✔
1041

4,162✔
1042
    util::CheckedUniqueLock lock(m_state_mutex);
8,353✔
1043
    switch (m_state) {
8,353✔
1044
        case State::Active:
8,060✔
1045
        case State::WaitingForAccessToken:
8,061✔
1046
            if (m_session) {
8,061✔
1047
                m_session->nonsync_transact_notify(version);
8,059✔
1048
            }
8,059✔
1049
            break;
8,061✔
1050
        case State::Dying:
4,019✔
1051
        case State::Inactive:
157✔
1052
        case State::Paused:
292✔
1053
            break;
292✔
1054
    }
8,353✔
1055
}
8,353✔
1056

1057
void SyncSession::revive_if_needed()
1058
{
2,259✔
1059
    util::CheckedUniqueLock lock(m_state_mutex);
2,259✔
1060
    switch (m_state) {
2,259✔
1061
        case State::Active:
523✔
1062
        case State::WaitingForAccessToken:
523✔
1063
        case State::Paused:
526✔
1064
            return;
526✔
1065
        case State::Dying:
780✔
1066
        case State::Inactive:
1,733✔
1067
            do_revive(std::move(lock));
1,733✔
1068
            break;
1,733✔
1069
    }
2,259✔
1070
}
2,259✔
1071

1072
void SyncSession::handle_reconnect()
1073
{
4✔
1074
    util::CheckedUniqueLock lock(m_state_mutex);
4✔
1075
    switch (m_state) {
4✔
1076
        case State::Active:
4✔
1077
            m_session->cancel_reconnect_delay();
4✔
1078
            break;
4✔
1079
        case State::Dying:
✔
1080
        case State::Inactive:
✔
1081
        case State::WaitingForAccessToken:
✔
1082
        case State::Paused:
✔
1083
            break;
×
1084
    }
4✔
1085
}
4✔
1086

1087
void SyncSession::force_close()
1088
{
290✔
1089
    util::CheckedUniqueLock lock(m_state_mutex);
290✔
1090
    switch (m_state) {
290✔
1091
        case State::Active:
255✔
1092
        case State::Dying:
273✔
1093
        case State::WaitingForAccessToken:
277✔
1094
            become_inactive(std::move(lock));
277✔
1095
            break;
277✔
1096
        case State::Inactive:
132✔
1097
        case State::Paused:
13✔
1098
            break;
13✔
1099
    }
290✔
1100
}
290✔
1101

1102
void SyncSession::pause()
1103
{
190✔
1104
    util::CheckedUniqueLock lock(m_state_mutex);
190✔
1105
    switch (m_state) {
190✔
1106
        case State::Active:
187✔
1107
        case State::Dying:
187✔
1108
        case State::WaitingForAccessToken:
187✔
1109
        case State::Inactive:
188✔
1110
            become_paused(std::move(lock));
188✔
1111
            break;
188✔
1112
        case State::Paused:
95✔
1113
            break;
2✔
1114
    }
190✔
1115
}
190✔
1116

1117
void SyncSession::resume()
1118
{
160✔
1119
    util::CheckedUniqueLock lock(m_state_mutex);
160✔
1120
    switch (m_state) {
160✔
1121
        case State::Active:
✔
1122
        case State::WaitingForAccessToken:
✔
1123
            return;
×
1124
        case State::Paused:
158✔
1125
        case State::Dying:
158✔
1126
        case State::Inactive:
160✔
1127
            do_revive(std::move(lock));
160✔
1128
            break;
160✔
1129
    }
160✔
1130
}
160✔
1131

1132
void SyncSession::do_revive(util::CheckedUniqueLock&& lock)
1133
{
1,893✔
1134
    auto u = user();
1,893✔
1135
    // If the sync manager has a valid route and the user and it's access token
859✔
1136
    // are valid, then revive the session.
859✔
1137
    if (m_sync_manager->sync_route() && (!u || !u->access_token_refresh_required())) {
1,893✔
1138
        become_active();
1,861✔
1139
        m_state_mutex.unlock(lock);
1,861✔
1140
        return;
1,861✔
1141
    }
1,861✔
1142

16✔
1143
    // Otherwise, either the access token has expired or the location info hasn't
16✔
1144
    // been requested since the app was started - request a new access token to
16✔
1145
    // refresh both.
16✔
1146
    become_waiting_for_access_token();
32✔
1147
    // Release the lock for SDKs with a single threaded
16✔
1148
    // networking implementation such as our test suite
16✔
1149
    // so that the update can trigger a state change from
16✔
1150
    // the completion handler.
16✔
1151
    m_state_mutex.unlock(lock);
32✔
1152
    initiate_access_token_refresh();
32✔
1153
}
32✔
1154

1155
void SyncSession::close()
1156
{
100✔
1157
    util::CheckedUniqueLock lock(m_state_mutex);
100✔
1158
    close(std::move(lock));
100✔
1159
}
100✔
1160

1161
void SyncSession::close(util::CheckedUniqueLock lock)
1162
{
1,640✔
1163
    switch (m_state) {
1,640✔
1164
        case State::Active: {
1,094✔
1165
            switch (config(&SyncConfig::stop_policy)) {
1,094✔
1166
                case SyncSessionStopPolicy::Immediately:
1,014✔
1167
                    become_inactive(std::move(lock));
1,014✔
1168
                    break;
1,014✔
1169
                case SyncSessionStopPolicy::LiveIndefinitely:
✔
1170
                    // Don't do anything; session lives forever.
1171
                    m_state_mutex.unlock(lock);
×
1172
                    break;
×
1173
                case SyncSessionStopPolicy::AfterChangesUploaded:
80✔
1174
                    // Wait for all pending changes to upload.
32✔
1175
                    become_dying(std::move(lock));
80✔
1176
                    break;
80✔
1177
            }
1,094✔
1178
            break;
1,094✔
1179
        }
1,094✔
1180
        case State::Dying:
516✔
1181
            m_state_mutex.unlock(lock);
18✔
1182
            break;
18✔
1183
        case State::Paused:
515✔
1184
        case State::Inactive: {
526✔
1185
            // We need to unregister from the sync manager if it still exists so that we don't end up
197✔
1186
            // holding the DBRef open after the session is closed. Otherwise we can end up preventing
197✔
1187
            // the user from deleting the realm when it's in the paused/inactive state.
197✔
1188
            if (m_sync_manager) {
526✔
1189
                m_sync_manager->unregister_session(m_db->get_path());
522✔
1190
            }
522✔
1191
            m_state_mutex.unlock(lock);
526✔
1192
            break;
526✔
1193
        }
213✔
1194
        case State::WaitingForAccessToken:
198✔
1195
            // Immediately kill the session.
1✔
1196
            become_inactive(std::move(lock));
2✔
1197
            break;
2✔
1198
    }
1,640✔
1199
}
1,640✔
1200

1201
void SyncSession::shutdown_and_wait()
1202
{
133✔
1203
    {
133✔
1204
        // Transition immediately to `inactive` state. Calling this function must guarantee that any
17✔
1205
        // sync::Session object in SyncSession::m_session that existed prior to the time of invocation
17✔
1206
        // must have been destroyed upon return. This allows the caller to follow up with a call to
17✔
1207
        // sync::Client::wait_for_session_terminations_or_client_stopped() in order to wait for the
17✔
1208
        // Realm file to be closed. This works so long as this SyncSession object remains in the
17✔
1209
        // `inactive` state after the invocation of shutdown_and_wait().
17✔
1210
        util::CheckedUniqueLock lock(m_state_mutex);
133✔
1211
        if (m_state != State::Inactive && m_state != State::Paused) {
133✔
1212
            become_inactive(std::move(lock));
77✔
1213
        }
77✔
1214
    }
133✔
1215
    m_client.wait_for_session_terminations();
133✔
1216
}
133✔
1217

1218
void SyncSession::update_access_token(const std::string& signed_token)
1219
{
24✔
1220
    util::CheckedUniqueLock lock(m_state_mutex);
24✔
1221
    // We don't expect there to be a session when waiting for access token, but if there is, refresh its token.
12✔
1222
    // If not, the latest token will be seeded from SyncUser::access_token() on session creation.
12✔
1223
    if (m_session) {
24✔
1224
        m_session->refresh(signed_token);
2✔
1225
    }
2✔
1226
    if (m_state == State::WaitingForAccessToken) {
24✔
1227
        become_active();
16✔
1228
    }
16✔
1229
}
24✔
1230

1231
void SyncSession::initiate_access_token_refresh()
1232
{
32✔
1233
    if (auto session_user = user()) {
32✔
1234
        session_user->refresh_custom_data(handle_refresh(shared_from_this(), false));
32✔
1235
    }
32✔
1236
}
32✔
1237

1238
void SyncSession::add_completion_callback(util::UniqueFunction<void(Status)> callback,
1239
                                          _impl::SyncProgressNotifier::NotifierType direction)
1240
{
2,413✔
1241
    bool is_download = (direction == _impl::SyncProgressNotifier::NotifierType::download);
2,413✔
1242

1,164✔
1243
    m_completion_request_counter++;
2,413✔
1244
    m_completion_callbacks.emplace_hint(m_completion_callbacks.end(), m_completion_request_counter,
2,413✔
1245
                                        std::make_pair(direction, std::move(callback)));
2,413✔
1246
    // If the state is inactive then just store the callback and return. The callback will get
1,164✔
1247
    // re-registered with the underlying session if/when the session ever becomes active again.
1,164✔
1248
    if (!m_session) {
2,413✔
1249
        return;
257✔
1250
    }
257✔
1251

1,043✔
1252
    auto waiter = is_download ? &sync::Session::async_wait_for_download_completion
2,156✔
1253
                              : &sync::Session::async_wait_for_upload_completion;
1,590✔
1254

1,043✔
1255
    (m_session.get()->*waiter)([weak_self = weak_from_this(), id = m_completion_request_counter](Status status) {
2,156✔
1256
        auto self = weak_self.lock();
2,156✔
1257
        if (!self)
2,156✔
1258
            return;
56✔
1259
        util::CheckedUniqueLock lock(self->m_state_mutex);
2,100✔
1260
        auto callback_node = self->m_completion_callbacks.extract(id);
2,100✔
1261
        lock.unlock();
2,100✔
1262
        if (callback_node) {
2,100✔
1263
            callback_node.mapped().second(std::move(status));
1,908✔
1264
        }
1,908✔
1265
    });
2,100✔
1266
}
2,156✔
1267

1268
void SyncSession::wait_for_upload_completion(util::UniqueFunction<void(Status)>&& callback)
1269
{
896✔
1270
    util::CheckedUniqueLock lock(m_state_mutex);
896✔
1271
    add_completion_callback(std::move(callback), ProgressDirection::upload);
896✔
1272
}
896✔
1273

1274
void SyncSession::wait_for_download_completion(util::UniqueFunction<void(Status)>&& callback)
1275
{
1,108✔
1276
    util::CheckedUniqueLock lock(m_state_mutex);
1,108✔
1277
    add_completion_callback(std::move(callback), ProgressDirection::download);
1,108✔
1278
}
1,108✔
1279

1280
uint64_t SyncSession::register_progress_notifier(std::function<ProgressNotifierCallback>&& notifier,
1281
                                                 ProgressDirection direction, bool is_streaming)
1282
{
4✔
1283
    return m_progress_notifier.register_callback(std::move(notifier), direction, is_streaming);
4✔
1284
}
4✔
1285

1286
void SyncSession::unregister_progress_notifier(uint64_t token)
1287
{
4✔
1288
    m_progress_notifier.unregister_callback(token);
4✔
1289
}
4✔
1290

1291
uint64_t SyncSession::register_connection_change_callback(std::function<ConnectionStateChangeCallback>&& callback)
1292
{
6✔
1293
    return m_connection_change_notifier.add_callback(std::move(callback));
6✔
1294
}
6✔
1295

1296
void SyncSession::unregister_connection_change_callback(uint64_t token)
1297
{
2✔
1298
    m_connection_change_notifier.remove_callback(token);
2✔
1299
}
2✔
1300

1301
SyncSession::~SyncSession() {}
1,540✔
1302

1303
SyncSession::State SyncSession::state() const
1304
{
16,309✔
1305
    util::CheckedUniqueLock lock(m_state_mutex);
16,309✔
1306
    return m_state;
16,309✔
1307
}
16,309✔
1308

1309
SyncSession::ConnectionState SyncSession::connection_state() const
1310
{
2,752✔
1311
    util::CheckedUniqueLock lock(m_connection_state_mutex);
2,752✔
1312
    return m_connection_state;
2,752✔
1313
}
2,752✔
1314

1315
std::string const& SyncSession::path() const
1316
{
1,786✔
1317
    return m_db->get_path();
1,786✔
1318
}
1,786✔
1319

1320
std::shared_ptr<sync::SubscriptionStore> SyncSession::get_flx_subscription_store()
1321
{
2,985,604✔
1322
    util::CheckedLockGuard lock(m_state_mutex);
2,985,604✔
1323
    return m_flx_subscription_store;
2,985,604✔
1324
}
2,985,604✔
1325

1326
std::shared_ptr<sync::SubscriptionStore> SyncSession::get_subscription_store_base()
1327
{
2✔
1328
    util::CheckedLockGuard lock(m_state_mutex);
2✔
1329
    return m_subscription_store_base;
2✔
1330
}
2✔
1331

1332
sync::SaltedFileIdent SyncSession::get_file_ident() const
1333
{
158✔
1334
    auto repl = m_db->get_replication();
158✔
1335
    REALM_ASSERT(repl);
158✔
1336
    REALM_ASSERT(dynamic_cast<sync::ClientReplication*>(repl));
158✔
1337

79✔
1338
    sync::SaltedFileIdent ret;
158✔
1339
    sync::version_type unused_version;
158✔
1340
    sync::SyncProgress unused_progress;
158✔
1341
    static_cast<sync::ClientReplication*>(repl)->get_history().get_status(unused_version, ret, unused_progress);
158✔
1342
    return ret;
158✔
1343
}
158✔
1344

1345
std::string SyncSession::get_appservices_connection_id() const
1346
{
20✔
1347
    util::CheckedLockGuard lk(m_state_mutex);
20✔
1348
    if (!m_session) {
20✔
1349
        return {};
×
1350
    }
×
1351
    return m_session->get_appservices_connection_id();
20✔
1352
}
20✔
1353

1354
std::optional<std::string> SyncSession::get_sync_route() const
1355
{
30✔
1356
    util::CheckedLockGuard lk(m_state_mutex);
30✔
1357
    if (!m_sync_manager) {
30✔
1358
        return std::nullopt;
×
1359
    }
×
1360
    return m_sync_manager->sync_route();
30✔
1361
}
30✔
1362

1363
void SyncSession::update_configuration(SyncConfig new_config)
1364
{
8✔
1365
    while (true) {
18✔
1366
        util::CheckedUniqueLock state_lock(m_state_mutex);
18✔
1367
        if (m_state != State::Inactive && m_state != State::Paused) {
18✔
1368
            // Changing the state releases the lock, which means that by the
5✔
1369
            // time we reacquire the lock the state may have changed again
5✔
1370
            // (either due to one of the callbacks being invoked or another
5✔
1371
            // thread coincidentally doing something). We just attempt to keep
5✔
1372
            // switching it to inactive until it stays there.
5✔
1373
            become_inactive(std::move(state_lock));
10✔
1374
            continue;
10✔
1375
        }
10✔
1376

4✔
1377
        util::CheckedUniqueLock config_lock(m_config_mutex);
8✔
1378
        REALM_ASSERT(m_state == State::Inactive || m_state == State::Paused);
8!
1379
        REALM_ASSERT(!m_session);
8✔
1380
        REALM_ASSERT(m_config.sync_config->user == new_config.user);
8✔
1381
        // Since this is used for testing purposes only, just update the current sync_config
4✔
1382
        m_config.sync_config = std::make_shared<SyncConfig>(std::move(new_config));
8✔
1383
        break;
8✔
1384
    }
8✔
1385
    revive_if_needed();
8✔
1386
}
8✔
1387

1388
void SyncSession::apply_sync_config_after_migration_or_rollback()
1389
{
26✔
1390
    // Migration state changed - Update the configuration to
13✔
1391
    // match the new sync mode.
13✔
1392
    util::CheckedLockGuard cfg_lock(m_config_mutex);
26✔
1393
    if (!m_migrated_sync_config)
26✔
1394
        return;
2✔
1395

12✔
1396
    m_config.sync_config = m_migrated_sync_config;
24✔
1397
    m_migrated_sync_config.reset();
24✔
1398
}
24✔
1399

1400
void SyncSession::save_sync_config_after_migration_or_rollback()
1401
{
28✔
1402
    util::CheckedLockGuard cfg_lock(m_config_mutex);
28✔
1403
    m_migrated_sync_config = m_migration_store->convert_sync_config(m_original_sync_config);
28✔
1404
}
28✔
1405

1406
void SyncSession::update_subscription_store(bool flx_sync_requested, std::optional<sync::SubscriptionSet> new_subs)
1407
{
26✔
1408
    util::CheckedUniqueLock lock(m_state_mutex);
26✔
1409

13✔
1410
    // The session should be closed before updating the FLX subscription store
13✔
1411
    REALM_ASSERT(!m_session);
26✔
1412

13✔
1413
    // If the subscription store exists and switching to PBS, then clear the store
13✔
1414
    auto& history = static_cast<sync::ClientReplication&>(*m_db->get_replication());
26✔
1415
    if (!flx_sync_requested) {
26✔
1416
        if (m_flx_subscription_store) {
8✔
1417
            // Empty the subscription store and cancel any pending subscription notification
4✔
1418
            // waiters
4✔
1419
            auto subscription_store = std::move(m_flx_subscription_store);
8✔
1420
            lock.unlock();
8✔
1421
            subscription_store->terminate();
8✔
1422
            auto tr = m_db->start_write();
8✔
1423
            history.set_write_validator_factory(nullptr);
8✔
1424
            tr->rollback();
8✔
1425
        }
8✔
1426
        return;
8✔
1427
    }
8✔
1428

9✔
1429
    if (m_flx_subscription_store)
18✔
1430
        return; // Using FLX and subscription store already exists
2✔
1431

8✔
1432
    // Going from PBS -> FLX (or one doesn't exist yet), create a new subscription store
8✔
1433
    create_subscription_store();
16✔
1434

8✔
1435
    std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr(m_flx_subscription_store);
16✔
1436

8✔
1437
    // If migrated to FLX, create subscriptions in the local realm to cover the existing data.
8✔
1438
    // This needs to be done before setting the write validator to avoid NoSubscriptionForWrite errors.
8✔
1439
    if (new_subs) {
16✔
1440
        auto active_mut_sub = m_flx_subscription_store->get_active().make_mutable_copy();
16✔
1441
        active_mut_sub.import(std::move(*new_subs));
16✔
1442
        active_mut_sub.set_state(sync::SubscriptionSet::State::Complete);
16✔
1443
        active_mut_sub.commit();
16✔
1444
    }
16✔
1445

8✔
1446
    auto tr = m_db->start_write();
16✔
1447
    set_write_validator_factory(weak_sub_mgr);
16✔
1448
    tr->rollback();
16✔
1449
}
16✔
1450

1451
void SyncSession::create_subscription_store()
1452
{
578✔
1453
    REALM_ASSERT(!m_flx_subscription_store);
578✔
1454

289✔
1455
    // Create the main subscription store instance when this is first called - this will
289✔
1456
    // remain valid afterwards for the life of the SyncSession, but m_flx_subscription_store
289✔
1457
    // will be reset when rolling back to PBS after a client FLX migration
289✔
1458
    if (!m_subscription_store_base) {
578✔
1459
        m_subscription_store_base = sync::SubscriptionStore::create(m_db);
576✔
1460
    }
576✔
1461

289✔
1462
    // m_subscription_store_base is always around for the life of SyncSession, but the
289✔
1463
    // m_flx_subscription_store is set when using FLX.
289✔
1464
    m_flx_subscription_store = m_subscription_store_base;
578✔
1465
}
578✔
1466

1467
void SyncSession::set_write_validator_factory(std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr)
1468
{
578✔
1469
    auto& history = static_cast<sync::ClientReplication&>(*m_db->get_replication());
578✔
1470
    history.set_write_validator_factory(
578✔
1471
        [weak_sub_mgr](Transaction& tr) -> util::UniqueFunction<sync::SyncReplication::WriteValidator> {
5,824✔
1472
            auto sub_mgr = weak_sub_mgr.lock();
5,824✔
1473
            REALM_ASSERT_RELEASE(sub_mgr);
5,824✔
1474
            auto latest_sub_tables = sub_mgr->get_tables_for_latest(tr);
5,824✔
1475
            return [tables = std::move(latest_sub_tables)](const Table& table) {
3,312✔
1476
                if (table.get_table_type() != Table::Type::TopLevel) {
803✔
1477
                    return;
430✔
1478
                }
430✔
1479
                auto object_class_name = Group::table_name_to_class_name(table.get_name());
373✔
1480
                if (tables.find(object_class_name) == tables.end()) {
373✔
1481
                    throw NoSubscriptionForWrite(
2✔
1482
                        util::format("Cannot write to class %1 when no flexible sync subscription has been created.",
2✔
1483
                                     object_class_name));
2✔
1484
                }
2✔
1485
            };
373✔
1486
        });
5,824✔
1487
}
578✔
1488

1489
// Represents a reference to the SyncSession from outside of the sync subsystem.
1490
// We attempt to keep the SyncSession in an active state as long as it has an external reference.
1491
class SyncSession::ExternalReference {
1492
public:
1493
    ExternalReference(std::shared_ptr<SyncSession> session)
1494
        : m_session(std::move(session))
1495
    {
1,540✔
1496
    }
1,540✔
1497

1498
    ~ExternalReference()
1499
    {
1,540✔
1500
        m_session->did_drop_external_reference();
1,540✔
1501
    }
1,540✔
1502

1503
private:
1504
    std::shared_ptr<SyncSession> m_session;
1505
};
1506

1507
std::shared_ptr<SyncSession> SyncSession::external_reference()
1508
{
1,772✔
1509
    util::CheckedLockGuard lock(m_external_reference_mutex);
1,772✔
1510

800✔
1511
    if (auto external_reference = m_external_reference.lock())
1,772✔
1512
        return std::shared_ptr<SyncSession>(external_reference, this);
232✔
1513

684✔
1514
    auto external_reference = std::make_shared<ExternalReference>(shared_from_this());
1,540✔
1515
    m_external_reference = external_reference;
1,540✔
1516
    return std::shared_ptr<SyncSession>(external_reference, this);
1,540✔
1517
}
1,540✔
1518

1519
std::shared_ptr<SyncSession> SyncSession::existing_external_reference()
1520
{
2,471✔
1521
    util::CheckedLockGuard lock(m_external_reference_mutex);
2,471✔
1522

1,031✔
1523
    if (auto external_reference = m_external_reference.lock())
2,471✔
1524
        return std::shared_ptr<SyncSession>(external_reference, this);
976✔
1525

660✔
1526
    return nullptr;
1,495✔
1527
}
1,495✔
1528

1529
void SyncSession::did_drop_external_reference()
1530
{
1,540✔
1531
    util::CheckedUniqueLock lock1(m_state_mutex);
1,540✔
1532
    {
1,540✔
1533
        util::CheckedLockGuard lock2(m_external_reference_mutex);
1,540✔
1534

684✔
1535
        // If the session is being resurrected we should not close the session.
684✔
1536
        if (!m_external_reference.expired())
1,540✔
1537
            return;
×
1538
    }
1,540✔
1539

684✔
1540
    close(std::move(lock1));
1,540✔
1541
}
1,540✔
1542

1543
uint64_t SyncProgressNotifier::register_callback(std::function<ProgressNotifierCallback> notifier,
1544
                                                 NotifierType direction, bool is_streaming)
1545
{
48✔
1546
    util::UniqueFunction<void()> invocation;
48✔
1547
    uint64_t token_value = 0;
48✔
1548
    {
48✔
1549
        std::lock_guard<std::mutex> lock(m_mutex);
48✔
1550
        token_value = m_progress_notifier_token++;
48✔
1551
        NotifierPackage package{std::move(notifier), util::none, m_local_transaction_version, is_streaming,
48✔
1552
                                direction == NotifierType::download};
48✔
1553
        if (!m_current_progress) {
48✔
1554
            // Simply register the package, since we have no data yet.
6✔
1555
            m_packages.emplace(token_value, std::move(package));
12✔
1556
            return token_value;
12✔
1557
        }
12✔
1558
        bool skip_registration = false;
36✔
1559
        invocation = package.create_invocation(*m_current_progress, skip_registration);
36✔
1560
        if (skip_registration) {
36✔
1561
            token_value = 0;
10✔
1562
        }
10✔
1563
        else {
26✔
1564
            m_packages.emplace(token_value, std::move(package));
26✔
1565
        }
26✔
1566
    }
36✔
1567
    invocation();
36✔
1568
    return token_value;
36✔
1569
}
36✔
1570

1571
void SyncProgressNotifier::unregister_callback(uint64_t token)
1572
{
8✔
1573
    std::lock_guard<std::mutex> lock(m_mutex);
8✔
1574
    m_packages.erase(token);
8✔
1575
}
8✔
1576

1577
void SyncProgressNotifier::update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
1578
                                  uint64_t download_version, uint64_t snapshot_version)
1579
{
7,051✔
1580
    // Ignore progress messages from before we first receive a DOWNLOAD message
2,975✔
1581
    if (download_version == 0)
7,051✔
1582
        return;
3,017✔
1583

1,776✔
1584
    std::vector<util::UniqueFunction<void()>> invocations;
4,034✔
1585
    {
4,034✔
1586
        std::lock_guard<std::mutex> lock(m_mutex);
4,034✔
1587
        m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded, snapshot_version};
4,034✔
1588

1,776✔
1589
        for (auto it = m_packages.begin(); it != m_packages.end();) {
4,100✔
1590
            bool should_delete = false;
66✔
1591
            invocations.emplace_back(it->second.create_invocation(*m_current_progress, should_delete));
66✔
1592
            it = should_delete ? m_packages.erase(it) : std::next(it);
57✔
1593
        }
66✔
1594
    }
4,034✔
1595
    // Run the notifiers only after we've released the lock.
1,776✔
1596
    for (auto& invocation : invocations)
4,034✔
1597
        invocation();
66✔
1598
}
4,034✔
1599

1600
void SyncProgressNotifier::set_local_version(uint64_t snapshot_version)
1601
{
8,361✔
1602
    std::lock_guard<std::mutex> lock(m_mutex);
8,361✔
1603
    m_local_transaction_version = snapshot_version;
8,361✔
1604
}
8,361✔
1605

1606
util::UniqueFunction<void()>
1607
SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired)
1608
{
102✔
1609
    uint64_t transferred = is_download ? current_progress.downloaded : current_progress.uploaded;
83✔
1610
    uint64_t transferrable = is_download ? current_progress.downloadable : current_progress.uploadable;
83✔
1611
    if (!is_streaming) {
102✔
1612
        // If the sync client has not yet processed all of the local
35✔
1613
        // transactions then the uploadable data is incorrect and we should
35✔
1614
        // not invoke the callback
35✔
1615
        if (!is_download && snapshot_version > current_progress.snapshot_version)
70✔
1616
            return [] {};
2✔
1617

34✔
1618
        // The initial download size we get from the server is the uncompacted
34✔
1619
        // size, and so the download may complete before we actually receive
34✔
1620
        // that much data. When that happens, transferrable will drop and we
34✔
1621
        // need to use the new value instead of the captured one.
34✔
1622
        if (!captured_transferrable || *captured_transferrable > transferrable)
68✔
1623
            captured_transferrable = transferrable;
36✔
1624
        transferrable = *captured_transferrable;
68✔
1625
    }
68✔
1626

51✔
1627
    // A notifier is expired if at least as many bytes have been transferred
51✔
1628
    // as were originally considered transferrable.
51✔
1629
    is_expired = !is_streaming && transferred >= transferrable;
101✔
1630
    return [=, notifier = notifier] {
100✔
1631
        notifier(transferred, transferrable);
100✔
1632
    };
100✔
1633
}
102✔
1634

1635
uint64_t SyncSession::ConnectionChangeNotifier::add_callback(std::function<ConnectionStateChangeCallback> callback)
1636
{
6✔
1637
    std::lock_guard<std::mutex> lock(m_callback_mutex);
6✔
1638
    auto token = m_next_token++;
6✔
1639
    m_callbacks.push_back({std::move(callback), token});
6✔
1640
    return token;
6✔
1641
}
6✔
1642

1643
void SyncSession::ConnectionChangeNotifier::remove_callback(uint64_t token)
1644
{
2✔
1645
    Callback old;
2✔
1646
    {
2✔
1647
        std::lock_guard<std::mutex> lock(m_callback_mutex);
2✔
1648
        auto it = std::find_if(begin(m_callbacks), end(m_callbacks), [=](const auto& c) {
2✔
1649
            return c.token == token;
2✔
1650
        });
2✔
1651
        if (it == end(m_callbacks)) {
2✔
1652
            return;
×
1653
        }
×
1654

1✔
1655
        size_t idx = distance(begin(m_callbacks), it);
2✔
1656
        if (m_callback_index != npos) {
2✔
1657
            if (m_callback_index >= idx)
×
1658
                --m_callback_index;
×
1659
        }
×
1660
        --m_callback_count;
2✔
1661

1✔
1662
        old = std::move(*it);
2✔
1663
        m_callbacks.erase(it);
2✔
1664
    }
2✔
1665
}
2✔
1666

1667
void SyncSession::ConnectionChangeNotifier::invoke_callbacks(ConnectionState old_state, ConnectionState new_state)
1668
{
5,447✔
1669
    std::unique_lock lock(m_callback_mutex);
5,447✔
1670
    m_callback_count = m_callbacks.size();
5,447✔
1671
    for (++m_callback_index; m_callback_index < m_callback_count; ++m_callback_index) {
5,451✔
1672
        // acquire a local reference to the callback so that removing the
2✔
1673
        // callback from within it can't result in a dangling pointer
2✔
1674
        auto cb = m_callbacks[m_callback_index].fn;
4✔
1675
        lock.unlock();
4✔
1676
        cb(old_state, new_state);
4✔
1677
        lock.lock();
4✔
1678
    }
4✔
1679
    m_callback_index = npos;
5,447✔
1680
}
5,447✔
1681

1682
util::Future<std::string> SyncSession::send_test_command(std::string body)
1683
{
30✔
1684
    util::CheckedLockGuard lk(m_state_mutex);
30✔
1685
    if (!m_session) {
30✔
1686
        return Status{ErrorCodes::RuntimeError, "Session doesn't exist to send test command on"};
×
1687
    }
×
1688

15✔
1689
    return m_session->send_test_command(std::move(body));
30✔
1690
}
30✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc