• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1714

29 Sep 2023 04:22PM UTC coverage: 91.218% (+0.03%) from 91.188%
1714

push

Evergreen

realm-ci
Updated release notes

95850 of 175738 branches covered (0.0%)

232557 of 254946 relevant lines covered (91.22%)

7251217.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.05
/src/realm/object-store/sync/sync_session.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2016 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/sync/sync_session.hpp>
20

21
#include <realm/object-store/thread_safe_reference.hpp>
22
#include <realm/object-store/impl/realm_coordinator.hpp>
23
#include <realm/object-store/sync/app.hpp>
24
#include <realm/object-store/sync/impl/sync_client.hpp>
25
#include <realm/object-store/sync/impl/sync_file.hpp>
26
#include <realm/object-store/sync/impl/sync_metadata.hpp>
27
#include <realm/object-store/sync/sync_manager.hpp>
28
#include <realm/object-store/sync/sync_user.hpp>
29
#include <realm/object-store/util/scheduler.hpp>
30

31
#include <realm/db_options.hpp>
32
#include <realm/sync/client.hpp>
33
#include <realm/sync/config.hpp>
34
#include <realm/sync/network/http.hpp>
35
#include <realm/sync/network/websocket_error.hpp>
36
#include <realm/sync/noinst/client_history_impl.hpp>
37
#include <realm/sync/noinst/client_reset_operation.hpp>
38
#include <realm/sync/noinst/migration_store.hpp>
39
#include <realm/sync/protocol.hpp>
40

41
using namespace realm;
42
using namespace realm::_impl;
43

44
using SessionWaiterPointer = void (sync::Session::*)(util::UniqueFunction<void(std::error_code)>);
45

46
constexpr const char SyncError::c_original_file_path_key[];
47
constexpr const char SyncError::c_recovery_file_path_key[];
48

49
/// STATES:
50
///
51
/// WAITING_FOR_ACCESS_TOKEN: a request has been initiated to ask
52
/// for an updated access token and the session is waiting for a response.
53
/// From: INACTIVE, DYING
54
/// To:
55
///    * ACTIVE: when the SDK successfully refreshes the token
56
///    * INACTIVE: if asked to log out, or if asked to close
57
///
58
/// ACTIVE: the session is connected to the Sync Server and is actively
59
/// transferring data.
60
/// From: INACTIVE, DYING, WAITING_FOR_ACCESS_TOKEN
61
/// To:
62
///    * INACTIVE: if asked to log out, or if asked to close and the stop policy
63
///                is Immediate.
64
///    * DYING: if asked to close and the stop policy is AfterChangesUploaded
65
///
66
/// DYING: the session is performing clean-up work in preparation to be destroyed.
67
/// From: ACTIVE
68
/// To:
69
///    * INACTIVE: when the clean-up work completes, if the session wasn't
70
///                revived, or if explicitly asked to log out before the
71
///                clean-up work begins
72
///    * ACTIVE: if the session is revived
73
///    * WAITING_FOR_ACCESS_TOKEN: if the session tried to enter ACTIVE,
74
///                                but the token is invalid or expired.
75
///
76
/// INACTIVE: the user owning this session has logged out, the `sync::Session`
77
/// owned by this session is destroyed, and the session is quiescent.
78
/// Note that a session briefly enters this state before being destroyed, but
79
/// it can also enter this state and stay there if the user has been logged out.
80
/// From: initial, ACTIVE, DYING, WAITING_FOR_ACCESS_TOKEN
81
/// To:
82
///    * ACTIVE: if the session is revived
83
///    * WAITING_FOR_ACCESS_TOKEN: if the session tried to enter ACTIVE,
84
///                                but the token is invalid or expired.
85

86
void SyncSession::become_active()
87
{
1,664✔
88
    REALM_ASSERT(m_state != State::Active);
1,664✔
89
    m_state = State::Active;
1,664✔
90

745✔
91
    // First time the session becomes active, register a notification on the sentinel subscription set to restart the
745✔
92
    // session and update to native FLX.
745✔
93
    if (m_migration_sentinel_query_version) {
1,664✔
94
        m_flx_subscription_store->get_by_version(*m_migration_sentinel_query_version)
2✔
95
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
96
            .get_async([=, weak_self = weak_from_this()](StatusWith<sync::SubscriptionSet::State> s) {
2✔
97
                if (!s.is_ok()) {
2✔
98
                    return;
×
99
                }
×
100
                REALM_ASSERT(s.get_value() == sync::SubscriptionSet::State::Complete);
2✔
101
                if (auto strong_self = weak_self.lock()) {
2✔
102
                    strong_self->m_migration_store->cancel_migration();
2✔
103
                    strong_self->restart_session();
2✔
104
                }
2✔
105
            });
2✔
106
        m_migration_sentinel_query_version.reset();
2✔
107
    }
2✔
108

745✔
109
    // when entering from the Dying state the session will still be bound
745✔
110
    if (!m_session) {
1,664✔
111
        create_sync_session();
1,662✔
112
        m_session->bind();
1,662✔
113
    }
1,662✔
114

745✔
115
    // Register all the pending wait-for-completion blocks. This can
745✔
116
    // potentially add a redundant callback if we're coming from the Dying
745✔
117
    // state, but that's okay (we won't call the user callbacks twice).
745✔
118
    SyncSession::CompletionCallbacks callbacks_to_register;
1,664✔
119
    std::swap(m_completion_callbacks, callbacks_to_register);
1,664✔
120

745✔
121
    for (auto& [id, callback_tuple] : callbacks_to_register) {
915✔
122
        add_completion_callback(std::move(callback_tuple.second), callback_tuple.first);
326✔
123
    }
326✔
124
}
1,664✔
125

126
void SyncSession::restart_session()
127
{
8✔
128
    util::CheckedUniqueLock lock(m_state_mutex);
8✔
129
    do_restart_session(std::move(lock));
8✔
130
}
8✔
131

132
// Moves the session into State::Dying and schedules the transition to Inactive for when the
// pending upload completes.
//
// `lock` is the caller's lock on m_state_mutex; it is either forwarded to become_inactive()
// or released before this function returns.
void SyncSession::become_dying(util::CheckedUniqueLock lock)
{
    REALM_ASSERT(m_state != State::Dying);
    m_state = State::Dying;

    // Without a session there is nothing that could be uploaded, so go inactive right away.
    if (!m_session) {
        become_inactive(std::move(lock));
        return;
    }

    // Stamp this "death" with a fresh counter value. The upload callback only finishes the
    // transition if the session is still Dying with the same stamp.
    const size_t death_ticket = ++m_death_count;
    m_session->async_wait_for_upload_completion([weak_session = weak_from_this(), death_ticket](Status) {
        auto session = weak_session.lock();
        if (!session)
            return;

        util::CheckedUniqueLock state_lock(session->m_state_mutex);
        if (session->m_state == State::Dying && session->m_death_count == death_ticket) {
            session->become_inactive(std::move(state_lock));
        }
    });
    m_state_mutex.unlock(lock);
}
154

155
// Moves the session into State::Inactive and delegates the teardown to do_become_inactive().
//
// `lock` is the caller's lock on m_state_mutex and is passed along; `status` is forwarded
// unchanged to do_become_inactive().
void SyncSession::become_inactive(util::CheckedUniqueLock lock, Status status)
{
    REALM_ASSERT(m_state != State::Inactive);

    m_state = State::Inactive;
    do_become_inactive(std::move(lock), status);
}
162

163
// Moves the session into State::Paused.
//
// `lock` is the caller's lock on m_state_mutex. If the session was Inactive only the state
// changes and the lock is released; otherwise the regular teardown in do_become_inactive()
// runs with an OK status.
void SyncSession::become_paused(util::CheckedUniqueLock lock)
{
    REALM_ASSERT(m_state != State::Paused);

    const bool was_inactive = (m_state == State::Inactive);
    m_state = State::Paused;

    if (was_inactive) {
        // An inactive session has nothing to tear down; recording the new state is enough.
        m_state_mutex.unlock(lock);
    }
    else {
        do_become_inactive(std::move(lock), Status::OK());
    }
}
177

178
// Restarts the session in place. The unnamed parameter is the caller's lock on m_state_mutex,
// which stays held for the duration of this call.
void SyncSession::do_restart_session(util::CheckedUniqueLock)
{
    // A paused session is left untouched; it will be resumed when resume() is called.
    if (m_state == State::Paused) {
        return;
    }

    // Set the state to Inactive directly (without going through become_inactive()) so the
    // progress completion waiters keep waiting until the restarted session finishes the
    // upload/download sync.
    m_state = State::Inactive;

    // Drop the current session, if any (resetting an empty pointer does nothing).
    m_session.reset();

    // become_active() builds the replacement session via create_sync_session(), which picks
    // up the latest server path from the sync manager, and re-registers the completion
    // callbacks.
    become_active();
}
200

201
// Shared teardown used when the session stops being active.
//
// `lock` is the caller's lock on m_state_mutex; it is released before any callback runs.
// `status` is what the queued completion handlers are told; an OK status is replaced by
// OperationAborted.
void SyncSession::do_become_inactive(util::CheckedUniqueLock lock, Status status)
{
    // Set the disconnected state by hand. Sync would normally report it, but the underlying
    // session object is destroyed below, so its callback would never reach us.
    util::CheckedUniqueLock connection_state_lock(m_connection_state_mutex);
    const auto previous_connection_state = m_connection_state;
    m_connection_state = SyncSession::ConnectionState::Disconnected;
    const auto current_connection_state = m_connection_state;
    connection_state_lock.unlock();

    // Detach the queued completion handlers so they can be invoked after the lock is gone.
    SyncSession::CompletionCallbacks cancelled_waits;
    std::swap(cancelled_waits, m_completion_callbacks);

    m_session = nullptr;
    if (m_sync_manager) {
        m_sync_manager->unregister_session(m_db->get_path());
    }

    m_state_mutex.unlock(lock);

    // Everything from here on runs unlocked so that callbacks cannot deadlock on the state mutex.
    if (previous_connection_state != current_connection_state) {
        m_connection_change_notifier.invoke_callbacks(previous_connection_state, connection_state());
    }

    if (status.is_ok()) {
        status = Status(ErrorCodes::OperationAborted, "Sync session became inactive");
    }

    // Tell every queued completion handler that its wait was cancelled.
    for (auto& entry : cancelled_waits) {
        auto& handler = entry.second.second;
        handler(status);
    }
}
233

234
void SyncSession::become_waiting_for_access_token()
235
{
18✔
236
    REALM_ASSERT(m_state != State::WaitingForAccessToken);
18✔
237
    m_state = State::WaitingForAccessToken;
18✔
238
}
18✔
239

240
void SyncSession::handle_bad_auth(const std::shared_ptr<SyncUser>& user, Status status)
241
{
16✔
242
    // TODO: ideally this would write to the logs as well in case users didn't set up their error handler.
8✔
243
    {
16✔
244
        util::CheckedUniqueLock lock(m_state_mutex);
16✔
245
        cancel_pending_waits(std::move(lock), status);
16✔
246
    }
16✔
247
    if (user) {
16✔
248
        user->log_out();
16✔
249
    }
16✔
250

8✔
251
    if (auto error_handler = config(&SyncConfig::error_handler)) {
16✔
252
        auto user_facing_error = SyncError({ErrorCodes::AuthError, status.reason()}, true);
16✔
253
        error_handler(shared_from_this(), std::move(user_facing_error));
16✔
254
    }
16✔
255
}
16✔
256

257
// Returns true when `error` carries an additional HTTP status code of 401 (Unauthorized) or
// 403 (Forbidden), which is how an auth failure is reported. Returns false otherwise,
// including when no additional status code is present.
static bool check_for_auth_failure(const app::AppError& error)
{
    using namespace realm::sync;

    if (!error.additional_status_code) {
        return false;
    }

    const auto http_status = HTTPStatus(*error.additional_status_code);
    return http_status == HTTPStatus::Unauthorized || http_status == HTTPStatus::Forbidden;
}
269

270
// Returns true when `error` carries an additional HTTP status code of 301 (Moved Permanently)
// or 308 (Permanent Redirect), i.e. an unhandled permanent redirect. Returns false otherwise,
// including when no additional status code is present.
static bool check_for_redirect_response(const app::AppError& error)
{
    using namespace realm::sync;

    if (!error.additional_status_code) {
        return false;
    }

    const auto http_status = HTTPStatus(*error.additional_status_code);
    return http_status == HTTPStatus::MovedPermanently || http_status == HTTPStatus::PermanentRedirect;
}
282

283
// Builds the completion handler for an access token refresh request.
//
// The returned callable keeps `session` alive and is invoked with the refresh error, if any.
// `restart_session` selects what happens on success: restart the whole session, or just push
// the refreshed access token into the existing one.
util::UniqueFunction<void(util::Optional<app::AppError>)>
SyncSession::handle_refresh(const std::shared_ptr<SyncSession>& session, bool restart_session)
{
    return [session, restart_session](util::Optional<app::AppError> error) {
        auto session_user = session->user();

        // No user on the session: just cancel the pending waits with the refresh outcome.
        if (!session_user) {
            util::CheckedUniqueLock lock(session->m_state_mutex);
            auto refresh_error = error ? error->to_status() : Status::OK();
            session->cancel_pending_waits(std::move(lock), refresh_error);
            return;
        }

        // Successful refresh.
        if (!error) {
            if (restart_session) {
                // The latest access token and server url will be pulled from the sync manager
                // when the new session is started.
                session->restart_session();
            }
            else {
                // Update the access token and reconnect.
                session->update_access_token(session_user->access_token());
            }
            return;
        }

        // This response came in after the app shut down, ignore it.
        if (error->code() == ErrorCodes::ClientAppDeallocated) {
            return;
        }

        // Any client error other than app_deallocated is considered fatal because there was a
        // problem locally before even sending the request to the server,
        // eg. ClientErrorCode::user_not_found, ClientErrorCode::user_not_logged_in,
        // ClientErrorCode::too_many_redirects
        if (ErrorCodes::error_categories(error->code()).test(ErrorCategory::client_error)) {
            session->handle_bad_auth(session_user, error->to_status());
            return;
        }

        // A 401 response on a refresh request means that the token cannot be refreshed and we
        // should not retry. This can be because an admin has revoked this user's sessions, the
        // user has been disabled, or the refresh token has expired according to the server's clock.
        if (check_for_auth_failure(*error)) {
            session->handle_bad_auth(
                session_user,
                {error->code(), util::format("Unable to refresh the user access token: %1", error->reason())});
            return;
        }

        // A 301 or 308 response is an unhandled permanent redirect response (which should not
        // happen) - if this is received, fail the request with an appropriate error message.
        // Temporary redirect responses (302, 307) are not supported.
        if (check_for_redirect_response(*error)) {
            session->handle_bad_auth(
                session_user,
                {error->code(), util::format("Unhandled redirect response when trying to reach the server: %1",
                                             error->reason())});
            return;
        }

        // The refresh request failed with an unexpected, non-fatal error. We would like to retry,
        // but not immediately, so the server is not swamped with requests. Two scenarios:
        // 1) The request came from the proactive token check or was user initiated, so the token
        //    may actually still be valid. Advance to Active from WaitingForAccessToken if needed
        //    and let the sync server tell us whether the token is valid. If that fails too we
        //    end up in case 2.
        // 2) The sync connection initiated the request because the server is unavailable or the
        //    connection hit an unexpected error. The sync client re-initializes the connection
        //    on its own internal backoff timer, so nothing needs to happen here.
        util::CheckedUniqueLock lock(session->m_state_mutex);
        if (session->m_state == State::WaitingForAccessToken) {
            session->become_active();
        }
    };
}
354

355
SyncSession::SyncSession(SyncClient& client, std::shared_ptr<DB> db, const RealmConfig& config,
356
                         SyncManager* sync_manager)
357
    : m_config{config}
358
    , m_db{std::move(db)}
359
    , m_original_sync_config{m_config.sync_config}
360
    , m_migration_store{sync::MigrationStore::create(m_db)}
361
    , m_client(client)
362
    , m_sync_manager(sync_manager)
363
{
1,342✔
364
    REALM_ASSERT(m_config.sync_config);
1,342✔
365
    // we don't want the following configs enabled during a client reset
585✔
366
    m_config.scheduler = nullptr;
1,342✔
367
    m_config.audit_config = nullptr;
1,342✔
368

585✔
369
    // Adjust the sync_config if using PBS sync and already in the migrated or rollback state
585✔
370
    if (m_migration_store->is_migrated() || m_migration_store->is_rollback_in_progress()) {
1,342✔
371
        m_config.sync_config = sync::MigrationStore::convert_sync_config_to_flx(m_original_sync_config);
8✔
372
    }
8✔
373

585✔
374
    // If using FLX, set up m_flx_subscription_store and the history_write_validator
585✔
375
    if (m_config.sync_config->flx_sync_requested) {
1,342✔
376
        create_subscription_store();
393✔
377
        std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr(m_flx_subscription_store);
393✔
378
        set_write_validator_factory(weak_sub_mgr);
393✔
379
    }
393✔
380

585✔
381
    // After a migration to FLX, if the user opens the realm with a flexible sync configuration, we need to first
585✔
382
    // upload any unsynced changes before updating to native FLX.
585✔
383
    // A subscription set is used as sentinel so we know when to stop uploading.
585✔
384
    // Note: Currently, a sentinel subscription set is always created even if there is nothing to upload.
585✔
385
    if (m_migration_store->is_migrated() && m_original_sync_config->flx_sync_requested) {
1,342✔
386
        m_migration_store->create_sentinel_subscription_set(*m_flx_subscription_store);
2✔
387
        m_migration_sentinel_query_version = m_migration_store->get_sentinel_subscription_set_version();
2✔
388
        REALM_ASSERT(m_migration_sentinel_query_version);
2✔
389
    }
2✔
390
}
1,342✔
391

392
std::shared_ptr<SyncManager> SyncSession::sync_manager() const
393
{
×
394
    util::CheckedLockGuard lk(m_state_mutex);
×
395
    REALM_ASSERT(m_sync_manager);
×
396
    return m_sync_manager->shared_from_this();
×
397
}
×
398

399
void SyncSession::detach_from_sync_manager()
400
{
3✔
401
    shutdown_and_wait();
3✔
402
    util::CheckedLockGuard lk(m_state_mutex);
3✔
403
    m_sync_manager = nullptr;
3✔
404
}
3✔
405

406
// Records a file action that marks this session's Realm file for deletion, optionally backing
// it up first, and annotates `error` with the paths involved.
//
// `error.user_info` always receives the original file path; when `should_backup` is
// ShouldBackup::yes it also receives the reserved recovery file path.
void SyncSession::update_error_and_mark_file_for_deletion(SyncError& error, ShouldBackup should_backup)
{
    using Action = SyncFileActionMetadata::Action;

    util::CheckedLockGuard config_lock(m_config_mutex);
    const bool backup_requested = (should_backup == ShouldBackup::yes);

    auto original_path = path();
    error.user_info[SyncError::c_original_file_path_key] = original_path;

    // The recovery path stays empty unless a backup was requested.
    std::string recovery_path;
    if (backup_requested) {
        recovery_path = util::reserve_unique_file_name(
            m_sync_manager->recovery_directory_path(m_config.sync_config->recovery_directory),
            util::create_timestamped_template("recovered_realm"));
        error.user_info[SyncError::c_recovery_file_path_key] = recovery_path;
    }

    // Add a SyncFileActionMetadata describing what has to happen to the file.
    auto action = backup_requested ? Action::BackUpThenDeleteRealm : Action::DeleteRealm;
    m_sync_manager->perform_metadata_update(
        [action, original_path = std::move(original_path), recovery_path = std::move(recovery_path),
         partition_value = m_config.sync_config->partition_value,
         identity = m_config.sync_config->user->identity()](const auto& manager) {
            manager.make_file_action_metadata(original_path, partition_value, identity, action, recovery_path);
        });
}
428

429
void SyncSession::download_fresh_realm(sync::ProtocolErrorInfo::Action server_requests_action)
430
{
172✔
431
    // first check that recovery will not be prevented
86✔
432
    if (server_requests_action == sync::ProtocolErrorInfo::Action::ClientResetNoRecovery) {
172✔
433
        auto mode = config(&SyncConfig::client_resync_mode);
4✔
434
        if (mode == ClientResyncMode::Recover) {
4✔
435
            handle_fresh_realm_downloaded(
2✔
436
                nullptr,
2✔
437
                {ErrorCodes::RuntimeError,
2✔
438
                 "A client reset is required but the server does not permit recovery for this client"},
2✔
439
                server_requests_action);
2✔
440
            return;
2✔
441
        }
2✔
442
    }
170✔
443

85✔
444
    std::vector<char> encryption_key;
170✔
445
    {
170✔
446
        util::CheckedLockGuard lock(m_config_mutex);
170✔
447
        encryption_key = m_config.encryption_key;
170✔
448
    }
170✔
449

85✔
450
    DBOptions options;
170✔
451
    options.allow_file_format_upgrade = false;
170✔
452
    options.enable_async_writes = false;
170✔
453
    if (!encryption_key.empty())
170✔
454
        options.encryption_key = encryption_key.data();
2✔
455

85✔
456
    DBRef db;
170✔
457
    auto fresh_path = ClientResetOperation::get_fresh_path_for(m_db->get_path());
170✔
458
    try {
170✔
459
        // We want to attempt to use a pre-existing file to reduce the chance of
85✔
460
        // downloading the first part of the file only to then delete it over
85✔
461
        // and over, but if we fail to open it then we should just start over.
85✔
462
        try {
170✔
463
            db = DB::create(sync::make_client_replication(), fresh_path, options);
170✔
464
        }
170✔
465
        catch (...) {
90✔
466
            util::File::try_remove(fresh_path);
10✔
467
        }
10✔
468

85✔
469
        if (!db) {
166✔
470
            db = DB::create(sync::make_client_replication(), fresh_path, options);
2✔
471
        }
2✔
472
    }
162✔
473
    catch (...) {
89✔
474
        // Failed to open the fresh path after attempting to delete it, so we
4✔
475
        // just can't do automatic recovery.
4✔
476
        handle_fresh_realm_downloaded(nullptr, exception_to_status(), server_requests_action);
8✔
477
        return;
8✔
478
    }
8✔
479

81✔
480
    util::CheckedLockGuard state_lock(m_state_mutex);
162✔
481
    if (m_state != State::Active) {
162✔
482
        return;
×
483
    }
×
484
    std::shared_ptr<SyncSession> fresh_sync_session;
162✔
485
    {
162✔
486
        util::CheckedLockGuard config_lock(m_config_mutex);
162✔
487
        RealmConfig config = m_config;
162✔
488
        config.path = fresh_path;
162✔
489
        // in case of migrations use the migrated config
81✔
490
        auto fresh_config = m_migrated_sync_config ? *m_migrated_sync_config : *m_config.sync_config;
148✔
491
        // deep copy the sync config so we don't modify the live session's config
81✔
492
        config.sync_config = std::make_shared<SyncConfig>(fresh_config);
162✔
493
        config.sync_config->client_resync_mode = ClientResyncMode::Manual;
162✔
494
        fresh_sync_session = m_sync_manager->get_session(db, config);
162✔
495
        auto& history = static_cast<sync::ClientReplication&>(*db->get_replication());
162✔
496
        // the fresh Realm may apply writes to this db after it has outlived its sync session
81✔
497
        // the writes are used to generate a changeset for recovery, but are never committed
81✔
498
        history.set_write_validator_factory({});
162✔
499
    }
162✔
500

81✔
501
    fresh_sync_session->assert_mutex_unlocked();
162✔
502
    // The fresh realm uses flexible sync.
81✔
503
    if (auto fresh_sub_store = fresh_sync_session->get_flx_subscription_store()) {
162✔
504
        auto fresh_sub = fresh_sub_store->get_latest();
64✔
505
        // The local realm uses flexible sync as well so copy the active subscription set to the fresh realm.
32✔
506
        if (auto local_subs_store = m_flx_subscription_store) {
64✔
507
            sync::SubscriptionSet active = local_subs_store->get_active();
46✔
508
            auto fresh_mut_sub = fresh_sub.make_mutable_copy();
46✔
509
            fresh_mut_sub.import(active);
46✔
510
            fresh_sub = fresh_mut_sub.commit();
46✔
511
        }
46✔
512
        fresh_sub.get_state_change_notification(sync::SubscriptionSet::State::Complete)
64✔
513
            .then([=, weak_self = weak_from_this()](sync::SubscriptionSet::State state) {
63✔
514
                if (server_requests_action != sync::ProtocolErrorInfo::Action::MigrateToFLX) {
62✔
515
                    return util::Future<sync::SubscriptionSet::State>::make_ready(state);
44✔
516
                }
44✔
517
                auto strong_self = weak_self.lock();
18✔
518
                if (!strong_self || !strong_self->m_migration_store->is_migration_in_progress()) {
18✔
519
                    return util::Future<sync::SubscriptionSet::State>::make_ready(state);
×
520
                }
×
521

9✔
522
                // fresh_sync_session is using a new realm file that doesn't have the migration_store info
9✔
523
                // so the query string from the local migration store will need to be provided
9✔
524
                auto query_string = strong_self->m_migration_store->get_query_string();
18✔
525
                REALM_ASSERT(query_string);
18✔
526
                // Create subscriptions in the fresh realm based on the schema instructions received in the bootstrap
9✔
527
                // message.
9✔
528
                fresh_sync_session->m_migration_store->create_subscriptions(*fresh_sub_store, *query_string);
18✔
529
                auto latest_subs = fresh_sub_store->get_latest();
18✔
530
                {
18✔
531
                    util::CheckedLockGuard lock(strong_self->m_state_mutex);
18✔
532
                    // Save a copy of the subscriptions so we add them to the local realm once the
9✔
533
                    // subscription store is created.
9✔
534
                    strong_self->m_active_subscriptions_after_migration = latest_subs;
18✔
535
                }
18✔
536

9✔
537
                return latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete);
18✔
538
            })
18✔
539
            .get_async([=, weak_self = weak_from_this()](StatusWith<sync::SubscriptionSet::State> s) {
64✔
540
                // Keep the sync session alive while it's downloading, but then close
32✔
541
                // it immediately
32✔
542
                fresh_sync_session->force_close();
64✔
543
                if (auto strong_self = weak_self.lock()) {
64✔
544
                    if (s.is_ok()) {
64✔
545
                        strong_self->handle_fresh_realm_downloaded(db, Status::OK(), server_requests_action);
62✔
546
                    }
62✔
547
                    else {
2✔
548
                        strong_self->handle_fresh_realm_downloaded(nullptr, s.get_status(), server_requests_action);
2✔
549
                    }
2✔
550
                }
64✔
551
            });
64✔
552
    }
64✔
553
    else { // pbs
98✔
554
        fresh_sync_session->wait_for_download_completion([=, weak_self = weak_from_this()](Status s) {
98✔
555
            // Keep the sync session alive while it's downloading, but then close
49✔
556
            // it immediately
49✔
557
            fresh_sync_session->force_close();
98✔
558
            if (auto strong_self = weak_self.lock()) {
98✔
559
                strong_self->handle_fresh_realm_downloaded(db, s, server_requests_action);
98✔
560
            }
98✔
561
        });
98✔
562
    }
98✔
563
    fresh_sync_session->revive_if_needed();
162✔
564
}
162✔
565

566
void SyncSession::handle_fresh_realm_downloaded(DBRef db, Status status,
567
                                                sync::ProtocolErrorInfo::Action server_requests_action)
568
{
172✔
569
    util::CheckedUniqueLock lock(m_state_mutex);
172✔
570
    if (m_state != State::Active) {
172✔
571
        return;
×
572
    }
×
573
    // The download can fail for many reasons. For example:
86✔
574
    // - unable to write the fresh copy to the file system
86✔
575
    // - during download of the fresh copy, the fresh copy itself is reset
86✔
576
    // - in FLX mode there was a problem fulfilling the previously active subscription
86✔
577
    if (!status.is_ok()) {
172✔
578
        if (status == ErrorCodes::OperationAborted) {
18✔
579
            return;
6✔
580
        }
6✔
581
        lock.unlock();
12✔
582

6✔
583
        sync::SessionErrorInfo synthetic(
12✔
584
            Status{ErrorCodes::AutoClientResetFailed,
12✔
585
                   util::format("A fatal error occurred during client reset: '%1'", status.reason())},
12✔
586
            sync::IsFatal{true});
12✔
587
        handle_error(synthetic);
12✔
588
        return;
12✔
589
    }
12✔
590

77✔
591
    // Performing a client reset requires tearing down our current
77✔
592
    // sync session and creating a new one with the relevant client reset config. This
77✔
593
    // will result in session completion handlers firing
77✔
594
    // when the old session is torn down, which we don't want as this
77✔
595
    // is supposed to be transparent to the user.
77✔
596
    //
77✔
597
    // To avoid this, we need to move the completion handlers aside temporarily so
77✔
598
    // that moving to the inactive state doesn't clear them - they will be
77✔
599
    // re-registered when the session becomes active again.
77✔
600
    {
154✔
601
        m_server_requests_action = server_requests_action;
154✔
602
        m_client_reset_fresh_copy = db;
154✔
603
        CompletionCallbacks callbacks;
154✔
604
        std::swap(m_completion_callbacks, callbacks);
154✔
605
        // always swap back, even if advance_state throws
77✔
606
        auto guard = util::make_scope_exit([&]() noexcept {
154✔
607
            util::CheckedUniqueLock lock(m_state_mutex);
154✔
608
            if (m_completion_callbacks.empty())
154✔
609
                std::swap(callbacks, m_completion_callbacks);
154✔
610
            else
×
611
                m_completion_callbacks.merge(std::move(callbacks));
×
612
        });
154✔
613
        become_inactive(std::move(lock)); // unlocks the lock
154✔
614

77✔
615
        // Once the session is inactive, update sync config and subscription store after migration.
77✔
616
        if (server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
154✔
617
            server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS) {
145✔
618
            apply_sync_config_after_migration_or_rollback();
26✔
619
            auto flx_sync_requested = config(&SyncConfig::flx_sync_requested);
26✔
620
            update_subscription_store(flx_sync_requested);
26✔
621
        }
26✔
622
    }
154✔
623
    revive_if_needed();
154✔
624
}
154✔
625

626
// Test-only entry point: forwards `error` to the given session's own handle_error(),
// transferring ownership of the error info to it.
void SyncSession::OnlyForTesting::handle_error(SyncSession& session, sync::SessionErrorInfo&& error)
{
    SyncSession& target = session;
    target.handle_error(std::move(error));
}
16✔
630

631
// This method should only be called from within the error handler callback registered upon the underlying
632
// `m_session`.
633
void SyncSession::handle_error(sync::SessionErrorInfo error)
634
{
542✔
635
    enum class NextStateAfterError { none, inactive, error };
542✔
636
    auto next_state = error.is_fatal ? NextStateAfterError::error : NextStateAfterError::none;
412✔
637
    util::Optional<ShouldBackup> delete_file;
542✔
638
    bool log_out_user = false;
542✔
639
    bool unrecognized_by_client = false;
542✔
640

278✔
641
    if (error.status == ErrorCodes::AutoClientResetFailed) {
542✔
642
        // At this point, automatic recovery has been attempted but it failed.
22✔
643
        // Fallback to a manual reset and let the user try to handle it.
22✔
644
        next_state = NextStateAfterError::inactive;
44✔
645
        delete_file = ShouldBackup::yes;
44✔
646
    }
44✔
647
    else if (error.server_requests_action != sync::ProtocolErrorInfo::Action::NoAction) {
498✔
648
        switch (error.server_requests_action) {
488✔
649
            case sync::ProtocolErrorInfo::Action::NoAction:
✔
650
                REALM_UNREACHABLE(); // This is not sent by the MongoDB server
×
651
            case sync::ProtocolErrorInfo::Action::ApplicationBug:
19✔
652
                [[fallthrough]];
19✔
653
            case sync::ProtocolErrorInfo::Action::ProtocolViolation:
25✔
654
                next_state = NextStateAfterError::inactive;
25✔
655
                break;
25✔
656
            case sync::ProtocolErrorInfo::Action::Warning:
20✔
657
                break; // not fatal, but should be bubbled up to the user below.
20✔
658
            case sync::ProtocolErrorInfo::Action::Transient:
223✔
659
                // Not real errors, don't need to be reported to the binding.
120✔
660
                return;
223✔
661
            case sync::ProtocolErrorInfo::Action::DeleteRealm:
8✔
662
                next_state = NextStateAfterError::inactive;
×
663
                delete_file = ShouldBackup::no;
×
664
                break;
×
665
            case sync::ProtocolErrorInfo::Action::ClientReset:
164✔
666
                [[fallthrough]];
164✔
667
            case sync::ProtocolErrorInfo::Action::ClientResetNoRecovery:
168✔
668
                switch (config(&SyncConfig::client_resync_mode)) {
168✔
669
                    case ClientResyncMode::Manual:
24✔
670
                        next_state = NextStateAfterError::inactive;
24✔
671
                        delete_file = ShouldBackup::yes;
24✔
672
                        break;
24✔
673
                    case ClientResyncMode::DiscardLocal:
78✔
674
                        [[fallthrough]];
78✔
675
                    case ClientResyncMode::RecoverOrDiscard:
90✔
676
                        [[fallthrough]];
90✔
677
                    case ClientResyncMode::Recover:
144✔
678
                        download_fresh_realm(error.server_requests_action);
144✔
679
                        return; // do not propgate the error to the user at this point
144✔
680
                }
24✔
681
                break;
24✔
682
            case sync::ProtocolErrorInfo::Action::MigrateToFLX:
21✔
683
                // Should not receive this error if original sync config is FLX
9✔
684
                REALM_ASSERT(!m_original_sync_config->flx_sync_requested);
18✔
685
                REALM_ASSERT(error.migration_query_string && !error.migration_query_string->empty());
18✔
686
                // Original config was PBS, migrating to FLX
9✔
687
                m_migration_store->migrate_to_flx(*error.migration_query_string,
18✔
688
                                                  m_original_sync_config->partition_value);
18✔
689
                save_sync_config_after_migration_or_rollback();
18✔
690
                download_fresh_realm(error.server_requests_action);
18✔
691
                return;
18✔
692
            case sync::ProtocolErrorInfo::Action::RevertToPBS:
17✔
693
                // If the client was updated to use FLX natively, but the server was rolled back to PBS,
5✔
694
                // the server should be sending switch_to_flx_sync; throw exception if this error is not
5✔
695
                // received.
5✔
696
                if (m_original_sync_config->flx_sync_requested) {
10✔
697
                    throw LogicError(ErrorCodes::InvalidServerResponse,
×
698
                                     "Received 'RevertToPBS' from server after rollback while client is natively "
×
699
                                     "using FLX - expected 'SwitchToPBS'");
×
700
                }
×
701
                // Original config was PBS, rollback the migration
5✔
702
                m_migration_store->rollback_to_pbs();
10✔
703
                save_sync_config_after_migration_or_rollback();
10✔
704
                download_fresh_realm(error.server_requests_action);
10✔
705
                return;
10✔
706
            case sync::ProtocolErrorInfo::Action::RefreshUser:
18✔
707
                if (auto u = user()) {
18✔
708
                    u->refresh_custom_data(false, handle_refresh(shared_from_this(), false));
18✔
709
                    return;
18✔
710
                }
18✔
711
                break;
×
712
            case sync::ProtocolErrorInfo::Action::RefreshLocation:
6✔
713
                if (auto u = user()) {
6✔
714
                    u->refresh_custom_data(true, handle_refresh(shared_from_this(), true));
6✔
715
                    return;
6✔
716
                }
6✔
717
                break;
×
718
            case sync::ProtocolErrorInfo::Action::LogOutUser:
✔
719
                next_state = NextStateAfterError::inactive;
×
720
                log_out_user = true;
×
721
                break;
×
722
        }
10✔
723
    }
10✔
724
    else {
10✔
725
        // Unrecognized error code.
5✔
726
        unrecognized_by_client = true;
10✔
727
    }
10✔
728

278✔
729
    util::CheckedUniqueLock lock(m_state_mutex);
341✔
730
    SyncError sync_error{error.status, error.is_fatal, error.log_url, std::move(error.compensating_writes)};
123✔
731
    // `action` is used over `shouldClientReset` and `isRecoveryModeDisabled`.
60✔
732
    sync_error.server_requests_action = error.server_requests_action;
123✔
733
    sync_error.is_unrecognized_by_client = unrecognized_by_client;
123✔
734

60✔
735
    if (delete_file)
123✔
736
        update_error_and_mark_file_for_deletion(sync_error, *delete_file);
68✔
737

60✔
738
    if (m_state == State::Dying && error.is_fatal) {
123✔
739
        become_inactive(std::move(lock), error.status);
2✔
740
        return;
2✔
741
    }
2✔
742

59✔
743
    // Don't bother invoking m_config.error_handler if the sync is inactive.
59✔
744
    // It does not make sense to call the handler when the session is closed.
59✔
745
    if (m_state == State::Inactive || m_state == State::Paused) {
121✔
746
        return;
×
747
    }
×
748

59✔
749
    switch (next_state) {
121✔
750
        case NextStateAfterError::none:
28✔
751
            if (config(&SyncConfig::cancel_waits_on_nonfatal_error)) {
28✔
752
                cancel_pending_waits(std::move(lock), sync_error.status); // unlocks the mutex
×
753
            }
×
754
            break;
28✔
755
        case NextStateAfterError::inactive: {
91✔
756
            become_inactive(std::move(lock), sync_error.status);
91✔
757
            break;
91✔
758
        }
×
759
        case NextStateAfterError::error: {
2✔
760
            cancel_pending_waits(std::move(lock), sync_error.status);
2✔
761
            break;
2✔
762
        }
121✔
763
    }
121✔
764

59✔
765
    if (log_out_user) {
121✔
766
        if (auto u = user())
×
767
            u->log_out();
×
768
    }
×
769

59✔
770
    if (auto error_handler = config(&SyncConfig::error_handler)) {
121✔
771
        error_handler(shared_from_this(), std::move(sync_error));
121✔
772
    }
121✔
773
}
121✔
774

775
void SyncSession::cancel_pending_waits(util::CheckedUniqueLock lock, Status error,
776
                                       std::optional<Status> subs_notify_error)
777
{
18✔
778
    CompletionCallbacks callbacks;
18✔
779
    std::swap(callbacks, m_completion_callbacks);
18✔
780

9✔
781
    // Inform any waiters on pending subscription states that they were cancelled
9✔
782
    if (subs_notify_error && m_flx_subscription_store) {
18!
783
        auto subscription_store = m_flx_subscription_store;
×
784
        m_state_mutex.unlock(lock);
×
785
        subscription_store->notify_all_state_change_notifications(*subs_notify_error);
×
786
    }
×
787
    else {
18✔
788
        m_state_mutex.unlock(lock);
18✔
789
    }
18✔
790

9✔
791
    // Inform any queued-up completion handlers that they were cancelled.
9✔
792
    for (auto& [id, callback] : callbacks)
18✔
793
        callback.second(error);
14✔
794
}
18✔
795

796
void SyncSession::handle_progress_update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded,
797
                                         uint64_t uploadable, uint64_t download_version, uint64_t snapshot_version)
798
{
9,790✔
799
    m_progress_notifier.update(downloaded, downloadable, uploaded, uploadable, download_version, snapshot_version);
9,790✔
800
}
9,790✔
801

802
// Builds the client reset configuration handed to the sync client.
//
// The mode, fresh copy and recovery flag are always filled in. The before/after notification
// wrappers are only installed when the sync config has at least one of the two user callbacks;
// they open the Realm from a copy of `base_config` using a dummy scheduler.
static sync::Session::Config::ClientReset make_client_reset_config(const RealmConfig& base_config,
                                                                   const std::shared_ptr<SyncConfig>& sync_config,
                                                                   DBRef&& fresh_copy, bool recovery_is_allowed)
{
    REALM_ASSERT(sync_config->client_resync_mode != ClientResyncMode::Manual);

    sync::Session::Config::ClientReset reset_config;
    reset_config.mode = sync_config->client_resync_mode;
    reset_config.fresh_copy = std::move(fresh_copy);
    reset_config.recovery_is_allowed = recovery_is_allowed;

    // The conditions here are asymmetric because if we have *either* a before
    // or after callback we need to make sure to initialize the local schema
    // before the client reset happens.
    const bool has_before_callback = static_cast<bool>(sync_config->notify_before_client_reset);
    const bool has_after_callback = static_cast<bool>(sync_config->notify_after_client_reset);
    if (!(has_before_callback || has_after_callback))
        return reset_config;

    RealmConfig realm_config = base_config;
    realm_config.sync_config = std::make_shared<SyncConfig>(*sync_config); // deep copy
    realm_config.scheduler = util::Scheduler::make_dummy();

    if (has_after_callback) {
        // Hands the user a frozen Realm at the pre-reset version plus a reference to the live Realm.
        reset_config.notify_after_client_reset = [realm_config](VersionID previous_version, bool did_recover) {
            auto coordinator = _impl::RealmCoordinator::get_coordinator(realm_config);
            ThreadSafeReference live_after = coordinator->get_unbound_realm();
            SharedRealm frozen_before = coordinator->get_realm(realm_config, previous_version);
            REALM_ASSERT(frozen_before);
            REALM_ASSERT(frozen_before->is_frozen());
            realm_config.sync_config->notify_after_client_reset(std::move(frozen_before), std::move(live_after),
                                                                did_recover);
        };
    }

    reset_config.notify_before_client_reset = [realm_cfg = std::move(realm_config)]() -> VersionID {
        // Opening the Realm live here may make a write if the schema is different
        // than what exists on disk. It is necessary to pass a fully usable Realm
        // to the user here. Note that the schema changes made here will be considered
        // an "offline write" to be recovered if this is recovery mode.
        auto before_realm = Realm::get_shared_realm(realm_cfg);
        before_realm->read_group();
        if (auto& user_callback = realm_cfg.sync_config->notify_before_client_reset) {
            user_callback(realm_cfg.sync_config->freeze_before_reset_realm ? before_realm->freeze() : before_realm);
        }
        // Note that if the SDK requested a live Realm this may be a different
        // version than what we had before calling the callback.
        return before_realm->read_transaction_version();
    };

    return reset_config;
}
128✔
851

852
void SyncSession::create_sync_session()
853
{
1,662✔
854
    if (m_session)
1,662✔
855
        return;
×
856

744✔
857
    util::CheckedLockGuard config_lock(m_config_mutex);
1,662✔
858

744✔
859
    REALM_ASSERT(m_config.sync_config);
1,662✔
860
    SyncConfig& sync_config = *m_config.sync_config;
1,662✔
861
    REALM_ASSERT(sync_config.user);
1,662✔
862

744✔
863
    sync::Session::Config session_config;
1,662✔
864
    session_config.signed_user_token = sync_config.user->access_token();
1,662✔
865
    session_config.user_id = sync_config.user->identity();
1,662✔
866
    session_config.realm_identifier = sync_config.partition_value;
1,662✔
867
    session_config.verify_servers_ssl_certificate = sync_config.client_validate_ssl;
1,662✔
868
    session_config.ssl_trust_certificate_path = sync_config.ssl_trust_certificate_path;
1,662✔
869
    session_config.ssl_verify_callback = sync_config.ssl_verify_callback;
1,662✔
870
    session_config.proxy_config = sync_config.proxy_config;
1,662✔
871
    session_config.simulate_integration_error = sync_config.simulate_integration_error;
1,662✔
872
    session_config.flx_bootstrap_batch_size_bytes = sync_config.flx_bootstrap_batch_size_bytes;
1,662✔
873
    session_config.session_reason = ClientResetOperation::is_fresh_path(m_config.path)
1,662✔
874
                                        ? sync::SessionReason::ClientReset
821✔
875
                                        : sync::SessionReason::Sync;
1,585✔
876

744✔
877
    if (sync_config.on_sync_client_event_hook) {
1,662✔
878
        session_config.on_sync_client_event_hook = [hook = sync_config.on_sync_client_event_hook,
58✔
879
                                                    anchor = weak_from_this()](const SyncClientHookData& data) {
424✔
880
            return hook(anchor, data);
424✔
881
        };
424✔
882
    }
58✔
883

744✔
884
    {
1,662✔
885
        std::string sync_route = m_sync_manager->sync_route();
1,662✔
886

744✔
887
        if (!m_client.decompose_server_url(sync_route, session_config.protocol_envelope,
1,662✔
888
                                           session_config.server_address, session_config.server_port,
1,662✔
889
                                           session_config.service_identifier)) {
744✔
890
            throw sync::BadServerUrl(sync_route);
×
891
        }
×
892
        // FIXME: Java needs the fully resolved URL for proxy support, but we also need it before
744✔
893
        // the session is created. How to resolve this?
744✔
894
        m_server_url = sync_route;
1,662✔
895
    }
1,662✔
896

744✔
897
    if (sync_config.authorization_header_name) {
1,662✔
898
        session_config.authorization_header_name = *sync_config.authorization_header_name;
×
899
    }
×
900
    session_config.custom_http_headers = sync_config.custom_http_headers;
1,662✔
901

744✔
902
    if (m_server_requests_action != sync::ProtocolErrorInfo::Action::NoAction) {
1,662✔
903
        // Migrations are allowed to recover local data.
77✔
904
        const bool allowed_to_recover = m_server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset ||
154✔
905
                                        m_server_requests_action == sync::ProtocolErrorInfo::Action::MigrateToFLX ||
91✔
906
                                        m_server_requests_action == sync::ProtocolErrorInfo::Action::RevertToPBS;
82✔
907
        // Use the original sync config, not the updated one from the migration store
77✔
908
        session_config.client_reset_config = make_client_reset_config(
154✔
909
            m_config, m_original_sync_config, std::move(m_client_reset_fresh_copy), allowed_to_recover);
154✔
910
        m_server_requests_action = sync::ProtocolErrorInfo::Action::NoAction;
154✔
911
    }
154✔
912

744✔
913
    m_session = m_client.make_session(m_db, m_flx_subscription_store, m_migration_store, std::move(session_config));
1,662✔
914

744✔
915
    std::weak_ptr<SyncSession> weak_self = weak_from_this();
1,662✔
916

744✔
917
    // Configure the sync transaction callback.
744✔
918
    auto wrapped_callback = [weak_self](VersionID old_version, VersionID new_version) {
1,571✔
919
        std::function<TransactionCallback> callback;
1,571✔
920
        if (auto self = weak_self.lock()) {
1,571✔
921
            util::CheckedLockGuard l(self->m_state_mutex);
1,559✔
922
            callback = self->m_sync_transact_callback;
1,559✔
923
        }
1,559✔
924
        if (callback) {
1,571✔
925
            callback(old_version, new_version);
1,345✔
926
        }
1,345✔
927
    };
1,571✔
928
    m_session->set_sync_transact_callback(std::move(wrapped_callback));
1,662✔
929

744✔
930
    // Set up the wrapped progress handler callback
744✔
931
    m_session->set_progress_handler([weak_self](uint_fast64_t downloaded, uint_fast64_t downloadable,
1,662✔
932
                                                uint_fast64_t uploaded, uint_fast64_t uploadable,
1,662✔
933
                                                uint_fast64_t progress_version, uint_fast64_t snapshot_version) {
10,050✔
934
        if (auto self = weak_self.lock()) {
10,050✔
935
            self->handle_progress_update(downloaded, downloadable, uploaded, uploadable, progress_version,
9,790✔
936
                                         snapshot_version);
9,790✔
937
        }
9,790✔
938
    });
10,050✔
939

744✔
940
    // Sets up the connection state listener. This callback is used for both reporting errors as well as changes to
744✔
941
    // the connection state.
744✔
942
    m_session->set_connection_state_change_listener(
1,662✔
943
        [weak_self](sync::ConnectionState state, util::Optional<sync::SessionErrorInfo> error) {
4,026✔
944
            // If the OS SyncSession object is destroyed, we ignore any events from the underlying Session as there is
1,887✔
945
            // nothing useful we can do with them.
1,887✔
946
            auto self = weak_self.lock();
4,026✔
947
            if (!self) {
4,026✔
948
                return;
25✔
949
            }
25✔
950
            using cs = sync::ConnectionState;
4,001✔
951
            ConnectionState new_state = [&] {
4,001✔
952
                switch (state) {
4,001✔
953
                    case cs::disconnected:
490✔
954
                        return ConnectionState::Disconnected;
490✔
955
                    case cs::connecting:
1,805✔
956
                        return ConnectionState::Connecting;
1,805✔
957
                    case cs::connected:
1,706✔
958
                        return ConnectionState::Connected;
1,706✔
959
                }
×
960
                REALM_UNREACHABLE();
×
961
            }();
×
962
            util::CheckedUniqueLock lock(self->m_connection_state_mutex);
4,001✔
963
            auto old_state = self->m_connection_state;
4,001✔
964
            self->m_connection_state = new_state;
4,001✔
965
            lock.unlock();
4,001✔
966

1,879✔
967
            if (old_state != new_state) {
4,001✔
968
                self->m_connection_change_notifier.invoke_callbacks(old_state, new_state);
3,965✔
969
            }
3,965✔
970

1,879✔
971
            if (error) {
4,001✔
972
                self->handle_error(std::move(*error));
514✔
973
            }
514✔
974
        });
4,001✔
975
}
1,662✔
976

977
// Replaces the stored sync transaction callback while holding the state mutex.
void SyncSession::set_sync_transact_callback(std::function<sync::Session::SyncTransactCallback>&& callback)
{
    util::CheckedLockGuard state_guard(m_state_mutex);
    m_sync_transact_callback = std::move(callback);
}
1,199✔
982

983
// Records the new local version with the progress notifier and, when the session is Active or
// WaitingForAccessToken and an underlying session exists, forwards the notification to it.
// In the Dying, Inactive and Paused states nothing is forwarded.
void SyncSession::nonsync_transact_notify(sync::version_type version)
{
    m_progress_notifier.set_local_version(version);

    util::CheckedUniqueLock lock(m_state_mutex);
    const bool state_accepts_notification = (m_state == State::Active || m_state == State::WaitingForAccessToken);
    if (state_accepts_notification && m_session) {
        m_session->nonsync_transact_notify(version);
    }
}
5,869✔
1001

1002
void SyncSession::revive_if_needed()
1003
{
1,990✔
1004
    util::CheckedUniqueLock lock(m_state_mutex);
1,990✔
1005
    switch (m_state) {
1,990✔
1006
        case State::Active:
465✔
1007
        case State::WaitingForAccessToken:
465✔
1008
        case State::Paused:
468✔
1009
            return;
468✔
1010
        case State::Dying:
675✔
1011
        case State::Inactive:
1,522✔
1012
            do_revive(std::move(lock));
1,522✔
1013
            break;
1,522✔
1014
    }
1,990✔
1015
}
1,990✔
1016

1017
void SyncSession::handle_reconnect()
1018
{
×
1019
    util::CheckedUniqueLock lock(m_state_mutex);
×
1020
    switch (m_state) {
×
1021
        case State::Active:
×
1022
            m_session->cancel_reconnect_delay();
×
1023
            break;
×
1024
        case State::Dying:
×
1025
        case State::Inactive:
×
1026
        case State::WaitingForAccessToken:
×
1027
        case State::Paused:
×
1028
            break;
×
1029
    }
×
1030
}
×
1031

1032
void SyncSession::force_close()
1033
{
233✔
1034
    util::CheckedUniqueLock lock(m_state_mutex);
233✔
1035
    switch (m_state) {
233✔
1036
        case State::Active:
218✔
1037
        case State::Dying:
218✔
1038
        case State::WaitingForAccessToken:
221✔
1039
            become_inactive(std::move(lock));
221✔
1040
            break;
221✔
1041
        case State::Inactive:
104✔
1042
        case State::Paused:
12✔
1043
            break;
12✔
1044
    }
233✔
1045
}
233✔
1046

1047
void SyncSession::pause()
1048
{
148✔
1049
    util::CheckedUniqueLock lock(m_state_mutex);
148✔
1050
    switch (m_state) {
148✔
1051
        case State::Active:
145✔
1052
        case State::Dying:
145✔
1053
        case State::WaitingForAccessToken:
145✔
1054
        case State::Inactive:
146✔
1055
            become_paused(std::move(lock));
146✔
1056
            break;
146✔
1057
        case State::Paused:
74✔
1058
            break;
2✔
1059
    }
148✔
1060
}
148✔
1061

1062
void SyncSession::resume()
1063
{
142✔
1064
    util::CheckedUniqueLock lock(m_state_mutex);
142✔
1065
    switch (m_state) {
142✔
1066
        case State::Active:
✔
1067
        case State::WaitingForAccessToken:
✔
1068
            return;
×
1069
        case State::Paused:
142✔
1070
        case State::Dying:
142✔
1071
        case State::Inactive:
142✔
1072
            do_revive(std::move(lock));
142✔
1073
            break;
142✔
1074
    }
142✔
1075
}
142✔
1076

1077
// Brings the session back to life. With no user, or a user whose access token does not need
// refreshing, the session becomes active directly. Otherwise it waits for an access token and a
// refresh is started once the state lock has been released.
void SyncSession::do_revive(util::CheckedUniqueLock&& lock)
{
    auto session_user = user();
    const bool token_refresh_needed = session_user && session_user->access_token_refresh_required();

    if (token_refresh_needed) {
        become_waiting_for_access_token();
        // Release the lock for SDKs with a single threaded
        // networking implementation such as our test suite
        // so that the update can trigger a state change from
        // the completion handler.
        m_state_mutex.unlock(lock);
        initiate_access_token_refresh();
    }
    else {
        become_active();
        m_state_mutex.unlock(lock);
    }
}
18✔
1094

1095
void SyncSession::close()
1096
{
100✔
1097
    util::CheckedUniqueLock lock(m_state_mutex);
100✔
1098
    close(std::move(lock));
100✔
1099
}
100✔
1100

1101
// Closes the session according to its current state; `lock` is the already-held state lock.
//  - Active: follows the configured stop policy.
//  - WaitingForAccessToken: becomes inactive immediately.
//  - Dying: nothing to do.
//  - Paused / Inactive: unregisters the session from the sync manager, if one is set.
void SyncSession::close(util::CheckedUniqueLock lock)
{
    switch (m_state) {
        case State::WaitingForAccessToken:
            // Immediately kill the session.
            become_inactive(std::move(lock));
            break;

        case State::Dying:
            m_state_mutex.unlock(lock);
            break;

        case State::Paused:
        case State::Inactive: {
            // We need to unregister from the sync manager if it still exists so that we don't end up
            // holding the DBRef open after the session is closed. Otherwise we can end up preventing
            // the user from deleting the realm when it's in the paused/inactive state.
            if (m_sync_manager) {
                m_sync_manager->unregister_session(m_db->get_path());
            }
            m_state_mutex.unlock(lock);
            break;
        }

        case State::Active: {
            const auto stop_policy = config(&SyncConfig::stop_policy);
            switch (stop_policy) {
                case SyncSessionStopPolicy::AfterChangesUploaded:
                    // Wait for all pending changes to upload.
                    become_dying(std::move(lock));
                    break;
                case SyncSessionStopPolicy::LiveIndefinitely:
                    // Don't do anything; session lives forever.
                    m_state_mutex.unlock(lock);
                    break;
                case SyncSessionStopPolicy::Immediately:
                    become_inactive(std::move(lock));
                    break;
            }
            break;
        }
    }
}
1,443✔
1140

1141
void SyncSession::shutdown_and_wait()
1142
{
121✔
1143
    {
121✔
1144
        // Transition immediately to `inactive` state. Calling this function must gurantee that any
10✔
1145
        // sync::Session object in SyncSession::m_session that existed prior to the time of invocation
10✔
1146
        // must have been destroyed upon return. This allows the caller to follow up with a call to
10✔
1147
        // sync::Client::wait_for_session_terminations_or_client_stopped() in order to wait for the
10✔
1148
        // Realm file to be closed. This works so long as this SyncSession object remains in the
10✔
1149
        // `inactive` state after the invocation of shutdown_and_wait().
10✔
1150
        util::CheckedUniqueLock lock(m_state_mutex);
121✔
1151
        if (m_state != State::Inactive && m_state != State::Paused) {
121✔
1152
            become_inactive(std::move(lock));
65✔
1153
        }
65✔
1154
    }
121✔
1155
    m_client.wait_for_session_terminations();
121✔
1156
}
121✔
1157

1158
void SyncSession::update_access_token(const std::string& signed_token)
1159
{
14✔
1160
    util::CheckedUniqueLock lock(m_state_mutex);
14✔
1161
    // We don't expect there to be a session when waiting for access token, but if there is, refresh its token.
7✔
1162
    // If not, the latest token will be seeded from SyncUser::access_token() on session creation.
7✔
1163
    if (m_session) {
14✔
1164
        m_session->refresh(signed_token);
2✔
1165
    }
2✔
1166
    if (m_state == State::WaitingForAccessToken) {
14✔
1167
        become_active();
8✔
1168
    }
8✔
1169
}
14✔
1170

1171
void SyncSession::initiate_access_token_refresh()
1172
{
20✔
1173
    if (auto session_user = user()) {
20✔
1174
        session_user->refresh_custom_data(handle_refresh(shared_from_this(), false));
20✔
1175
    }
20✔
1176
}
20✔
1177

1178
void SyncSession::add_completion_callback(util::UniqueFunction<void(Status)> callback,
1179
                                          _impl::SyncProgressNotifier::NotifierType direction)
1180
{
2,105✔
1181
    bool is_download = (direction == _impl::SyncProgressNotifier::NotifierType::download);
2,105✔
1182

1,010✔
1183
    m_completion_request_counter++;
2,105✔
1184
    m_completion_callbacks.emplace_hint(m_completion_callbacks.end(), m_completion_request_counter,
2,105✔
1185
                                        std::make_pair(direction, std::move(callback)));
2,105✔
1186
    // If the state is inactive then just store the callback and return. The callback will get
1,010✔
1187
    // re-registered with the underlying session if/when the session ever becomes active again.
1,010✔
1188
    if (!m_session) {
2,105✔
1189
        return;
191✔
1190
    }
191✔
1191

922✔
1192
    auto waiter = is_download ? &sync::Session::async_wait_for_download_completion
1,914✔
1193
                              : &sync::Session::async_wait_for_upload_completion;
1,422✔
1194

922✔
1195
    (m_session.get()->*waiter)([weak_self = weak_from_this(), id = m_completion_request_counter](Status status) {
1,914✔
1196
        auto self = weak_self.lock();
1,914✔
1197
        if (!self)
1,914✔
1198
            return;
33✔
1199
        util::CheckedUniqueLock lock(self->m_state_mutex);
1,881✔
1200
        auto callback_node = self->m_completion_callbacks.extract(id);
1,881✔
1201
        lock.unlock();
1,881✔
1202
        if (callback_node) {
1,881✔
1203
            callback_node.mapped().second(std::move(status));
1,707✔
1204
        }
1,707✔
1205
    });
1,881✔
1206
}
1,914✔
1207

1208
// Registers `callback` as an upload-completion callback while holding the state mutex.
void SyncSession::wait_for_upload_completion(util::UniqueFunction<void(Status)>&& callback)
{
    util::CheckedUniqueLock state_lock(m_state_mutex);
    constexpr auto direction = ProgressDirection::upload;
    add_completion_callback(std::move(callback), direction);
}
815✔
1213

1214
// Registers `callback` as a download-completion callback while holding the state mutex.
void SyncSession::wait_for_download_completion(util::UniqueFunction<void(Status)>&& callback)
{
    util::CheckedUniqueLock state_lock(m_state_mutex);
    constexpr auto direction = ProgressDirection::download;
    add_completion_callback(std::move(callback), direction);
}
964✔
1219

1220
uint64_t SyncSession::register_progress_notifier(std::function<ProgressNotifierCallback>&& notifier,
1221
                                                 ProgressDirection direction, bool is_streaming)
1222
{
4✔
1223
    return m_progress_notifier.register_callback(std::move(notifier), direction, is_streaming);
4✔
1224
}
4✔
1225

1226
void SyncSession::unregister_progress_notifier(uint64_t token)
1227
{
4✔
1228
    m_progress_notifier.unregister_callback(token);
4✔
1229
}
4✔
1230

1231
uint64_t SyncSession::register_connection_change_callback(std::function<ConnectionStateChangeCallback>&& callback)
1232
{
6✔
1233
    return m_connection_change_notifier.add_callback(std::move(callback));
6✔
1234
}
6✔
1235

1236
void SyncSession::unregister_connection_change_callback(uint64_t token)
1237
{
2✔
1238
    m_connection_change_notifier.remove_callback(token);
2✔
1239
}
2✔
1240

1241
// Out-of-line destructor with no custom teardown; members clean up after themselves.
SyncSession::~SyncSession() = default;
1,342✔
1242

1243
// Returns the session state as read under the state mutex.
SyncSession::State SyncSession::state() const
{
    util::CheckedLockGuard state_guard(m_state_mutex);
    return m_state;
}
13,286✔
1248

1249
// Returns the connection state as read under the connection state mutex.
SyncSession::ConnectionState SyncSession::connection_state() const
{
    util::CheckedLockGuard connection_guard(m_connection_state_mutex);
    return m_connection_state;
}
2,434✔
1254

1255
std::string const& SyncSession::path() const
1256
{
1,508✔
1257
    return m_db->get_path();
1,508✔
1258
}
1,508✔
1259

1260
std::shared_ptr<sync::SubscriptionStore> SyncSession::get_flx_subscription_store()
1261
{
729✔
1262
    util::CheckedLockGuard lock(m_state_mutex);
729✔
1263
    return m_flx_subscription_store;
729✔
1264
}
729✔
1265

1266
std::shared_ptr<sync::SubscriptionStore> SyncSession::get_subscription_store_base()
1267
{
2✔
1268
    util::CheckedLockGuard lock(m_state_mutex);
2✔
1269
    return m_subscription_store_base;
2✔
1270
}
2✔
1271

1272
// Reads the salted file identifier from the client history status. The database's replication
// object must exist and be a sync::ClientReplication (both are asserted).
sync::SaltedFileIdent SyncSession::get_file_ident() const
{
    auto* const replication = m_db->get_replication();
    REALM_ASSERT(replication);
    REALM_ASSERT(dynamic_cast<sync::ClientReplication*>(replication));
    auto* const client_replication = static_cast<sync::ClientReplication*>(replication);

    // get_status() fills in all three outputs; only the file ident is returned.
    sync::SaltedFileIdent file_ident;
    sync::version_type ignored_version;
    sync::SyncProgress ignored_progress;
    client_replication->get_history().get_status(ignored_version, file_ident, ignored_progress);
    return file_ident;
}
134✔
1284

1285
std::string SyncSession::get_appservices_connection_id() const
1286
{
20✔
1287
    util::CheckedLockGuard lk(m_state_mutex);
20✔
1288
    if (!m_session) {
20✔
1289
        return {};
×
1290
    }
×
1291
    return m_session->get_appservices_connection_id();
20✔
1292
}
20✔
1293

1294
// Replaces the session's sync configuration with `new_config`.
// The session is first driven to Inactive (unless it is already Inactive or
// Paused), the configuration is swapped under m_config_mutex, and then
// revive_if_needed() is called. Asserts that the user is unchanged.
void SyncSession::update_configuration(SyncConfig new_config)
{
    for (;;) {
        util::CheckedUniqueLock state_guard(m_state_mutex);
        if (m_state != State::Inactive && m_state != State::Paused) {
            // become_inactive() takes ownership of the lock and releases it,
            // so once the lock is reacquired the state may have moved again
            // (a callback ran, or another thread did something). Keep
            // retrying until the session is observed at rest.
            become_inactive(std::move(state_guard));
            continue;
        }

        util::CheckedUniqueLock config_guard(m_config_mutex);
        REALM_ASSERT(m_state == State::Inactive || m_state == State::Paused);
        REALM_ASSERT(!m_session);
        REALM_ASSERT(m_config.sync_config->user == new_config.user);
        // This is only used for testing, so simply overwrite the current sync_config.
        m_config.sync_config = std::make_shared<SyncConfig>(std::move(new_config));
        break;
    }
    revive_if_needed();
}
8✔
1318

1319
void SyncSession::apply_sync_config_after_migration_or_rollback()
1320
{
26✔
1321
    // Migration state changed - Update the configuration to
13✔
1322
    // match the new sync mode.
13✔
1323
    util::CheckedLockGuard cfg_lock(m_config_mutex);
26✔
1324
    if (!m_migrated_sync_config)
26✔
1325
        return;
2✔
1326

12✔
1327
    m_config.sync_config = m_migrated_sync_config;
24✔
1328
    m_migrated_sync_config.reset();
24✔
1329
}
24✔
1330

1331
void SyncSession::save_sync_config_after_migration_or_rollback()
1332
{
28✔
1333
    util::CheckedLockGuard cfg_lock(m_config_mutex);
28✔
1334
    m_migrated_sync_config = m_migration_store->convert_sync_config(m_original_sync_config);
28✔
1335
}
28✔
1336

1337
void SyncSession::update_subscription_store(bool flx_sync_requested)
1338
{
26✔
1339
    util::CheckedUniqueLock lock(m_state_mutex);
26✔
1340

13✔
1341
    // The session should be closed before updating the FLX subscription store
13✔
1342
    REALM_ASSERT(!m_session);
26✔
1343

13✔
1344
    // If the subscription store exists and switching to PBS, then clear the store
13✔
1345
    auto& history = static_cast<sync::ClientReplication&>(*m_db->get_replication());
26✔
1346
    if (!flx_sync_requested) {
26✔
1347
        if (m_flx_subscription_store) {
8✔
1348
            // Empty the subscription store and cancel any pending subscription notification
4✔
1349
            // waiters
4✔
1350
            auto subscription_store = std::move(m_flx_subscription_store);
8✔
1351
            lock.unlock();
8✔
1352
            subscription_store->terminate();
8✔
1353
            auto tr = m_db->start_write();
8✔
1354
            history.set_write_validator_factory(nullptr);
8✔
1355
            tr->rollback();
8✔
1356
        }
8✔
1357
        return;
8✔
1358
    }
8✔
1359

9✔
1360
    if (m_flx_subscription_store)
18✔
1361
        return; // Using FLX and subscription store already exists
2✔
1362

8✔
1363
    // Going from PBS -> FLX (or one doesn't exist yet), create a new subscription store
8✔
1364
    create_subscription_store();
16✔
1365

8✔
1366
    std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr(m_flx_subscription_store);
16✔
1367
    lock.unlock();
16✔
1368

8✔
1369
    // If migrated to FLX, create subscriptions in the local realm to cover the existing data.
8✔
1370
    // This needs to be done before setting the write validator to avoid NoSubscriptionForWrite errors.
8✔
1371
    make_active_subscription_set();
16✔
1372

8✔
1373
    auto tr = m_db->start_write();
16✔
1374
    set_write_validator_factory(weak_sub_mgr);
16✔
1375
    tr->rollback();
16✔
1376
}
16✔
1377

1378
void SyncSession::create_subscription_store()
1379
{
409✔
1380
    REALM_ASSERT(!m_flx_subscription_store);
409✔
1381

205✔
1382
    // Create the main subscription store instance when this is first called - this will
205✔
1383
    // remain valid afterwards for the life of the SyncSession, but m_flx_subscription_store
205✔
1384
    // will be reset when rolling back to PBS after a client FLX migration
205✔
1385
    if (!m_subscription_store_base) {
409✔
1386
        m_subscription_store_base = sync::SubscriptionStore::create(m_db, [this](int64_t new_version) {
499✔
1387
            util::CheckedLockGuard lk(m_state_mutex);
499✔
1388
            if (m_state != State::Active && m_state != State::WaitingForAccessToken) {
499✔
1389
                return;
134✔
1390
            }
134✔
1391
            // There may be no session yet (i.e., waiting to refresh the access token).
182✔
1392
            if (m_session) {
365✔
1393
                m_session->on_new_flx_sync_subscription(new_version);
363✔
1394
            }
363✔
1395
        });
365✔
1396
    }
407✔
1397

205✔
1398
    // m_subscription_store_base is always around for the life of SyncSession, but the
205✔
1399
    // m_flx_subscription_store is set when using FLX.
205✔
1400
    m_flx_subscription_store = m_subscription_store_base;
409✔
1401
}
409✔
1402

1403
// Installs a write validator factory on the client history. For each
// transaction the factory snapshots the tables covered by the latest
// subscription set; the validator it returns throws NoSubscriptionForWrite
// for a top-level table that is not in that snapshot.
void SyncSession::set_write_validator_factory(std::weak_ptr<sync::SubscriptionStore> weak_sub_mgr)
{
    auto make_validator = [weak_sub_mgr](Transaction& tr) -> util::UniqueFunction<sync::SyncReplication::WriteValidator> {
        // The subscription store is required to still be alive here.
        auto store = weak_sub_mgr.lock();
        REALM_ASSERT_RELEASE(store);
        auto subscribed_tables = store->get_tables_for_latest(tr);
        return [tables = std::move(subscribed_tables)](const Table& table) {
            // Only top-level tables are checked.
            if (table.get_table_type() != Table::Type::TopLevel) {
                return;
            }
            auto class_name = Group::table_name_to_class_name(table.get_name());
            if (tables.find(class_name) != tables.end()) {
                return;
            }
            throw NoSubscriptionForWrite(util::format(
                "Cannot write to class %1 when no flexible sync subscription has been created.", class_name));
        };
    };

    auto& history = static_cast<sync::ClientReplication&>(*m_db->get_replication());
    history.set_write_validator_factory(std::move(make_validator));
}
409✔
1424

1425
// Represents a reference to the SyncSession from outside of the sync subsystem.
1426
// We attempt to keep the SyncSession in an active state as long as it has an external reference.
1427
class SyncSession::ExternalReference {
1428
public:
1429
    ExternalReference(std::shared_ptr<SyncSession> session)
1430
        : m_session(std::move(session))
1431
    {
1,343✔
1432
    }
1,343✔
1433

1434
    ~ExternalReference()
1435
    {
1,343✔
1436
        m_session->did_drop_external_reference();
1,343✔
1437
    }
1,343✔
1438

1439
private:
1440
    std::shared_ptr<SyncSession> m_session;
1441
};
1442

1443
std::shared_ptr<SyncSession> SyncSession::external_reference()
1444
{
1,559✔
1445
    util::CheckedLockGuard lock(m_external_reference_mutex);
1,559✔
1446

693✔
1447
    if (auto external_reference = m_external_reference.lock())
1,559✔
1448
        return std::shared_ptr<SyncSession>(external_reference, this);
216✔
1449

586✔
1450
    auto external_reference = std::make_shared<ExternalReference>(shared_from_this());
1,343✔
1451
    m_external_reference = external_reference;
1,343✔
1452
    return std::shared_ptr<SyncSession>(external_reference, this);
1,343✔
1453
}
1,343✔
1454

1455
std::shared_ptr<SyncSession> SyncSession::existing_external_reference()
1456
{
2,208✔
1457
    util::CheckedLockGuard lock(m_external_reference_mutex);
2,208✔
1458

900✔
1459
    if (auto external_reference = m_external_reference.lock())
2,208✔
1460
        return std::shared_ptr<SyncSession>(external_reference, this);
865✔
1461

585✔
1462
    return nullptr;
1,343✔
1463
}
1,343✔
1464

1465
void SyncSession::did_drop_external_reference()
1466
{
1,343✔
1467
    util::CheckedUniqueLock lock1(m_state_mutex);
1,343✔
1468
    {
1,343✔
1469
        util::CheckedLockGuard lock2(m_external_reference_mutex);
1,343✔
1470

586✔
1471
        // If the session is being resurrected we should not close the session.
586✔
1472
        if (!m_external_reference.expired())
1,343✔
1473
            return;
×
1474
    }
1,343✔
1475

586✔
1476
    close(std::move(lock1));
1,343✔
1477
}
1,343✔
1478

1479
uint64_t SyncProgressNotifier::register_callback(std::function<ProgressNotifierCallback> notifier,
1480
                                                 NotifierType direction, bool is_streaming)
1481
{
48✔
1482
    util::UniqueFunction<void()> invocation;
48✔
1483
    uint64_t token_value = 0;
48✔
1484
    {
48✔
1485
        std::lock_guard<std::mutex> lock(m_mutex);
48✔
1486
        token_value = m_progress_notifier_token++;
48✔
1487
        NotifierPackage package{std::move(notifier), util::none, m_local_transaction_version, is_streaming,
48✔
1488
                                direction == NotifierType::download};
48✔
1489
        if (!m_current_progress) {
48✔
1490
            // Simply register the package, since we have no data yet.
6✔
1491
            m_packages.emplace(token_value, std::move(package));
12✔
1492
            return token_value;
12✔
1493
        }
12✔
1494
        bool skip_registration = false;
36✔
1495
        invocation = package.create_invocation(*m_current_progress, skip_registration);
36✔
1496
        if (skip_registration) {
36✔
1497
            token_value = 0;
10✔
1498
        }
10✔
1499
        else {
26✔
1500
            m_packages.emplace(token_value, std::move(package));
26✔
1501
        }
26✔
1502
    }
36✔
1503
    invocation();
36✔
1504
    return token_value;
36✔
1505
}
36✔
1506

1507
void SyncProgressNotifier::unregister_callback(uint64_t token)
1508
{
8✔
1509
    std::lock_guard<std::mutex> lock(m_mutex);
8✔
1510
    m_packages.erase(token);
8✔
1511
}
8✔
1512

1513
void SyncProgressNotifier::update(uint64_t downloaded, uint64_t downloadable, uint64_t uploaded, uint64_t uploadable,
1514
                                  uint64_t download_version, uint64_t snapshot_version)
1515
{
9,886✔
1516
    // Ignore progress messages from before we first receive a DOWNLOAD message
4,278✔
1517
    if (download_version == 0)
9,886✔
1518
        return;
4,144✔
1519

2,628✔
1520
    std::vector<util::UniqueFunction<void()>> invocations;
5,742✔
1521
    {
5,742✔
1522
        std::lock_guard<std::mutex> lock(m_mutex);
5,742✔
1523
        m_current_progress = Progress{uploadable, downloadable, uploaded, downloaded, snapshot_version};
5,742✔
1524

2,628✔
1525
        for (auto it = m_packages.begin(); it != m_packages.end();) {
5,808✔
1526
            bool should_delete = false;
66✔
1527
            invocations.emplace_back(it->second.create_invocation(*m_current_progress, should_delete));
66✔
1528
            it = should_delete ? m_packages.erase(it) : std::next(it);
57✔
1529
        }
66✔
1530
    }
5,742✔
1531
    // Run the notifiers only after we've released the lock.
2,628✔
1532
    for (auto& invocation : invocations)
5,742✔
1533
        invocation();
66✔
1534
}
5,742✔
1535

1536
void SyncProgressNotifier::set_local_version(uint64_t snapshot_version)
1537
{
5,877✔
1538
    std::lock_guard<std::mutex> lock(m_mutex);
5,877✔
1539
    m_local_transaction_version = snapshot_version;
5,877✔
1540
}
5,877✔
1541

1542
// Builds the closure that reports `current_progress` to this package's
// callback, and sets `is_expired` when a non-streaming notifier has
// transferred at least as much as was considered transferrable.
// Note: on the early "not ready yet" path a no-op closure is returned and
// `is_expired` is left untouched.
util::UniqueFunction<void()>
SyncProgressNotifier::NotifierPackage::create_invocation(Progress const& current_progress, bool& is_expired)
{
    uint64_t bytes_done = current_progress.uploaded;
    uint64_t bytes_total = current_progress.uploadable;
    if (is_download) {
        bytes_done = current_progress.downloaded;
        bytes_total = current_progress.downloadable;
    }

    if (!is_streaming) {
        // Until the sync client has processed all local transactions the
        // uploadable figure is wrong, so the callback must not fire yet.
        const bool upload_not_ready = !is_download && snapshot_version > current_progress.snapshot_version;
        if (upload_not_ready)
            return [] {};

        // The server's initial download size is the uncompacted size, so the
        // download can finish before that many bytes arrive. When that happens
        // the transferrable amount drops and the lower value replaces the
        // captured one.
        if (!captured_transferrable || *captured_transferrable > bytes_total)
            captured_transferrable = bytes_total;
        bytes_total = *captured_transferrable;
    }

    // Non-streaming notifiers expire once everything originally considered
    // transferrable has been transferred.
    is_expired = !is_streaming && bytes_done >= bytes_total;
    return [bytes_done, bytes_total, callback = notifier] {
        callback(bytes_done, bytes_total);
    };
}
102✔
1570

1571
uint64_t SyncSession::ConnectionChangeNotifier::add_callback(std::function<ConnectionStateChangeCallback> callback)
1572
{
6✔
1573
    std::lock_guard<std::mutex> lock(m_callback_mutex);
6✔
1574
    auto token = m_next_token++;
6✔
1575
    m_callbacks.push_back({std::move(callback), token});
6✔
1576
    return token;
6✔
1577
}
6✔
1578

1579
void SyncSession::ConnectionChangeNotifier::remove_callback(uint64_t token)
1580
{
2✔
1581
    Callback old;
2✔
1582
    {
2✔
1583
        std::lock_guard<std::mutex> lock(m_callback_mutex);
2✔
1584
        auto it = std::find_if(begin(m_callbacks), end(m_callbacks), [=](const auto& c) {
2✔
1585
            return c.token == token;
2✔
1586
        });
2✔
1587
        if (it == end(m_callbacks)) {
2✔
1588
            return;
×
1589
        }
×
1590

1✔
1591
        size_t idx = distance(begin(m_callbacks), it);
2✔
1592
        if (m_callback_index != npos) {
2✔
1593
            if (m_callback_index >= idx)
×
1594
                --m_callback_index;
×
1595
        }
×
1596
        --m_callback_count;
2✔
1597

1✔
1598
        old = std::move(*it);
2✔
1599
        m_callbacks.erase(it);
2✔
1600
    }
2✔
1601
}
2✔
1602

1603
// Calls every registered callback with (old_state, new_state). The mutex is
// released around each call, and the shared m_callback_index /
// m_callback_count members track the position so that remove_callback() can
// adjust them while iteration is in progress.
void SyncSession::ConnectionChangeNotifier::invoke_callbacks(ConnectionState old_state, ConnectionState new_state)
{
    std::unique_lock guard(m_callback_mutex);
    m_callback_count = m_callbacks.size();

    // m_callback_index is set to npos when this function finishes; the
    // pre-increment is what establishes the starting position.
    ++m_callback_index;
    while (m_callback_index < m_callback_count) {
        // Copy the callable first, so that a callback which removes itself
        // cannot leave us holding a dangling pointer.
        auto current = m_callbacks[m_callback_index].fn;
        guard.unlock();
        current(old_state, new_state);
        guard.lock();
        ++m_callback_index;
    }
    m_callback_index = npos;
}
5,273✔
1617

1618
// Forwards a test command to the underlying session and returns its future.
// When there is no underlying session, the returned future is built from a
// RuntimeError status instead.
util::Future<std::string> SyncSession::send_test_command(std::string body)
{
    util::CheckedLockGuard state_guard(m_state_mutex);
    if (m_session) {
        return m_session->send_test_command(std::move(body));
    }

    return Status{ErrorCodes::RuntimeError, "Session doesn't exist to send test command on"};
}
26✔
1627

1628
void SyncSession::make_active_subscription_set()
1629
{
16✔
1630
    util::CheckedUniqueLock lock(m_state_mutex);
16✔
1631

8✔
1632
    if (!m_active_subscriptions_after_migration)
16✔
1633
        return;
×
1634

8✔
1635
    REALM_ASSERT(m_flx_subscription_store);
16✔
1636

8✔
1637
    // Create subscription set from the subscriptions used to download the fresh realm after migration.
8✔
1638
    auto active_mut_sub = m_flx_subscription_store->get_active().make_mutable_copy();
16✔
1639
    active_mut_sub.import(*m_active_subscriptions_after_migration);
16✔
1640
    active_mut_sub.update_state(sync::SubscriptionSet::State::Complete);
16✔
1641
    active_mut_sub.commit();
16✔
1642

8✔
1643
    m_active_subscriptions_after_migration.reset();
16✔
1644
}
16✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc