• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2213

10 Apr 2024 11:21PM UTC coverage: 91.792% (-0.8%) from 92.623%
2213

push

Evergreen

web-flow
Add missing availability checks for SecCopyErrorMessageString (#7577)

This requires iOS 11.3 and we currently target iOS 11.

94842 of 175770 branches covered (53.96%)

7 of 22 new or added lines in 2 files covered. (31.82%)

1861 existing lines in 82 files now uncovered.

242866 of 264583 relevant lines covered (91.79%)

5593111.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.85
/src/realm/object-store/sync/sync_session.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2016 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/sync/sync_session.hpp>
20

21
#include <realm/object-store/thread_safe_reference.hpp>
22
#include <realm/object-store/impl/realm_coordinator.hpp>
23
#include <realm/object-store/sync/app.hpp>
24
#include <realm/object-store/sync/impl/sync_client.hpp>
25
#include <realm/object-store/sync/impl/sync_file.hpp>
26
#include <realm/object-store/sync/impl/app_metadata.hpp>
27
#include <realm/object-store/sync/sync_manager.hpp>
28
#include <realm/object-store/sync/sync_user.hpp>
29
#include <realm/object-store/util/scheduler.hpp>
30

31
#include <realm/db_options.hpp>
32
#include <realm/sync/client.hpp>
33
#include <realm/sync/config.hpp>
34
#include <realm/sync/network/http.hpp>
35
#include <realm/sync/network/websocket_error.hpp>
36
#include <realm/sync/noinst/client_history_impl.hpp>
37
#include <realm/sync/noinst/client_reset_operation.hpp>
38
#include <realm/sync/noinst/migration_store.hpp>
39
#include <realm/sync/noinst/sync_schema_migration.hpp>
40
#include <realm/sync/protocol.hpp>
41

42
using namespace realm;
43
using namespace realm::_impl;
44

45
using SessionWaiterPointer = void (sync::Session::*)(util::UniqueFunction<void(std::error_code)>);
46

47
constexpr const char SyncError::c_original_file_path_key[];
48
constexpr const char SyncError::c_recovery_file_path_key[];
49

50
/// STATES:
51
///
52
/// WAITING_FOR_ACCESS_TOKEN: a request has been initiated to ask
53
/// for an updated access token and the session is waiting for a response.
54
/// From: INACTIVE, DYING
55
/// To:
56
///    * ACTIVE: when the SDK successfully refreshes the token
57
///    * INACTIVE: if asked to log out, or if asked to close
58
///
59
/// ACTIVE: the session is connected to the Sync Server and is actively
60
/// transferring data.
61
/// From: INACTIVE, DYING, WAITING_FOR_ACCESS_TOKEN
62
/// To:
63
///    * INACTIVE: if asked to log out, or if asked to close and the stop policy
64
///                is Immediate.
65
///    * DYING: if asked to close and the stop policy is AfterChangesUploaded
66
///
67
/// DYING: the session is performing clean-up work in preparation to be destroyed.
68
/// From: ACTIVE
69
/// To:
70
///    * INACTIVE: when the clean-up work completes, if the session wasn't
71
///                revived, or if explicitly asked to log out before the
72
///                clean-up work begins
73
///    * ACTIVE: if the session is revived
74
///    * WAITING_FOR_ACCESS_TOKEN: if the session tried to enter ACTIVE,
75
///                                but the token is invalid or expired.
76
///
77
/// INACTIVE: the user owning this session has logged out, the `sync::Session`
78
/// owned by this session is destroyed, and the session is quiescent.
79
/// Note that a session briefly enters this state before being destroyed, but
80
/// it can also enter this state and stay there if the user has been logged out.
81
/// From: initial, ACTIVE, DYING, WAITING_FOR_ACCESS_TOKEN
82
/// To:
83
///    * ACTIVE: if the session is revived
84
///    * WAITING_FOR_ACCESS_TOKEN: if the session tried to enter ACTIVE,
85
///                                but the token is invalid or expired.
86

87
void SyncSession::become_active()
88
{
1,987✔
89
    REALM_ASSERT(m_state != State::Active);
1,987✔
90
    m_state = State::Active;
1,987✔
91

906✔
92
    // First time the session becomes active, register a notification on the sentinel subscription set to restart the
906✔
93
    // session and update to native FLX.
906✔
94
    if (m_migration_sentinel_query_version) {
1,987✔
95
        m_flx_subscription_store->get_by_version(*m_migration_sentinel_query_version)
2✔
96
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
97
            .get_async([=, weak_self = weak_from_this()](StatusWith<sync::SubscriptionSet::State> s) {
2✔
98
                if (!s.is_ok()) {
2✔
99
                    return;
×
100
                }
×
101
                REALM_ASSERT(s.get_value() == sync::SubscriptionSet::State::Complete);
2✔
102
                if (auto strong_self = weak_self.lock()) {
2✔
103
                    strong_self->m_migration_store->cancel_migration();
2✔
104
                    strong_self->restart_session();
2✔
105
                }
2✔
106
            });
2✔
107
        m_migration_sentinel_query_version.reset();
2✔
108
    }
2✔
109

906✔
110
    // when entering from the Dying state the session will still be bound
906✔
111
    if (!m_session) {
1,987✔
112
        create_sync_session();
1,985✔
113
        m_session->bind();
1,985✔
114
    }
1,985✔
115

906✔
116
    // Register all the pending wait-for-completion blocks. This can
906✔
117
    // potentially add a redundant callback if we're coming from the Dying
906✔
118
    // state, but that's okay (we won't call the user callbacks twice).
906✔
119
    SyncSession::CompletionCallbacks callbacks_to_register;
1,987✔
120
    std::swap(m_completion_callbacks, callbacks_to_register);
1,987✔
121

906✔
122
    for (auto& [id, callback_tuple] : callbacks_to_register) {
1,140✔
123
        add_completion_callback(std::move(callback_tuple.second), callback_tuple.first);
453✔
124
    }
453✔
125
}
1,987✔
126

127
// Moves the session into the Dying state and waits for pending uploads to
// complete before going Inactive. Must not be called while already Dying.
//
// `lock` is the caller's lock on m_state_mutex; it is released (directly or via
// become_inactive()) before this function returns.
void SyncSession::become_dying(util::CheckedUniqueLock lock)
{
    REALM_ASSERT(m_state != State::Dying);
    m_state = State::Dying;

    // Without a session there is nothing that could be uploaded, so skip
    // straight to Inactive.
    if (!m_session) {
        become_inactive(std::move(lock));
        return;
    }

    // Remember which "death" this is, so a callback belonging to an earlier
    // Dying period does not tear down a session that has since been revived and
    // started dying again.
    const size_t expected_death_count = ++m_death_count;
    auto on_upload_complete = [weak_session = weak_from_this(), expected_death_count](Status) {
        auto session = weak_session.lock();
        if (!session) {
            return;
        }

        util::CheckedUniqueLock state_lock(session->m_state_mutex);
        const bool still_dying = session->m_state == State::Dying;
        if (still_dying && session->m_death_count == expected_death_count) {
            session->become_inactive(std::move(state_lock));
        }
    };
    m_session->async_wait_for_upload_completion(std::move(on_upload_complete));
    m_state_mutex.unlock(lock);
}
84✔
149

150
// Moves the session into the Inactive state and performs the shared teardown in
// do_become_inactive(). Must not be called while already Inactive.
//
// `lock`   - the caller's lock on m_state_mutex; handed on to do_become_inactive().
// `status` - forwarded to do_become_inactive(), which reports it to cancelled waiters.
// `cancel_subscription_notifications` - forwarded to do_become_inactive().
void SyncSession::become_inactive(util::CheckedUniqueLock lock, Status status, bool cancel_subscription_notifications)
{
    REALM_ASSERT(m_state != State::Inactive);
    m_state = State::Inactive;

    // The state is updated first; the teardown helper does not touch m_state.
    do_become_inactive(std::move(lock), status, cancel_subscription_notifications);
}
1,719✔
157

158
// Moves the session into the Paused state. Must not be called while already
// Paused. `lock` is the caller's lock on m_state_mutex and is released before
// returning.
void SyncSession::become_paused(util::CheckedUniqueLock lock)
{
    REALM_ASSERT(m_state != State::Paused);
    const bool was_inactive = (m_state == State::Inactive);
    m_state = State::Paused;

    if (!was_inactive) {
        // Tear the session down, cancelling subscription notifications as well.
        do_become_inactive(std::move(lock), Status::OK(), true);
        return;
    }

    // An already-inactive session has nothing to tear down; updating the state
    // was all that was needed.
    m_state_mutex.unlock(lock);
}
250✔
172

173
void SyncSession::restart_session()
174
{
22✔
175
    util::CheckedUniqueLock lock(m_state_mutex);
22✔
176
    switch (m_state) {
22✔
177
        case State::Active:
22✔
178
            do_restart_session(std::move(lock));
22✔
179
            break;
22✔
180
        case State::WaitingForAccessToken:
✔
181
        case State::Paused:
✔
182
        case State::Dying:
✔
183
        case State::Inactive:
✔
184
            return;
×
185
    }
22✔
186
}
22✔
187

188
// Drops the current sync session and immediately becomes active again with a
// fresh one. The (unnamed) lock parameter is the caller's lock on
// m_state_mutex; it is never explicitly unlocked here.
void SyncSession::do_restart_session(util::CheckedUniqueLock)
{
    // Switch directly to Inactive rather than going through become_inactive():
    // that keeps the progress completion waiters registered, so they continue
    // to wait until the restarted session completes the upload/download sync.
    m_state = State::Inactive;

    // Resetting an empty pointer is a no-op, so no null check is needed.
    m_session.reset();

    // become_active() creates the new session via create_sync_session() (which
    // retrieves the latest server path from the sync manager) and re-registers
    // the completion callbacks.
    become_active();
}
22✔
205

206
void SyncSession::do_become_inactive(util::CheckedUniqueLock lock, Status status,
207
                                     bool cancel_subscription_notifications)
208
{
1,969✔
209
    // Manually set the disconnected state. Sync would also do this, but
897✔
210
    // since the underlying SyncSession object already have been destroyed,
897✔
211
    // we are not able to get the callback.
897✔
212
    util::CheckedUniqueLock connection_state_lock(m_connection_state_mutex);
1,969✔
213
    auto old_state = m_connection_state;
1,969✔
214
    auto new_state = m_connection_state = SyncSession::ConnectionState::Disconnected;
1,969✔
215
    connection_state_lock.unlock();
1,969✔
216

897✔
217
    SyncSession::CompletionCallbacks waits;
1,969✔
218
    std::swap(waits, m_completion_callbacks);
1,969✔
219

897✔
220
    m_session = nullptr;
1,969✔
221
    if (m_sync_manager) {
1,969✔
222
        m_sync_manager->unregister_session(m_db->get_path());
1,969✔
223
    }
1,969✔
224

897✔
225
    auto subscription_store = m_flx_subscription_store;
1,969✔
226
    m_state_mutex.unlock(lock);
1,969✔
227

897✔
228
    // Send notifications after releasing the lock to prevent deadlocks in the callback.
897✔
229
    if (old_state != new_state) {
1,969✔
230
        m_connection_change_notifier.invoke_callbacks(old_state, connection_state());
1,558✔
231
    }
1,558✔
232

897✔
233
    if (status.is_ok())
1,969✔
234
        status = Status(ErrorCodes::OperationAborted, "Sync session became inactive");
1,852✔
235

897✔
236
    if (subscription_store && cancel_subscription_notifications) {
1,969✔
237
        subscription_store->notify_all_state_change_notifications(status);
666✔
238
    }
666✔
239

897✔
240
    // Inform any queued-up completion handlers that they were cancelled.
897✔
241
    for (auto& [id, callback] : waits)
1,969✔
242
        callback.second(status);
82✔
243
}
1,969✔
244

245
void SyncSession::become_waiting_for_access_token()
246
{
20✔
247
    REALM_ASSERT(m_state != State::WaitingForAccessToken);
20✔
248
    m_state = State::WaitingForAccessToken;
20✔
249
}
20✔
250

251
void SyncSession::handle_bad_auth(const std::shared_ptr<SyncUser>& user, Status status)
252
{
18✔
253
    // TODO: ideally this would write to the logs as well in case users didn't set up their error handler.
9✔
254
    {
18✔
255
        util::CheckedUniqueLock lock(m_state_mutex);
18✔
256
        cancel_pending_waits(std::move(lock), status);
18✔
257
    }
18✔
258
    if (user) {
18✔
259
        user->request_log_out();
18✔
260
    }
18✔
261

9✔
262
    if (auto error_handler = config(&SyncConfig::error_handler)) {
18✔
263
        auto user_facing_error = SyncError({ErrorCodes::AuthError, status.reason()}, true);
18✔
264
        error_handler(shared_from_this(), std::move(user_facing_error));
18✔
265
    }
18✔
266
}
18✔
267

268
// Returns true when `error` carries an HTTP status code that signals an
// authentication failure: 401 (unauthorized) or 403 (forbidden). Errors
// without an additional status code are never treated as auth failures.
static bool check_for_auth_failure(const app::AppError& error)
{
    using namespace realm::sync;

    if (!error.additional_status_code) {
        return false;
    }

    const auto http_status = HTTPStatus(*error.additional_status_code);
    return http_status == HTTPStatus::Unauthorized || http_status == HTTPStatus::Forbidden;
}
26✔
280

281
// Returns true when `error` carries an unhandled permanent redirect HTTP status
// code: 301 (moved permanently) or 308 (permanent redirect). Errors without an
// additional status code are never treated as redirects.
static bool check_for_redirect_response(const app::AppError& error)
{
    using namespace realm::sync;

    if (!error.additional_status_code) {
        return false;
    }

    const auto http_status = HTTPStatus(*error.additional_status_code);
    return http_status == HTTPStatus::MovedPermanently || http_status == HTTPStatus::PermanentRedirect;
}
26✔
293

294
// Builds the completion handler for an access-token refresh request.
//
// `session`         - the session the refresh belongs to; the returned handler
//                     keeps a strong reference to it.
// `restart_session` - on a successful refresh, restart the session instead of
//                     only updating its access token.
//
// The returned handler receives the refresh error, or an empty optional on success.
util::UniqueFunction<void(util::Optional<app::AppError>)>
SyncSession::handle_refresh(const std::shared_ptr<SyncSession>& session, bool restart_session)
{
    return [session, restart_session](util::Optional<app::AppError> error) {
        auto session_user = session->user();

        // The session no longer has a user: cancel whoever is waiting, passing
        // on the refresh error if there was one.
        if (!session_user) {
            util::CheckedUniqueLock lock(session->m_state_mutex);
            auto refresh_error = error ? error->to_status() : Status::OK();
            session->cancel_pending_waits(std::move(lock), refresh_error);
            return;
        }

        // The refresh succeeded.
        if (!error) {
            if (restart_session) {
                // The latest access token and server url will be pulled from
                // the sync manager when the new session is started.
                session->restart_session();
            }
            else {
                // Otherwise, update the access token and reconnect.
                session->update_access_token(session_user->access_token());
            }
            return;
        }

        // Client errors other than app_deallocated are considered fatal because
        // there was a problem locally before even sending the request to the
        // server, e.g. ClientErrorCode::user_not_found,
        // ClientErrorCode::user_not_logged_in, ClientErrorCode::too_many_redirects.
        if (ErrorCodes::error_categories(error->code()).test(ErrorCategory::client_error)) {
            session->handle_bad_auth(session_user, error->to_status());
            return;
        }

        // A 401 response on a refresh request means that the token cannot be
        // refreshed and we should not retry. This can be because an admin has
        // revoked this user's sessions, the user has been disabled, or the
        // refresh token has expired according to the server's clock.
        if (check_for_auth_failure(*error)) {
            session->handle_bad_auth(
                session_user,
                {error->code(), util::format("Unable to refresh the user access token: %1", error->reason())});
            return;
        }

        // A 301 or 308 response is an unhandled permanent redirect response
        // (which should not happen) - if this is received, fail the request
        // with an appropriate error message. Temporary redirect responses
        // (302, 307) are not supported.
        if (check_for_redirect_response(*error)) {
            session->handle_bad_auth(
                session_user,
                {error->code(), util::format("Unhandled redirect response when trying to reach the server: %1",
                                             error->reason())});
            return;
        }

        // Any other failure is an unexpected, non-fatal error. We would like to
        // retry, but not immediately, so the server is not swamped with
        // requests. Two scenarios:
        // 1) The request came from the proactive token check or was user
        //    initiated: the token may actually still be valid. Advance to
        //    Active from WaitingForAccessToken if needed and let the sync
        //    server tell us whether the token is valid. If that also fails we
        //    end up in case 2.
        // 2) The sync connection initiated the request because the server is
        //    unavailable or the connection hit an unexpected error: the sync
        //    client reinitializes the connection using its own internal backoff
        //    timer, so nothing needs to happen here.
        util::CheckedUniqueLock lock(session->m_state_mutex);
        if (session->m_state == State::WaitingForAccessToken) {
            session->become_active();
        }
    };
}
70✔
362

363
SyncSession::SyncSession(Private, SyncClient& client, std::shared_ptr<DB> db, const RealmConfig& config,
364
                         SyncManager* sync_manager)
365
    : m_config{config}
366
    , m_db{std::move(db)}
367
    , m_original_sync_config{m_config.sync_config}
368
    , m_migration_store{sync::MigrationStore::create(m_db)}
369
    , m_client(client)
370
    , m_sync_manager(sync_manager)
371
{
1,566✔
372
    REALM_ASSERT(m_config.sync_config);
1,566✔
373
    // we don't want the following configs enabled during a client reset
697✔
374
    m_config.scheduler = nullptr;
1,566✔
375
    m_config.audit_config = nullptr;
1,566✔
376

697✔
377
    // Adjust the sync_config if using PBS sync and already in the migrated or rollback state
697✔
378
    if (m_migration_store->is_migrated() || m_migration_store->is_rollback_in_progress()) {
1,566✔
379
        m_config.sync_config = sync::MigrationStore::convert_sync_config_to_flx(m_original_sync_config);
8✔
380
    }
8✔
381

697✔
382
    // If using FLX, set up m_flx_subscription_store and the history_write_validator
697✔
383
    if (m_config.sync_config->flx_sync_requested) {
1,566✔
384
        create_subscription_store();
576✔
385
        std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr(m_flx_subscription_store);
576✔
386
        set_write_validator_factory(weak_sub_mgr);
576✔
387
    }
576✔
388

697✔
389
    // After a migration to FLX, if the user opens the realm with a flexible sync configuration, we need to first
697✔
390
    // upload any unsynced changes before updating to native FLX.
697✔
391
    // A subscription set is used as sentinel so we know when to stop uploading.
697✔
392
    // Note: Currently, a sentinel subscription set is always created even if there is nothing to upload.
697✔
393
    if (m_migration_store->is_migrated() && m_original_sync_config->flx_sync_requested) {
1,566✔
394
        m_migration_store->create_sentinel_subscription_set(*m_flx_subscription_store);
2✔
395
        m_migration_sentinel_query_version = m_migration_store->get_sentinel_subscription_set_version();
2✔
396
        REALM_ASSERT(m_migration_sentinel_query_version);
2✔
397
    }
2✔
398
}
1,566✔
399

400
void SyncSession::detach_from_sync_manager()
401
{
2✔
402
    shutdown_and_wait();
2✔
403
    util::CheckedLockGuard lk(m_state_mutex);
2✔
404
    m_sync_manager = nullptr;
2✔
405
}
2✔
406

407
// Registers a file action that marks this session's Realm file for deletion
// (optionally backing it up first) and records the affected paths in `error`.
//
// `error`         - receives the original file path under
//                   SyncError::c_original_file_path_key and, when backing up,
//                   the recovery path under SyncError::c_recovery_file_path_key.
// `should_backup` - whether the file is backed up before being deleted.
void SyncSession::update_error_and_mark_file_for_deletion(SyncError& error, ShouldBackup should_backup)
{
    util::CheckedLockGuard config_lock(m_config_mutex);

    const bool back_up_first = (should_backup == ShouldBackup::yes);
    const auto file_action = back_up_first ? SyncFileAction::BackUpThenDeleteRealm : SyncFileAction::DeleteRealm;

    auto original_path = path();
    error.user_info[SyncError::c_original_file_path_key] = original_path;

    // Add a SyncFileActionMetadata marking the Realm as needing to be deleted.
    std::string recovery_path = m_config.sync_config->user->create_file_action(
        file_action, original_path, m_config.sync_config->recovery_directory);

    // The recovery path is only reported when a backup was requested.
    if (back_up_first) {
        error.user_info[SyncError::c_recovery_file_path_key] = recovery_path;
    }
}
74✔
421

422
void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_requests_action)
423
{
188✔
424
    // first check that recovery will not be prevented
94✔
425
    if (server_requests_action == sync::ProtocolErrorInfo::Action::ClientResetNoRecovery) {
188✔
426
        auto mode = config(&SyncConfig::client_resync_mode);
4✔
427
        if (mode == ClientResyncMode::Recover) {
4✔
428
            handle_fresh_realm_downloaded(
2✔
429
                nullptr,
2✔
430
                {ErrorCodes::RuntimeError,
2✔
431
                 "A client reset is required but the server does not permit recovery for this client"},
2✔
432
                server_requests_action);
2✔
433
            return;
2✔
434
        }
2✔
435
    }
186✔
436

93✔
437
    std::vector<char> encryption_key;
186✔
438
    {
186✔
439
        util::CheckedLockGuard lock(m_config_mutex);
186✔
440
        encryption_key = m_config.encryption_key;
186✔
441
    }
186✔
442

93✔
443
    DBOptions options;
186✔
444
    options.allow_file_format_upgrade = false;
186✔
445
    options.enable_async_writes = false;
186✔
446
    if (!encryption_key.empty())
186✔
447
        options.encryption_key = encryption_key.data();
2✔
448

93✔
449
    DBRef db;
186✔
450
    auto fresh_path = client_reset::get_fresh_path_for(m_db->get_path());
186✔
451
    try {
186✔
452
        // We want to attempt to use a pre-existing file to reduce the chance of
93✔
453
        // downloading the first part of the file only to then delete it over
93✔
454
        // and over, but if we fail to open it then we should just start over.
93✔
455
        try {
186✔
456
            db = DB::create(sync::make_client_replication(), fresh_path, options);
186✔
457
        }
186✔
458
        catch (...) {
98✔
459
            util::File::try_remove(fresh_path);
10✔
460
        }
10✔
461

93✔
462
        if (!db) {
182✔
463
            db = DB::create(sync::make_client_replication(), fresh_path, options);
2✔
464
        }
2✔
465
    }
178✔
466
    catch (...) {
97✔
467
        // Failed to open the fresh path after attempting to delete it, so we
4✔
468
        // just can't do automatic recovery.
4✔
469
        handle_fresh_realm_downloaded(nullptr, exception_to_status(), server_requests_action);
8✔
470
        return;
8✔
471
    }
8✔
472

89✔
473
    util::CheckedLockGuard state_lock(m_state_mutex);
178✔
474
    if (m_state != State::Active) {
178✔
UNCOV
475
        return;
×
UNCOV
476
    }
×
477
    RealmConfig fresh_config;
178✔
478
    {
178✔
479
        util::CheckedLockGuard config_lock(m_config_mutex);
178✔
480
        fresh_config = m_config;
178✔
481
        fresh_config.path = fresh_path;
178✔
482
        // in case of migrations use the migrated config
89✔
483
        auto fresh_sync_config = m_migrated_sync_config ? *m_migrated_sync_config : *m_config.sync_config;
164✔
484
        // deep copy the sync config so we don't modify the live session's config
89✔
485
        fresh_config.sync_config = std::make_shared<SyncConfig>(fresh_sync_config);
178✔
486
        fresh_config.sync_config->client_resync_mode = ClientResyncMode::Manual;
178✔
487
        fresh_config.schema_version = m_previous_schema_version.value_or(m_config.schema_version);
178✔
488
    }
178✔
489

89✔
490
    auto fresh_sync_session = m_sync_manager->get_session(db, fresh_config);
178✔
491
    auto& history = static_cast<sync::ClientReplication&>(*db->get_replication());
178✔
492
    // the fresh Realm may apply writes to this db after it has outlived its sync session
89✔
493
    // the writes are used to generate a changeset for recovery, but are never committed
89✔
494
    history.set_write_validator_factory({});
178✔
495

89✔
496
    fresh_sync_session->assert_mutex_unlocked();
178✔
497
    // The fresh realm uses flexible sync.
89✔
498
    if (auto fresh_sub_store = fresh_sync_session->get_flx_subscription_store()) {
178✔
499
        auto fresh_sub = fresh_sub_store->get_latest();
70✔
500
        // The local realm uses flexible sync as well so copy the active subscription set to the fresh realm.
35✔
501
        if (auto local_subs_store = m_flx_subscription_store) {
70✔
502
            auto fresh_mut_sub = fresh_sub.make_mutable_copy();
52✔
503
            fresh_mut_sub.import(local_subs_store->get_active());
52✔
504
            fresh_sub = fresh_mut_sub.commit();
52✔
505
        }
52✔
506

35✔
507
        auto self = shared_from_this();
70✔
508
        using SubscriptionState = sync::SubscriptionSet::State;
70✔
509
        fresh_sub.get_state_change_notification(SubscriptionState::Complete)
70✔
510
            .then([=](SubscriptionState) -> util::Future<sync::SubscriptionSet> {
69✔
511
                if (server_requests_action != sync::ProtocolErrorInfo::Action::MigrateToFLX) {
68✔
512
                    return fresh_sub;
50✔
513
                }
50✔
514
                if (!self->m_migration_store->is_migration_in_progress()) {
18✔
UNCOV
515
                    return fresh_sub;
×
UNCOV
516
                }
×
517

9✔
518
                // fresh_sync_session is using a new realm file that doesn't have the migration_store info
9✔
519
                // so the query string from the local migration store will need to be provided
9✔
520
                auto query_string = self->m_migration_store->get_query_string();
18✔
521
                REALM_ASSERT(query_string);
18✔
522
                // Create subscriptions in the fresh realm based on the schema instructions received in the bootstrap
9✔
523
                // message.
9✔
524
                fresh_sync_session->m_migration_store->create_subscriptions(*fresh_sub_store, *query_string);
18✔
525
                return fresh_sub_store->get_latest()
18✔
526
                    .get_state_change_notification(SubscriptionState::Complete)
18✔
527
                    .then([=](SubscriptionState) {
18✔
528
                        return fresh_sub_store->get_latest();
18✔
529
                    });
18✔
530
            })
18✔
531
            .get_async([=](StatusWith<sync::SubscriptionSet>&& subs) {
70✔
532
                // Keep the sync session alive while it's downloading, but then close
35✔
533
                // it immediately
35✔
534
                fresh_sync_session->force_close();
70✔
535
                if (subs.is_ok()) {
70✔
536
                    self->handle_fresh_realm_downloaded(db, Status::OK(), server_requests_action,
68✔
537
                                                        std::move(subs.get_value()));
68✔
538
                }
68✔
539
                else {
2✔
540
                    self->handle_fresh_realm_downloaded(nullptr, subs.get_status(), server_requests_action);
2✔
541
                }
2✔
542
            });
70✔
543
    }
70✔
544
    else { // pbs
108✔
545
        fresh_sync_session->wait_for_download_completion([=, weak_self = weak_from_this()](Status s) {
108✔
546
            // Keep the sync session alive while it's downloading, but then close
54✔
547
            // it immediately
54✔
548
            fresh_sync_session->force_close();
108✔
549
            if (auto strong_self = weak_self.lock()) {
108✔
550
                strong_self->handle_fresh_realm_downloaded(db, s, server_requests_action);
108✔
551
            }
108✔
552
        });
108✔
553
    }
108✔
554
    fresh_sync_session->revive_if_needed();
178✔
555
}
178✔
556

557
void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
558
                                                sync::ProtocolErrorInfo::Action server_requests_action,
559
                                                std::optional<sync::SubscriptionSet> new_subs)
560
{
188✔
561
    util::CheckedUniqueLock lock(m_state_mutex);
188✔
562
    if (m_state != State::Active) {
188✔
UNCOV
563
        return;
×
UNCOV
564
    }
×
565
    // The download can fail for many reasons. For example:
94✔
566
    // - unable to write the fresh copy to the file system
94✔
567
    // - during download of the fresh copy, the fresh copy itself is reset
94✔
568
    // - in FLX mode there was a problem fulfilling the previously active subscription
94✔
569
    if (!status.is_ok()) {
188✔
570
        if (status == ErrorCodes::OperationAborted) {
18✔
571
            return;
6✔
572
        }
6✔
573
        lock.unlock();
12✔
574

6✔
575
        sync::SessionErrorInfo synthetic(
12✔
576
            Status{ErrorCodes::AutoClientResetFailed,
12✔
577
                   util::format("A fatal error occurred during client reset: '%1'", status.reason())},
12✔
578
            sync::IsFatal{true});
12✔
579
        handle_error(synthetic);
12✔
580
        return;
12✔
581
    }
12✔
582

85✔
583
    // Performing a client reset requires tearing down our current
85✔
584
    // sync session and creating a new one with the relevant client reset config. This
85✔
585
    // will result in session completion handlers firing
85✔
586
    // when the old session is torn down, which we don't want as this
85✔
587
    // is supposed to be transparent to the user.
85✔
588
    //
85✔
589
    // To avoid this, we need to move the completion handlers aside temporarily so
85✔
590
    // that moving to the inactive state doesn't clear them - they will be
85✔
591
    // re-registered when the session becomes active again.
85✔
592
    {
170✔
593
        m_server_requests_action = server_requests_action;
170✔
594
        m_client_reset_fresh_copy = db;
170✔
595
        CompletionCallbacks callbacks;
170✔
596
        std::swap(m_completion_callbacks, callbacks);
170✔
597
        // always swap back, even if advance_state throws
85✔
598
        auto guard = util::make_scope_exit([&]() noexcept {
170✔
599
            util::CheckedUniqueLock lock(m_state_mutex);
170✔
600
            if (m_completion_callbacks.empty())
170✔
601
                std::swap(callbacks, m_completion_callbacks);
170✔
UNCOV
602
            else
×
UNCOV
603
                m_completion_callbacks.merge(std::move(callbacks));
×
604
        });
170✔
605
        // Do not cancel the notifications on subscriptions.
85✔
606
        bool cancel_subscription_notifications = false;
170✔
607
        become_inactive(std::move(lock), Status::OK(), cancel_subscription_notifications); // unlocks the lock
170✔
608

85✔
609
        // Once the session is inactive, update sync config and subscription store after migration.
85✔
610
        if (server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
170✔
611
            server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS) {
161✔
612
            apply_sync_config_after_migration_or_rollback();
26✔
613
            auto flx_sync_requested = config(&SyncConfig::flx_sync_requested);
26✔
614
            update_subscription_store(flx_sync_requested, std::move(new_subs));
26✔
615
        }
26✔
616
    }
170✔
617
    revive_if_needed();
170✔
618
}
170✔
619

620
// Pauses the session and returns a future obtained from
// SyncClient::notify_session_terminated(). If the session is already Paused or
// Inactive, an already-ready future is returned instead.
util::Future<void> SyncSession::pause_async()
{
    {
        util::CheckedUniqueLock lock(m_state_mutex);
        // Nothing to wait for if the session is already paused or inactive.
        const bool already_stopped =
            m_state == SyncSession::State::Paused || m_state == SyncSession::State::Inactive;
        if (already_stopped) {
            return util::Future<void>::make_ready();
        }
    }

    // Transition immediately to the `paused` state. Calling this function must
    // guarantee that any sync::Session object held in SyncSession::m_session
    // before the call has been destroyed by the time it returns. That lets the
    // caller follow up with sync::Client::notify_session_terminated() to be
    // notified when the Realm file is closed. This works so long as this
    // SyncSession object remains in the `paused` state afterwards.
    pause();
    return m_client.notify_session_terminated();
}
28✔
637

638
/// Test-only hook: forwards `error` to `session.handle_error()`.
void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::SessionErrorInfo&& error)
{
    // `error` is a named rvalue reference, so it has to be moved explicitly.
    session.handle_error(std::move(error));
}
16✔
642

643
/// Test-only hook: exposes `SyncSession::pause_async()` and returns its future unchanged.
util::Future<void> SyncSession::OnlyForTesting::pause_async(SyncSession& session)
{
    return session.pause_async();
}
2✔
647

648
// This method should only be called from within the error handler callback registered upon the underlying
649
// `m_session`.
650
void SyncSession::handle_error(sync::SessionErrorInfo error)
651
{
497✔
652
    enum class NextStateAfterError { none, inactive, error };
497✔
653
    auto next_state = error.is_fatal ? NextStateAfterError::error : NextStateAfterError::none;
401✔
654
    util::Optional<ShouldBackup> delete_file;
497✔
655
    bool log_out_user = false;
497✔
656
    bool unrecognized_by_client = false;
497✔
657

247✔
658
    if (error.status == ErrorCodes::AutoClientResetFailed) {
497✔
659
        // At this point, automatic recovery has been attempted but it failed.
25✔
660
        // Fallback to a manual reset and let the user try to handle it.
25✔
661
        next_state = NextStateAfterError::inactive;
50✔
662
        delete_file = ShouldBackup::yes;
50✔
663
    }
50✔
664
    else if (error.server_requests_action != sync::ProtocolErrorInfo::Action::NoAction) {
447✔
665
        switch (error.server_requests_action) {
431✔
UNCOV
666
            case sync::ProtocolErrorInfo::Action::NoAction:
✔
667
                REALM_UNREACHABLE(); // This is not sent by the MongoDB server
668
            case sync::ProtocolErrorInfo::Action::ApplicationBug:
37✔
669
                [[fallthrough]];
37✔
670
            case sync::ProtocolErrorInfo::Action::ProtocolViolation:
43✔
671
                next_state = NextStateAfterError::inactive;
43✔
672
                break;
43✔
673
            case sync::ProtocolErrorInfo::Action::Warning:
32✔
674
                break; // not fatal, but should be bubbled up to the user below.
30✔
675
            case sync::ProtocolErrorInfo::Action::Transient:
65✔
676
                // Not real errors, don't need to be reported to the binding.
33✔
677
                return;
65✔
678
            case sync::ProtocolErrorInfo::Action::DeleteRealm:
17✔
UNCOV
679
                next_state = NextStateAfterError::inactive;
×
UNCOV
680
                delete_file = ShouldBackup::no;
×
681
                break;
×
682
            case sync::ProtocolErrorInfo::Action::ClientReset:
180✔
683
                [[fallthrough]];
180✔
684
            case sync::ProtocolErrorInfo::Action::ClientResetNoRecovery:
184✔
685
                switch (config(&SyncConfig::client_resync_mode)) {
184✔
686
                    case ClientResyncMode::Manual:
24✔
687
                        next_state = NextStateAfterError::inactive;
24✔
688
                        delete_file = ShouldBackup::yes;
24✔
689
                        break;
24✔
690
                    case ClientResyncMode::DiscardLocal:
82✔
691
                        [[fallthrough]];
82✔
692
                    case ClientResyncMode::RecoverOrDiscard:
96✔
693
                        [[fallthrough]];
96✔
694
                    case ClientResyncMode::Recover:
160✔
695
                        download_fresh_realm(error.server_requests_action);
160✔
696
                        return; // do not propagate the error to the user at this point
160✔
697
                }
24✔
698
                break;
24✔
699
            case sync::ProtocolErrorInfo::Action::MigrateToFLX:
21✔
700
                // Should not receive this error if original sync config is FLX
9✔
701
                REALM_ASSERT(!m_original_sync_config->flx_sync_requested);
18✔
702
                REALM_ASSERT(error.migration_query_string && !error.migration_query_string->empty());
18✔
703
                // Original config was PBS, migrating to FLX
9✔
704
                m_migration_store->migrate_to_flx(*error.migration_query_string,
18✔
705
                                                  m_original_sync_config->partition_value);
18✔
706
                save_sync_config_after_migration_or_rollback();
18✔
707
                download_fresh_realm(error.server_requests_action);
18✔
708
                return;
18✔
709
            case sync::ProtocolErrorInfo::Action::RevertToPBS:
17✔
710
                // If the client was updated to use FLX natively, but the server was rolled back to PBS,
5✔
711
                // the server should be sending switch_to_flx_sync; throw exception if this error is not
5✔
712
                // received.
5✔
713
                if (m_original_sync_config->flx_sync_requested) {
10✔
UNCOV
714
                    throw LogicError(ErrorCodes::InvalidServerResponse,
×
UNCOV
715
                                     "Received 'RevertToPBS' from server after rollback while client is natively "
×
UNCOV
716
                                     "using FLX - expected 'SwitchToPBS'");
×
UNCOV
717
                }
×
718
                // Original config was PBS, rollback the migration
5✔
719
                m_migration_store->rollback_to_pbs();
10✔
720
                save_sync_config_after_migration_or_rollback();
10✔
721
                download_fresh_realm(error.server_requests_action);
10✔
722
                return;
10✔
723
            case sync::ProtocolErrorInfo::Action::RefreshUser:
16✔
724
                if (auto u = user()) {
16✔
725
                    u->request_access_token(handle_refresh(shared_from_this(), false));
16✔
726
                }
16✔
727
                return;
16✔
728
            case sync::ProtocolErrorInfo::Action::RefreshLocation:
34✔
729
                if (auto u = user()) {
34✔
730
                    u->request_refresh_location(handle_refresh(shared_from_this(), true));
34✔
731
                }
34✔
732
                return;
34✔
733
            case sync::ProtocolErrorInfo::Action::LogOutUser:
5✔
UNCOV
734
                next_state = NextStateAfterError::inactive;
×
UNCOV
735
                log_out_user = true;
×
UNCOV
736
                break;
×
737
            case sync::ProtocolErrorInfo::Action::MigrateSchema:
31✔
738
                util::CheckedUniqueLock lock(m_state_mutex);
31✔
739
                // Should only be received for FLX sync.
15✔
740
                REALM_ASSERT(m_original_sync_config->flx_sync_requested);
31✔
741
                m_previous_schema_version = error.previous_schema_version;
31✔
742
                return; // do not propagate the error to the user at this point
31✔
743
        }
16✔
744
    }
16✔
745
    else {
16✔
746
        // Unrecognized error code.
8✔
747
        unrecognized_by_client = true;
16✔
748
    }
16✔
749

247✔
750
    util::CheckedUniqueLock lock(m_state_mutex);
330✔
751
    SyncError sync_error{error.status, error.is_fatal, error.log_url, std::move(error.compensating_writes)};
163✔
752
    // `action` is used over `shouldClientReset` and `isRecoveryModeDisabled`.
80✔
753
    sync_error.server_requests_action = error.server_requests_action;
163✔
754
    sync_error.is_unrecognized_by_client = unrecognized_by_client;
163✔
755

80✔
756
    if (delete_file)
163✔
757
        update_error_and_mark_file_for_deletion(sync_error, *delete_file);
74✔
758

80✔
759
    if (m_state == State::Dying && error.is_fatal) {
163✔
760
        become_inactive(std::move(lock), error.status);
2✔
761
        return;
2✔
762
    }
2✔
763

79✔
764
    // Don't bother invoking m_config.error_handler if the sync is inactive.
79✔
765
    // It does not make sense to call the handler when the session is closed.
79✔
766
    if (m_state == State::Inactive || m_state == State::Paused) {
161✔
UNCOV
767
        return;
×
UNCOV
768
    }
×
769

79✔
770
    switch (next_state) {
161✔
771
        case NextStateAfterError::none:
46✔
772
            if (config(&SyncConfig::cancel_waits_on_nonfatal_error)) {
46✔
UNCOV
773
                cancel_pending_waits(std::move(lock), sync_error.status); // unlocks the mutex
×
UNCOV
774
            }
×
775
            break;
46✔
776
        case NextStateAfterError::inactive: {
115✔
777
            become_inactive(std::move(lock), sync_error.status);
115✔
778
            break;
115✔
UNCOV
779
        }
×
UNCOV
780
        case NextStateAfterError::error: {
✔
UNCOV
781
            cancel_pending_waits(std::move(lock), sync_error.status);
×
UNCOV
782
            break;
×
783
        }
161✔
784
    }
161✔
785

79✔
786
    if (log_out_user) {
161✔
UNCOV
787
        if (auto u = user())
×
UNCOV
788
            u->request_log_out();
×
UNCOV
789
    }
×
790

79✔
791
    if (auto error_handler = config(&SyncConfig::error_handler)) {
161✔
792
        error_handler(shared_from_this(), std::move(sync_error));
161✔
793
    }
161✔
794
}
161✔
795

796
/// Fails every pending completion wait with `error`.
///
/// Takes ownership of `lock`; the state mutex is released before any
/// callback is invoked. Subscription-state waiters (if a subscription store
/// exists) are notified first, then the queued completion callbacks.
void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status error)
{
    // Steal the queued callbacks and take a reference to the subscription
    // store while the lock is still held.
    CompletionCallbacks pending;
    std::swap(pending, m_completion_callbacks);
    auto subscription_store = m_flx_subscription_store;
    m_state_mutex.unlock(lock);

    // Inform any waiters on pending subscription states that they were cancelled.
    if (subscription_store) {
        subscription_store->notify_all_state_change_notifications(error);
    }

    // Inform any queued-up completion handlers that they were cancelled.
    // Each map entry is (request id -> (direction, callback)).
    for (auto& entry : pending)
        entry.second.second(error);
}
18✔
815

816
void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded,
817
                                         uint64_t uploadable, uint64_t snapshot_version, double download_estimate,
818
                                         double upload_estimate)
819
{
4,959✔
820
    m_progress_notifier.update(downloaded, downloadable, uploaded, uploadable, snapshot_version, download_estimate,
4,959✔
821
                               upload_estimate);
4,959✔
822
}
4,959✔
823

824
/// Builds the client reset section of a sync session config.
///
/// @param base_config               Realm config copied for the Realms opened inside the callbacks.
/// @param sync_config               Supplies the resync mode and the user's before/after callbacks.
///                                  Its mode must not be `Manual` (asserted).
/// @param fresh_copy                Moved into the returned config.
/// @param recovery_is_allowed       Copied into the returned config.
/// @param schema_migration_detected When true no before/after callbacks are installed.
/// @return The populated `ClientReset` config.
static sync::Session::Config::ClientReset make_client_reset_config(const RealmConfig& base_config,
                                                                   const std::shared_ptr<SyncConfig>& sync_config,
                                                                   DBRef&& fresh_copy, bool recovery_is_allowed,
                                                                   bool schema_migration_detected)
{
    REALM_ASSERT(sync_config->client_resync_mode != ClientResyncMode::Manual);

    sync::Session::Config::ClientReset reset_config;
    reset_config.mode = sync_config->client_resync_mode;
    reset_config.fresh_copy = std::move(fresh_copy);
    reset_config.recovery_is_allowed = recovery_is_allowed;

    // If the user registered *either* a before or an after callback, the local schema has to be
    // initialized before the client reset happens, hence the combined check here and the
    // unconditional `notify_before_client_reset` wrapper further down.
    const bool has_user_callback =
        sync_config->notify_before_client_reset || sync_config->notify_after_client_reset;

    // The local schema cannot be initialized in case of a sync schema migration: currently a schema
    // migration involves breaking changes, so opening the realm with the new schema results in a crash.
    if (!has_user_callback || schema_migration_detected)
        return reset_config;

    RealmConfig callback_realm_config = base_config;
    callback_realm_config.sync_config = std::make_shared<SyncConfig>(*sync_config); // deep copy
    callback_realm_config.scheduler = util::Scheduler::make_dummy();

    if (sync_config->notify_after_client_reset) {
        // Captures a copy: the config is moved into the "before" wrapper afterwards.
        reset_config.notify_after_client_reset = [callback_realm_config](VersionID previous_version,
                                                                         bool did_recover) {
            auto coordinator = _impl::RealmCoordinator::get_coordinator(callback_realm_config);
            ThreadSafeReference live_after = coordinator->get_unbound_realm();
            SharedRealm frozen_before = coordinator->get_realm(callback_realm_config, previous_version);
            REALM_ASSERT(frozen_before);
            REALM_ASSERT(frozen_before->is_frozen());
            callback_realm_config.sync_config->notify_after_client_reset(std::move(frozen_before),
                                                                         std::move(live_after), did_recover);
        };
    }

    reset_config.notify_before_client_reset = [cfg = std::move(callback_realm_config)]() -> VersionID {
        // Opening the Realm live here may make a write if the schema is different
        // than what exists on disk. It is necessary to pass a fully usable Realm
        // to the user here. Note that the schema changes made here will be considered
        // an "offline write" to be recovered if this is recovery mode.
        auto realm_before = Realm::get_shared_realm(cfg);
        if (auto& user_callback = cfg.sync_config->notify_before_client_reset) {
            user_callback(cfg.sync_config->freeze_before_reset_realm ? realm_before->freeze() : realm_before);
        }
        // Note that if the SDK wrote to the Realm (hopefully by requesting a
        // live instance and not opening a secondary one), this may be a
        // different version than what we had before calling the callback.
        realm_before->refresh();
        return realm_before->read_transaction_version();
    };

    return reset_config;
}
138✔
881

882
void SyncSession::create_sync_session()
883
{
1,985✔
884
    if (m_session)
1,985✔
UNCOV
885
        return;
×
886

905✔
887
    util::CheckedLockGuard config_lock(m_config_mutex);
1,985✔
888

905✔
889
    REALM_ASSERT(m_config.sync_config);
1,985✔
890
    SyncConfig& sync_config = *m_config.sync_config;
1,985✔
891
    REALM_ASSERT(sync_config.user);
1,985✔
892

905✔
893
    sync::Session::Config session_config;
1,985✔
894
    session_config.signed_user_token = sync_config.user->access_token();
1,985✔
895
    session_config.user_id = sync_config.user->user_id();
1,985✔
896
    session_config.realm_identifier = sync_config.partition_value;
1,985✔
897
    session_config.verify_servers_ssl_certificate = sync_config.client_validate_ssl;
1,985✔
898
    session_config.ssl_trust_certificate_path = sync_config.ssl_trust_certificate_path;
1,985✔
899
    session_config.ssl_verify_callback = sync_config.ssl_verify_callback;
1,985✔
900
    session_config.proxy_config = sync_config.proxy_config;
1,985✔
901
    session_config.simulate_integration_error = sync_config.simulate_integration_error;
1,985✔
902
    session_config.flx_bootstrap_batch_size_bytes = sync_config.flx_bootstrap_batch_size_bytes;
1,985✔
903
    session_config.session_reason =
1,985✔
904
        client_reset::is_fresh_path(m_config.path) ? sync::SessionReason::ClientReset : sync::SessionReason::Sync;
1,900✔
905
    session_config.schema_version = m_config.schema_version;
1,985✔
906

905✔
907
    if (sync_config.on_sync_client_event_hook) {
1,985✔
908
        session_config.on_sync_client_event_hook = [hook = sync_config.on_sync_client_event_hook,
108✔
909
                                                    anchor = weak_from_this()](const SyncClientHookData& data) {
1,060✔
910
            return hook(anchor, data);
1,060✔
911
        };
1,060✔
912
    }
108✔
913

905✔
914
    {
1,985✔
915
        // At this point the sync route was either updated when the first App request was performed, or
905✔
916
        // was populated by a generated value that will be used for first contact. If the generated sync
905✔
917
        // route is not correct, either a redirection will be received or the connection will fail,
905✔
918
        // resulting in an update to both the access token and the location.
905✔
919
        auto [sync_route, verified] = m_sync_manager->sync_route();
1,985✔
920
        REALM_ASSERT_EX(!sync_route.empty(), "Server URL cannot be empty");
1,985✔
921

905✔
922
        if (!m_client.decompose_server_url(sync_route, session_config.protocol_envelope,
1,985✔
923
                                           session_config.server_address, session_config.server_port,
1,985✔
924
                                           session_config.service_identifier)) {
905✔
UNCOV
925
            throw sync::BadServerUrl(sync_route);
×
UNCOV
926
        }
×
927
        session_config.server_verified = verified;
1,985✔
928

905✔
929
        m_server_url = sync_route;
1,985✔
930
        m_server_url_verified = verified;
1,985✔
931
    }
1,985✔
932

905✔
933
    if (sync_config.authorization_header_name) {
1,985✔
UNCOV
934
        session_config.authorization_header_name = *sync_config.authorization_header_name;
×
UNCOV
935
    }
×
936
    session_config.custom_http_headers = sync_config.custom_http_headers;
1,985✔
937

905✔
938
    if (m_server_requests_action != sync::ProtocolErrorInfo::Action::NoAction) {
1,985✔
939
        // Migrations are allowed to recover local data.
85✔
940
        const bool allowed_to_recover = m_server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset ||
170✔
941
                                        m_server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
99✔
942
                                        m_server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS;
90✔
943
        // Use the original sync config, not the updated one from the migration store
85✔
944
        session_config.client_reset_config =
170✔
945
            make_client_reset_config(m_config, m_original_sync_config, std::move(m_client_reset_fresh_copy),
170✔
946
                                     allowed_to_recover, m_previous_schema_version.has_value());
170✔
947
        session_config.schema_version = m_previous_schema_version.value_or(m_config.schema_version);
170✔
948
        m_server_requests_action = sync::ProtocolErrorInfo::Action::NoAction;
170✔
949
    }
170✔
950

905✔
951
    m_session = m_client.make_session(m_db, m_flx_subscription_store, m_migration_store, std::move(session_config));
1,985✔
952

905✔
953
    std::weak_ptr<SyncSession> weak_self = weak_from_this();
1,985✔
954

905✔
955
    // Set up the wrapped progress handler callback
905✔
956
    m_session->set_progress_handler([weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable,
1,985✔
957
                                                uint_fast64_t uploaded, uint_fast64_t uploadable,
1,985✔
958
                                                uint_fast64_t snapshot_version, double download_estimate,
1,985✔
959
                                                double upload_estimate) {
5,018✔
960
        if (auto self = weak_self.lock()) {
5,018✔
961
            self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, snapshot_version,
4,959✔
962
                                         download_estimate, upload_estimate);
4,959✔
963
        }
4,959✔
964
    });
5,018✔
965

905✔
966
    // Sets up the connection state listener. This callback is used for both reporting errors as well as changes to
905✔
967
    // the connection state.
905✔
968
    m_session->set_connection_state_change_listener(
1,985✔
969
        [weak_self](sync::ConnectionState state, util::Optional<sync::SessionErrorInfo> error) {
4,266✔
970
            using cs = sync::ConnectionState;
4,266✔
971
            ConnectionState new_state = [&] {
4,266✔
972
                switch (state) {
4,266✔
973
                    case cs::disconnected:
408✔
974
                        return ConnectionState::Disconnected;
408✔
975
                    case cs::connecting:
1,977✔
976
                        return ConnectionState::Connecting;
1,977✔
977
                    case cs::connected:
1,881✔
978
                        return ConnectionState::Connected;
1,881✔
UNCOV
979
                }
×
980
                REALM_UNREACHABLE();
UNCOV
981
            }();
×
982
            // If the OS SyncSession object is destroyed, we ignore any events from the underlying Session as there is
1,982✔
983
            // nothing useful we can do with them.
1,982✔
984
            if (auto self = weak_self.lock()) {
4,266✔
985
                self->update_connection_state(new_state);
4,238✔
986
                if (error) {
4,238✔
987
                    self->handle_error(std::move(*error));
469✔
988
                }
469✔
989
            }
4,238✔
990
        });
4,266✔
991
}
1,985✔
992

993
/// Records `new_state` as the current connection state and, if it differs
/// from the previous one, invokes the registered connection-change callbacks.
void SyncSession::update_connection_state(ConnectionState new_state)
{
    // Reaching `Connected` marks the server URL as verified.
    if (new_state == ConnectionState::Connected) {
        util::CheckedLockGuard lock(m_config_mutex);
        m_server_url_verified = true;
    }

    ConnectionState previous;
    {
        util::CheckedLockGuard lock(m_connection_state_mutex);
        previous = m_connection_state;
        m_connection_state = new_state;
    }

    // No transition, nobody to tell.
    if (previous == new_state)
        return;

    // Notify any registered connection callbacks of the state transition
    // (done after the connection-state lock has been released).
    m_connection_change_notifier.invoke_callbacks(previous, new_state);
}
4,238✔
1012

1013
/// Reports `version` to the progress notifier and, when the session is
/// `Active` or `WaitingForAccessToken` and an underlying session exists,
/// to that session as well.
void SyncSession::nonsync_transact_notify(sync::version_type version)
{
    m_progress_notifier.set_local_version(version);

    util::CheckedUniqueLock lock(m_state_mutex);
    // In the Dying, Inactive and Paused states the underlying session is not told.
    const bool state_forwards = (m_state == State::Active || m_state == State::WaitingForAccessToken);
    if (state_forwards && m_session) {
        m_session->nonsync_transact_notify(version);
    }
}
9,092✔
1031

1032
void SyncSession::revive_if_needed()
1033
{
2,323✔
1034
    util::CheckedUniqueLock lock(m_state_mutex);
2,323✔
1035
    switch (m_state) {
2,323✔
1036
        case State::Active:
571✔
1037
        case State::WaitingForAccessToken:
571✔
1038
        case State::Paused:
574✔
1039
            return;
574✔
1040
        case State::Dying:
788✔
1041
        case State::Inactive:
1,749✔
1042
            do_revive(std::move(lock));
1,749✔
1043
            break;
1,749✔
1044
    }
2,323✔
1045
}
2,323✔
1046

1047
void SyncSession::handle_reconnect()
1048
{
4✔
1049
    util::CheckedUniqueLock lock(m_state_mutex);
4✔
1050
    switch (m_state) {
4✔
1051
        case State::Active:
4✔
1052
            m_session->cancel_reconnect_delay();
4✔
1053
            break;
4✔
UNCOV
1054
        case State::Dying:
✔
UNCOV
1055
        case State::Inactive:
✔
UNCOV
1056
        case State::WaitingForAccessToken:
✔
UNCOV
1057
        case State::Paused:
✔
UNCOV
1058
            break;
×
1059
    }
4✔
1060
}
4✔
1061

1062
void SyncSession::force_close()
1063
{
303✔
1064
    util::CheckedUniqueLock lock(m_state_mutex);
303✔
1065
    switch (m_state) {
303✔
1066
        case State::Active:
264✔
1067
        case State::Dying:
287✔
1068
        case State::WaitingForAccessToken:
289✔
1069
            become_inactive(std::move(lock));
289✔
1070
            break;
289✔
1071
        case State::Inactive:
141✔
1072
        case State::Paused:
14✔
1073
            break;
14✔
1074
    }
303✔
1075
}
303✔
1076

1077
void SyncSession::pause()
1078
{
254✔
1079
    util::CheckedUniqueLock lock(m_state_mutex);
254✔
1080
    switch (m_state) {
254✔
1081
        case State::Active:
251✔
1082
        case State::Dying:
251✔
1083
        case State::WaitingForAccessToken:
251✔
1084
        case State::Inactive:
252✔
1085
            become_paused(std::move(lock));
252✔
1086
            break;
252✔
1087
        case State::Paused:
127✔
1088
            break;
2✔
1089
    }
254✔
1090
}
254✔
1091

1092
void SyncSession::resume()
1093
{
222✔
1094
    util::CheckedUniqueLock lock(m_state_mutex);
222✔
1095
    switch (m_state) {
222✔
1096
        case State::Active:
12✔
1097
        case State::WaitingForAccessToken:
12✔
1098
            return;
12✔
1099
        case State::Paused:
210✔
1100
        case State::Dying:
210✔
1101
        case State::Inactive:
210✔
1102
            do_revive(std::move(lock));
210✔
1103
            break;
210✔
1104
    }
222✔
1105
}
222✔
1106

1107
/// Brings the session back to life. `lock` must be the caller's lock on
/// `m_state_mutex`; it is released on every path before returning.
///
/// If the session has a user whose access token needs refreshing, the
/// session waits for a new token and a refresh is requested; otherwise the
/// session becomes active right away.
void SyncSession::do_revive(util::CheckedUniqueLock&& lock)
{
    const auto session_user = user();
    const bool needs_token_refresh = session_user && session_user->access_token_refresh_required();

    if (needs_token_refresh) {
        // Either the access token has expired or the location info hasn't
        // been requested since the app was started - request a new access
        // token to refresh both.
        become_waiting_for_access_token();
        // Release the lock for SDKs with a single threaded networking
        // implementation such as our test suite, so that the update can
        // trigger a state change from the completion handler.
        m_state_mutex.unlock(lock);
        initiate_access_token_refresh();
        return;
    }

    // No refresh required (or no user at all): revive the session directly.
    become_active();
    m_state_mutex.unlock(lock);
}
20✔
1129

1130
void SyncSession::close()
1131
{
100✔
1132
    util::CheckedUniqueLock lock(m_state_mutex);
100✔
1133
    close(std::move(lock));
100✔
1134
}
100✔
1135

1136
/// Closes the session according to its current state. `lock` is the caller's
/// lock on `m_state_mutex`; every handled state either hands it on or unlocks it.
///
/// - `Active`: follows the configured `stop_policy`.
/// - `Dying`: nothing to do.
/// - `Paused` / `Inactive`: unregisters the session from the sync manager, if there is one.
/// - `WaitingForAccessToken`: becomes inactive immediately.
void SyncSession::close(util::CheckedUniqueLock lock)
{
    switch (m_state) {
        case State::Active: {
            const auto stop_policy = config(&SyncConfig::stop_policy);
            switch (stop_policy) {
                case SyncSessionStopPolicy::Immediately:
                    become_inactive(std::move(lock));
                    break;
                case SyncSessionStopPolicy::LiveIndefinitely:
                    // Don't do anything; session lives forever.
                    m_state_mutex.unlock(lock);
                    break;
                case SyncSessionStopPolicy::AfterChangesUploaded:
                    // Wait for all pending changes to upload.
                    become_dying(std::move(lock));
                    break;
            }
            break;
        }
        case State::Dying:
            m_state_mutex.unlock(lock);
            break;
        case State::Paused:
        case State::Inactive: {
            // We need to unregister from the sync manager if it still exists so that we don't end up
            // holding the DBRef open after the session is closed. Otherwise we can end up preventing
            // the user from deleting the realm when it's in the paused/inactive state.
            if (m_sync_manager) {
                m_sync_manager->unregister_session(m_db->get_path());
            }
            m_state_mutex.unlock(lock);
            break;
        }
        case State::WaitingForAccessToken:
            // Immediately kill the session.
            become_inactive(std::move(lock));
            break;
    }
}
1,666✔
1175

1176
void SyncSession::shutdown_and_wait()
1177
{
125✔
1178
    {
125✔
1179
        // Transition immediately to `inactive` state. Calling this function must guarantee that any
13✔
1180
        // sync::Session object in SyncSession::m_session that existed prior to the time of invocation
13✔
1181
        // must have been destroyed upon return. This allows the caller to follow up with a call to
13✔
1182
        // sync::Client::wait_for_session_terminations_or_client_stopped() in order to wait for the
13✔
1183
        // Realm file to be closed. This works so long as this SyncSession object remains in the
13✔
1184
        // `inactive` state after the invocation of shutdown_and_wait().
13✔
1185
        util::CheckedUniqueLock lock(m_state_mutex);
125✔
1186
        if (m_state != State::Inactive && m_state != State::Paused) {
125✔
1187
            become_inactive(std::move(lock));
69✔
1188
        }
69✔
1189
    }
125✔
1190
    m_client.wait_for_session_terminations();
125✔
1191
}
125✔
1192

1193
void SyncSession::update_access_token(std::string_view signed_token)
1194
{
30✔
1195
    util::CheckedUniqueLock lock(m_state_mutex);
30✔
1196
    switch (m_state) {
30✔
1197
        case State::Active:
6✔
1198
            m_session->refresh(signed_token);
6✔
1199
            break;
6✔
1200
        case State::WaitingForAccessToken:
8✔
1201
            become_active();
8✔
1202
            break;
8✔
UNCOV
1203
        case State::Paused:
✔
1204
            // token will be pulled from user when the session is unpaused
UNCOV
1205
            return;
×
1206
        case State::Dying:
8✔
1207
        case State::Inactive:
16✔
1208
            do_revive(std::move(lock));
16✔
1209
            break;
16✔
1210
    }
30✔
1211
}
30✔
1212

1213
void SyncSession::initiate_access_token_refresh()
1214
{
20✔
1215
    if (auto session_user = user()) {
20✔
1216
        session_user->request_access_token(handle_refresh(shared_from_this(), false));
20✔
1217
    }
20✔
1218
}
20✔
1219

1220
/// Queues `callback` under a fresh request id and, if an underlying session
/// exists, asks it to report upload or download completion (per `direction`).
///
/// Both callers visible in this file (`wait_for_upload_completion` and
/// `wait_for_download_completion`) hold `m_state_mutex` while calling this.
void SyncSession::add_completion_callback(util::UniqueFunction<void(Status)> callback, ProgressDirection direction)
{
    const auto request_id = ++m_completion_request_counter;
    m_completion_callbacks.emplace_hint(m_completion_callbacks.end(), request_id,
                                        std::make_pair(direction, std::move(callback)));

    // If the state is inactive then just store the callback and return. The callback will get
    // re-registered with the underlying session if/when the session ever becomes active again.
    if (!m_session) {
        return;
    }

    // Runs when the underlying session reports completion. The user callback is looked up by id
    // so that it is skipped if it has already been removed from the map in the meantime.
    auto on_complete = [weak_self = weak_from_this(), id = request_id](Status status) {
        auto self = weak_self.lock();
        if (!self)
            return;
        util::CheckedUniqueLock lock(self->m_state_mutex);
        auto callback_node = self->m_completion_callbacks.extract(id);
        lock.unlock(); // never invoke the user callback with the state mutex held
        if (callback_node) {
            callback_node.mapped().second(std::move(status));
        }
    };

    if (direction == ProgressDirection::download) {
        m_session->async_wait_for_download_completion(std::move(on_complete));
    }
    else {
        m_session->async_wait_for_upload_completion(std::move(on_complete));
    }
}
2,290✔
1248

1249
/// Registers `callback` as an upload completion callback while holding the state mutex.
void SyncSession::wait_for_upload_completion(util::UniqueFunction<void(Status)>&& callback)
{
    util::CheckedUniqueLock state_lock(m_state_mutex);
    add_completion_callback(std::move(callback), ProgressDirection::upload);
}
944✔
1254

1255
// Registers `callback` to be invoked when download completion is reported.
// Takes m_state_mutex for the duration of the registration.
void SyncSession::wait_for_download_completion(util::UniqueFunction<void(Status)>&& callback)
{
    util::CheckedUniqueLock state_guard(m_state_mutex);
    add_completion_callback(std::move(callback), ProgressDirection::download);
}
1,190✔
1260

1261
uint64_t SyncSession::register_progress_notifier(std::function<ProgressNotifierCallback>&& notifier,
1262
                                                 ProgressDirection direction, bool is_streaming)
1263
{
156✔
1264
    return m_progress_notifier.register_callback(std::move(notifier), direction, is_streaming);
156✔
1265
}
156✔
1266

1267
void SyncSession::unregister_progress_notifier(uint64_t token)
1268
{
12✔
1269
    m_progress_notifier.unregister_callback(token);
12✔
1270
}
12✔
1271

1272
uint64_t SyncSession::register_connection_change_callback(std::function<ConnectionStateChangeCallback>&& callback)
1273
{
6✔
1274
    return m_connection_change_notifier.add_callback(std::move(callback));
6✔
1275
}
6✔
1276

1277
void SyncSession::unregister_connection_change_callback(uint64_t token)
1278
{
2✔
1279
    m_connection_change_notifier.remove_callback(token);
2✔
1280
}
2✔
1281

1282
// Out-of-line destructor with no custom teardown of its own.
SyncSession::~SyncSession() = default;
1,566✔
1283

1284
// Returns the current session state, read while holding m_state_mutex.
SyncSession::State SyncSession::state() const
{
    util::CheckedUniqueLock state_guard(m_state_mutex);
    const State snapshot = m_state;
    return snapshot;
}
16,235✔
1289

1290
// Returns the current connection state, read while holding m_connection_state_mutex.
SyncSession::ConnectionState SyncSession::connection_state() const
{
    util::CheckedUniqueLock connection_guard(m_connection_state_mutex);
    const ConnectionState snapshot = m_connection_state;
    return snapshot;
}
2,705✔
1295

1296
std::string const& SyncSession::path() const
1297
{
234✔
1298
    return m_db->get_path();
234✔
1299
}
234✔
1300

1301
std::shared_ptr<sync::SubscriptionStore> SyncSession::get_flx_subscription_store()
1302
{
2,050,680✔
1303
    util::CheckedLockGuard lock(m_state_mutex);
2,050,680✔
1304
    return m_flx_subscription_store;
2,050,680✔
1305
}
2,050,680✔
1306

1307
std::shared_ptr<sync::SubscriptionStore> SyncSession::get_subscription_store_base()
1308
{
2✔
1309
    util::CheckedLockGuard lock(m_state_mutex);
2✔
1310
    return m_subscription_store_base;
2✔
1311
}
2✔
1312

1313
// Reads the salted file ident from the client history status.
//
// Asserts that the DB has a replication object and that it is a
// sync::ClientReplication; the other values reported by get_status() are discarded.
sync::SaltedFileIdent SyncSession::get_file_ident() const
{
    auto* replication = m_db->get_replication();
    REALM_ASSERT(replication);
    REALM_ASSERT(dynamic_cast<sync::ClientReplication*>(replication));
    auto&& history = static_cast<sync::ClientReplication*>(replication)->get_history();

    sync::SaltedFileIdent file_ident;
    sync::version_type ignored_version;
    sync::SyncProgress ignored_progress;
    history.get_status(ignored_version, file_ident, ignored_progress);
    return file_ident;
}
160✔
1325

1326
std::string SyncSession::get_appservices_connection_id() const
1327
{
20✔
1328
    util::CheckedLockGuard lk(m_state_mutex);
20✔
1329
    if (!m_session) {
20✔
UNCOV
1330
        return {};
×
UNCOV
1331
    }
×
1332
    return m_session->get_appservices_connection_id();
20✔
1333
}
20✔
1334

1335
// Replaces the session's sync config with `new_config` and then revives the session.
//
// The session is first driven to Inactive (unless it is already Inactive or Paused);
// the new config must refer to the same user as the current one (asserted).
void SyncSession::update_configuration(SyncConfig new_config)
{
    for (;;) {
        util::CheckedUniqueLock state_lock(m_state_mutex);
        const bool is_idle = (m_state == State::Inactive || m_state == State::Paused);
        if (!is_idle) {
            // Changing the state releases the lock, so by the time it is reacquired
            // the state may have changed again (a callback ran, or another thread
            // did something). Keep switching to inactive until it sticks.
            become_inactive(std::move(state_lock));
            continue;
        }

        util::CheckedUniqueLock config_lock(m_config_mutex);
        REALM_ASSERT(m_state == State::Inactive || m_state == State::Paused);
        REALM_ASSERT(!m_session);
        REALM_ASSERT(m_config.sync_config->user == new_config.user);
        // This is used for testing purposes only, so simply swap in the new sync_config.
        m_config.sync_config = std::make_shared<SyncConfig>(std::move(new_config));
        break;
    }
    revive_if_needed();
}
8✔
1359

1360
void SyncSession::apply_sync_config_after_migration_or_rollback()
1361
{
26✔
1362
    // Migration state changed - Update the configuration to
13✔
1363
    // match the new sync mode.
13✔
1364
    util::CheckedLockGuard cfg_lock(m_config_mutex);
26✔
1365
    if (!m_migrated_sync_config)
26✔
1366
        return;
2✔
1367

12✔
1368
    m_config.sync_config = m_migrated_sync_config;
24✔
1369
    m_migrated_sync_config.reset();
24✔
1370
}
24✔
1371

1372
void SyncSession::save_sync_config_after_migration_or_rollback()
1373
{
28✔
1374
    util::CheckedLockGuard cfg_lock(m_config_mutex);
28✔
1375
    m_migrated_sync_config = m_migration_store->convert_sync_config(m_original_sync_config);
28✔
1376
}
28✔
1377

1378
void SyncSession::update_subscription_store(bool flx_sync_requested, std::optional<sync::SubscriptionSet> new_subs)
1379
{
52✔
1380
    util::CheckedUniqueLock lock(m_state_mutex);
52✔
1381

26✔
1382
    // The session should be closed before updating the FLX subscription store
26✔
1383
    REALM_ASSERT(!m_session);
52✔
1384

26✔
1385
    // If the subscription store exists and switching to PBS, then clear the store
26✔
1386
    auto& history = static_cast<sync::ClientReplication&>(*m_db->get_replication());
52✔
1387
    if (!flx_sync_requested) {
52✔
1388
        if (m_flx_subscription_store) {
8✔
1389
            // Empty the subscription store and cancel any pending subscription notification
4✔
1390
            // waiters
4✔
1391
            auto subscription_store = std::move(m_flx_subscription_store);
8✔
1392
            lock.unlock();
8✔
1393
            auto tr = m_db->start_write();
8✔
1394
            subscription_store->reset(*tr);
8✔
1395
            history.set_write_validator_factory(nullptr);
8✔
1396
            tr->commit();
8✔
1397
        }
8✔
1398
        return;
8✔
1399
    }
8✔
1400

22✔
1401
    if (m_flx_subscription_store)
44✔
1402
        return; // Using FLX and subscription store already exists
2✔
1403

21✔
1404
    // Going from PBS -> FLX (or one doesn't exist yet), create a new subscription store
21✔
1405
    create_subscription_store();
42✔
1406

21✔
1407
    std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr(m_flx_subscription_store);
42✔
1408

21✔
1409
    // If migrated to FLX, create subscriptions in the local realm to cover the existing data.
21✔
1410
    // This needs to be done before setting the write validator to avoid NoSubscriptionForWrite errors.
21✔
1411
    if (new_subs) {
42✔
1412
        auto active_mut_sub = m_flx_subscription_store->get_active().make_mutable_copy();
16✔
1413
        active_mut_sub.import(std::move(*new_subs));
16✔
1414
        active_mut_sub.set_state(sync::SubscriptionSet::State::Complete);
16✔
1415
        active_mut_sub.commit();
16✔
1416
    }
16✔
1417

21✔
1418
    auto tr = m_db->start_write();
42✔
1419
    set_write_validator_factory(weak_sub_mgr);
42✔
1420
    tr->rollback();
42✔
1421
}
42✔
1422

1423
void SyncSession::create_subscription_store()
1424
{
618✔
1425
    REALM_ASSERT(!m_flx_subscription_store);
618✔
1426

309✔
1427
    // Create the main subscription store instance when this is first called - this will
309✔
1428
    // remain valid afterwards for the life of the SyncSession, but m_flx_subscription_store
309✔
1429
    // will be reset when rolling back to PBS after a client FLX migration
309✔
1430
    if (!m_subscription_store_base) {
618✔
1431
        m_subscription_store_base = sync::SubscriptionStore::create(m_db);
616✔
1432
    }
616✔
1433

309✔
1434
    // m_subscription_store_base is always around for the life of SyncSession, but the
309✔
1435
    // m_flx_subscription_store is set when using FLX.
309✔
1436
    m_flx_subscription_store = m_subscription_store_base;
618✔
1437
}
618✔
1438

1439
// Installs a write validator factory on the client history.
//
// For each transaction the factory captures the tables covered by the latest
// subscription set; the resulting validator throws NoSubscriptionForWrite when a
// top-level table outside that set is written to. Tables of any other type are
// not checked.
void SyncSession::set_write_validator_factory(std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr)
{
    auto make_validator =
        [weak_sub_mgr](Transaction& tr) -> util::UniqueFunction<sync::SyncReplication::WriteValidator> {
        auto store = weak_sub_mgr.lock();
        // The subscription store is required to still be alive here.
        REALM_ASSERT_RELEASE(store);
        auto subscribed_tables = store->get_tables_for_latest(tr);
        return [tables = std::move(subscribed_tables)](const Table& table) {
            const bool is_top_level = (table.get_table_type() == Table::Type::TopLevel);
            if (!is_top_level) {
                return;
            }
            auto class_name = Group::table_name_to_class_name(table.get_name());
            if (tables.find(class_name) == tables.end()) {
                throw NoSubscriptionForWrite(util::format(
                    "Cannot write to class %1 when no flexible sync subscription has been created.", class_name));
            }
        };
    };

    auto& client_history = static_cast<sync::ClientReplication&>(*m_db->get_replication());
    client_history.set_write_validator_factory(std::move(make_validator));
}
618✔
1460

1461
// Represents a reference to the SyncSession from outside of the sync subsystem.
1462
// We attempt to keep the SyncSession in an active state as long as it has an external reference.
1463
class SyncSession::ExternalReference {
1464
public:
1465
    ExternalReference(std::shared_ptr<SyncSession> session)
1466
        : m_session(std::move(session))
1467
    {
1,566✔
1468
    }
1,566✔
1469

1470
    ~ExternalReference()
1471
    {
1,566✔
1472
        m_session->did_drop_external_reference();
1,566✔
1473
    }
1,566✔
1474

1475
private:
1476
    std::shared_ptr<SyncSession> m_session;
1477
};
1478

1479
std::shared_ptr<SyncSession> SyncSession::external_reference()
1480
{
1,800✔
1481
    util::CheckedLockGuard lock(m_external_reference_mutex);
1,800✔
1482

814✔
1483
    if (auto external_reference = m_external_reference.lock())
1,800✔
1484
        return std::shared_ptr<SyncSession>(external_reference, this);
234✔
1485

697✔
1486
    auto external_reference = std::make_shared<ExternalReference>(shared_from_this());
1,566✔
1487
    m_external_reference = external_reference;
1,566✔
1488
    return std::shared_ptr<SyncSession>(external_reference, this);
1,566✔
1489
}
1,566✔
1490

1491
std::shared_ptr<SyncSession> SyncSession::existing_external_reference()
1492
{
2,645✔
1493
    util::CheckedLockGuard lock(m_external_reference_mutex);
2,645✔
1494

1,116✔
1495
    if (auto external_reference = m_external_reference.lock())
2,645✔
1496
        return std::shared_ptr<SyncSession>(external_reference, this);
1,131✔
1497

668✔
1498
    return nullptr;
1,514✔
1499
}
1,514✔
1500

1501
void SyncSession::did_drop_external_reference()
1502
{
1,566✔
1503
    util::CheckedUniqueLock lock1(m_state_mutex);
1,566✔
1504
    {
1,566✔
1505
        util::CheckedLockGuard lock2(m_external_reference_mutex);
1,566✔
1506

697✔
1507
        // If the session is being resurrected we should not close the session.
697✔
1508
        if (!m_external_reference.expired())
1,566✔
UNCOV
1509
            return;
×
1510
    }
1,566✔
1511

697✔
1512
    close(std::move(lock1));
1,566✔
1513
}
1,566✔
1514

1515
uint64_t SyncProgressNotifier::register_callback(std::function<ProgressNotifierCallback> notifier,
1516
                                                 NotifierType direction, bool is_streaming)
1517
{
200✔
1518
    util::UniqueFunction<void()> invocation;
200✔
1519
    uint64_t token_value = 0;
200✔
1520
    {
200✔
1521
        std::lock_guard<std::mutex> lock(m_mutex);
200✔
1522
        token_value = m_progress_notifier_token++;
200✔
1523
        NotifierPackage package{std::move(notifier), util::none, m_local_transaction_version, is_streaming,
200✔
1524
                                direction == NotifierType::download};
200✔
1525
        if (!m_current_progress) {
200✔
1526
            // Simply register the package, since we have no data yet.
58✔
1527
            m_packages.emplace(token_value, std::move(package));
116✔
1528
            return token_value;
116✔
1529
        }
116✔
1530
        bool skip_registration = false;
84✔
1531
        invocation = package.create_invocation(*m_current_progress, skip_registration);
84✔
1532
        if (skip_registration) {
84✔
1533
            token_value = 0;
26✔
1534
        }
26✔
1535
        else {
58✔
1536
            m_packages.emplace(token_value, std::move(package));
58✔
1537
        }
58✔
1538
    }
84✔
1539
    invocation();
84✔
1540
    return token_value;
84✔
1541
}
84✔
1542

1543
void SyncProgressNotifier::unregister_callback(uint64_t token)
1544
{
16✔
1545
    std::lock_guard<std::mutex> lock(m_mutex);
16✔
1546
    m_packages.erase(token);
16✔
1547
}
16✔
1548

1549
void SyncProgressNotifier::update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
1550
                                  uint64_t snapshot_version, double download_estimate, double upload_estimate)
1551
{
5,053✔
1552
    std::vector<util::UniqueFunction<void()>> invocations;
5,053✔
1553
    {
5,053✔
1554
        std::lock_guard<std::mutex> lock(m_mutex);
5,053✔
1555
        m_current_progress = Progress{uploadable,      downloadable,      uploaded,        downloaded,
5,053✔
1556
                                      upload_estimate, download_estimate, snapshot_version};
5,053✔
1557

2,321✔
1558
        for (auto it = m_packages.begin(); it != m_packages.end();) {
6,057✔
1559
            bool should_delete = false;
1,004✔
1560
            invocations.emplace_back(it->second.create_invocation(*m_current_progress, should_delete));
1,004✔
1561
            it = should_delete ? m_packages.erase(it) : std::next(it);
968✔
1562
        }
1,004✔
1563
    }
5,053✔
1564
    // Run the notifiers only after we've released the lock.
2,321✔
1565
    for (auto& invocation : invocations)
5,053✔
1566
        invocation();
1,004✔
1567
}
5,053✔
1568

1569
void SyncProgressNotifier::set_local_version(uint64_t snapshot_version)
1570
{
9,100✔
1571
    std::lock_guard<std::mutex> lock(m_mutex);
9,100✔
1572
    m_local_transaction_version = snapshot_version;
9,100✔
1573
}
9,100✔
1574

1575
// Builds the closure that reports `current_progress` to this package's notifier.
//
// `is_expired` is set to true when a non-streaming notifier has transferred at
// least as much as it considers transferrable. On the early "not ready" return
// below, `is_expired` is left untouched and a no-op closure is returned.
util::UniqueFunction<void()>
SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired)
{
    uint64_t transferred;
    uint64_t transferrable;
    double progress_estimate;
    if (is_download) {
        transferred = current_progress.downloaded;
        transferrable = current_progress.downloadable;
        progress_estimate = current_progress.download_estimate;
    }
    else {
        transferred = current_progress.uploaded;
        transferrable = current_progress.uploadable;
        progress_estimate = current_progress.upload_estimate;
    }

    // If the sync client has not yet processed all of the local transactions then
    // the uploadable data is incorrect and the callback should not be invoked.
    const bool upload_not_ready = !is_download && snapshot_version > current_progress.snapshot_version;
    if (upload_not_ready)
        return [] {};

    if (!is_streaming) {
        // The initial download size reported by the server is the uncompacted size,
        // so the download may complete before that much data is actually received.
        // When that happens transferrable drops, and the new (smaller) value must
        // replace the captured one.
        const bool needs_capture = !captured_transferrable || *captured_transferrable > transferrable;
        if (needs_capture)
            captured_transferrable = transferrable;
        transferrable = *captured_transferrable;
    }

    // A notifier is expired if at least as many bytes have been transferred as
    // were originally considered transferrable.
    is_expired = !is_streaming && transferred >= transferrable;
    return [transferred, transferrable, progress_estimate, notifier = notifier] {
        notifier(transferred, transferrable, progress_estimate);
    };
}
1,070✔
1605

1606
uint64_t SyncSession::ConnectionChangeNotifier::add_callback(std::function<ConnectionStateChangeCallback> callback)
1607
{
6✔
1608
    std::lock_guard<std::mutex> lock(m_callback_mutex);
6✔
1609
    auto token = m_next_token++;
6✔
1610
    m_callbacks.push_back({std::move(callback), token});
6✔
1611
    return token;
6✔
1612
}
6✔
1613

1614
void SyncSession::ConnectionChangeNotifier::remove_callback(uint64_t token)
1615
{
2✔
1616
    Callback old;
2✔
1617
    {
2✔
1618
        std::lock_guard<std::mutex> lock(m_callback_mutex);
2✔
1619
        auto it = std::find_if(begin(m_callbacks), end(m_callbacks), [=](const auto& c) {
2✔
1620
            return c.token == token;
2✔
1621
        });
2✔
1622
        if (it == end(m_callbacks)) {
2✔
UNCOV
1623
            return;
×
UNCOV
1624
        }
×
1625

1✔
1626
        size_t idx = distance(begin(m_callbacks), it);
2✔
1627
        if (m_callback_index != npos) {
2✔
UNCOV
1628
            if (m_callback_index >= idx)
×
UNCOV
1629
                --m_callback_index;
×
UNCOV
1630
        }
×
1631
        --m_callback_count;
2✔
1632

1✔
1633
        old = std::move(*it);
2✔
1634
        m_callbacks.erase(it);
2✔
1635
    }
2✔
1636
}
2✔
1637

1638
// Calls every registered callback with the old and new connection state.
//
// The mutex is released around each call; m_callback_index/m_callback_count track
// the iteration so remove_callback() can adjust them while a callback is running.
void SyncSession::ConnectionChangeNotifier::invoke_callbacks(ConnectionState old_state, ConnectionState new_state)
{
    std::unique_lock lock(m_callback_mutex);
    m_callback_count = m_callbacks.size();
    ++m_callback_index;
    while (m_callback_index < m_callback_count) {
        // Take a local copy of the callback so that removing the callback from
        // within itself can't leave us with a dangling pointer.
        auto current = m_callbacks[m_callback_index].fn;
        lock.unlock();
        current(old_state, new_state);
        lock.lock();
        ++m_callback_index;
    }
    m_callback_index = npos;
}
5,727✔
1652

1653
// Forwards a test command to the underlying session. When no underlying session
// exists, a RuntimeError status is returned instead.
util::Future<std::string> SyncSession::send_test_command(std::string body)
{
    util::CheckedLockGuard state_guard(m_state_mutex);
    if (m_session) {
        return m_session->send_test_command(std::move(body));
    }

    return Status{ErrorCodes::RuntimeError, "Session doesn't exist to send test command on"};
}
30✔
1662

1663
// Performs a sync schema migration and reports the outcome through `callback`.
//
// If a migration is already in progress, `callback` is simply registered as a
// download-completion callback. Otherwise the session is paused, the migration is
// run on the DB, the subscription store is rebuilt, and the session is resumed;
// `callback` then fires on download completion (or earlier, with an error status).
//
// NOTE(review): m_schema_migration_in_progress is only cleared on the success path;
// both early error returns in the pause continuation leave it set — confirm intended.
void SyncSession::migrate_schema(util::UniqueFunction<void(Status)>&& callback)
{
    util::CheckedUniqueLock lock(m_state_mutex);
    // A schema migration is already running: just wait for it to complete.
    if (m_schema_migration_in_progress) {
        add_completion_callback(std::move(callback), ProgressDirection::download);
        return;
    }
    m_schema_migration_in_progress = true;

    // Perform the migration:
    //  1. Pause the sync session
    //  2. Once the sync client releases the realm file:
    //      a. Delete all tables (private and public)
    //      b. Reset the subscription store
    //      c. Empty the sync history and adjust cursors
    //      d. Reset file ident (the server flags the old ident as in the case of a client reset)
    //  3. Resume the session (the client asks for a new file ident)
    // See `sync_schema_migration::perform_schema_migration` for more details.

    // Set the registered completion callbacks aside for the duration of this call;
    // the scope guard below puts them back (merging with anything added meanwhile).
    CompletionCallbacks parked_callbacks;
    std::swap(m_completion_callbacks, parked_callbacks);
    auto restore_callbacks = util::make_scope_exit([&]() noexcept {
        util::CheckedUniqueLock relock(m_state_mutex);
        if (!m_completion_callbacks.empty())
            m_completion_callbacks.merge(std::move(parked_callbacks));
        else
            std::swap(parked_callbacks, m_completion_callbacks);
    });
    m_state_mutex.unlock(lock);

    auto paused = pause_async();
    std::move(paused).get_async(
        [callback = std::move(callback), weak_session = weak_from_this()](Status status) mutable {
            // Pausing failed: hand the error straight to the caller.
            if (!status.is_ok()) {
                callback(status);
                return;
            }

            auto session = weak_session.lock();
            if (!session) {
                status = Status(ErrorCodes::InvalidSession, "Sync session was destroyed during schema migration");
                callback(status);
                return;
            }

            sync_schema_migration::perform_schema_migration(*session->m_db);
            {
                // Clear the migration bookkeeping and drop both subscription store pointers.
                util::CheckedUniqueLock session_lock(session->m_state_mutex);
                session->m_previous_schema_version.reset();
                session->m_schema_migration_in_progress = false;
                session->m_subscription_store_base.reset();
                session->m_flx_subscription_store.reset();
            }
            // Recreate the FLX subscription store, register the caller's callback for
            // download completion, then resume the session.
            session->update_subscription_store(true, {});
            session->wait_for_download_completion(std::move(callback));
            session->resume();
        });
}
26✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc