• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1853

20 Nov 2023 07:46PM UTC coverage: 91.683% (-0.02%) from 91.699%
1853

push

Evergreen

web-flow
Fix client reset cycle detection for PBS recovery errors (#7149)

Tracking that a client reset was in progress was done in the same write
transaction as the recovery operation, so if recovery failed the tracking was
rolled back too. This worked for FLX due to that codepath committing before
beginning recovery.

92262 of 169120 branches covered (0.0%)

31 of 31 new or added lines in 3 files covered. (100.0%)

143 existing lines in 15 files now uncovered.

231277 of 252257 relevant lines covered (91.68%)

6495482.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.9
/src/realm/object-store/sync/sync_manager.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2016 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/sync/sync_manager.hpp>
20

21
#include <realm/object-store/sync/impl/sync_client.hpp>
22
#include <realm/object-store/sync/impl/sync_file.hpp>
23
#include <realm/object-store/sync/impl/sync_metadata.hpp>
24
#include <realm/object-store/sync/sync_session.hpp>
25
#include <realm/object-store/sync/sync_user.hpp>
26
#include <realm/object-store/sync/app.hpp>
27
#include <realm/object-store/util/uuid.hpp>
28

29
#include <realm/util/sha_crypto.hpp>
30
#include <realm/util/hex_dump.hpp>
31

32
using namespace realm;
33
using namespace realm::_impl;
34

35
/// Initializes every timeout field from the corresponding `sync::Client::default_*` constant.
SyncClientTimeouts::SyncClientTimeouts()
    : connect_timeout(sync::Client::default_connect_timeout)
    , connection_linger_time(sync::Client::default_connection_linger_time)
    , ping_keepalive_period(sync::Client::default_ping_keepalive_period)
    , pong_keepalive_timeout(sync::Client::default_pong_keepalive_timeout)
    , fast_reconnect_limit(sync::Client::default_fast_reconnect_limit)
{
    // Nothing else to do: all members are set in the initializer list above.
}
12,407✔
43

44
// Defaulted here in the implementation file rather than in the header.
SyncManager::SyncManager() = default;
4,171✔
45

46
void SyncManager::configure(std::shared_ptr<app::App> app, const std::string& sync_route,
47
                            const SyncClientConfig& config)
48
{
4,169✔
49
    std::vector<std::shared_ptr<SyncUser>> users_to_add;
4,169✔
50
    {
4,169✔
51
        // Locking the mutex here ensures that it is released before locking m_user_mutex
2,049✔
52
        util::CheckedLockGuard lock(m_mutex);
4,169✔
53
        m_app = app;
4,169✔
54
        m_sync_route = sync_route;
4,169✔
55
        m_config = std::move(config);
4,169✔
56
        if (m_sync_client)
4,169✔
57
            return;
×
58

2,049✔
59
        // create a new logger - if the logger_factory is updated later, a new
2,049✔
60
        // logger will be created at that time.
2,049✔
61
        do_make_logger();
4,169✔
62

2,049✔
63
        {
4,169✔
64
            util::CheckedLockGuard lock(m_file_system_mutex);
4,169✔
65

2,049✔
66
            // Set up the file manager.
2,049✔
67
            if (m_file_manager) {
4,169✔
68
                // Changing the base path for tests requires calling reset_for_testing()
69
                // first, and otherwise isn't supported
70
                REALM_ASSERT(m_file_manager->base_path() == m_config.base_file_path);
×
71
            }
×
72
            else {
4,169✔
73
                m_file_manager = std::make_unique<SyncFileManager>(m_config.base_file_path, app->config().app_id);
4,169✔
74
            }
4,169✔
75

2,049✔
76
            // Set up the metadata manager, and perform initial loading/purging work.
2,049✔
77
            if (m_metadata_manager || m_config.metadata_mode == MetadataMode::NoMetadata) {
4,169✔
78
                return;
60✔
79
            }
60✔
80

2,019✔
81
            bool encrypt = m_config.metadata_mode == MetadataMode::Encryption;
4,109✔
82
            m_metadata_manager = std::make_unique<SyncMetadataManager>(m_file_manager->metadata_path(), encrypt,
4,109✔
83
                                                                       m_config.custom_encryption_key);
4,109✔
84

2,019✔
85
            REALM_ASSERT(m_metadata_manager);
4,109✔
86

2,019✔
87
            // Perform our "on next startup" actions such as deleting Realm files
2,019✔
88
            // which we couldn't delete immediately due to them being in use
2,019✔
89
            std::vector<SyncFileActionMetadata> completed_actions;
4,109✔
90
            SyncFileActionMetadataResults file_actions = m_metadata_manager->all_pending_actions();
4,109✔
91
            for (size_t i = 0; i < file_actions.size(); i++) {
4,159✔
92
                auto file_action = file_actions.get(i);
50✔
93
                if (run_file_action(file_action)) {
50✔
94
                    completed_actions.emplace_back(std::move(file_action));
46✔
95
                }
46✔
96
            }
50✔
97
            for (auto& action : completed_actions) {
2,042✔
98
                action.remove();
46✔
99
            }
46✔
100

2,019✔
101
            // Load persisted users into the users map.
2,019✔
102
            SyncUserMetadataResults users = m_metadata_manager->all_unmarked_users();
4,109✔
103
            for (size_t i = 0; i < users.size(); i++) {
4,123✔
104
                auto user_data = users.get(i);
14✔
105
                auto refresh_token = user_data.refresh_token();
14✔
106
                auto access_token = user_data.access_token();
14✔
107
                if (!refresh_token.empty() && !access_token.empty()) {
14✔
108
                    users_to_add.push_back(std::make_shared<SyncUser>(user_data, this));
12✔
109
                }
12✔
110
            }
14✔
111

2,019✔
112
            // Delete any users marked for death.
2,019✔
113
            std::vector<SyncUserMetadata> dead_users;
4,109✔
114
            SyncUserMetadataResults users_to_remove = m_metadata_manager->all_users_marked_for_removal();
4,109✔
115
            dead_users.reserve(users_to_remove.size());
4,109✔
116
            for (size_t i = 0; i < users_to_remove.size(); i++) {
4,113✔
117
                auto user = users_to_remove.get(i);
4✔
118
                // FIXME: delete user data in a different way? (This deletes a logged-out user's data as soon as the
2✔
119
                // app launches again, which might not be how some apps want to treat their data.)
2✔
120
                try {
4✔
121
                    m_file_manager->remove_user_realms(user.identity(), user.realm_file_paths());
4✔
122
                    dead_users.emplace_back(std::move(user));
4✔
123
                }
4✔
124
                catch (FileAccessError const&) {
2✔
125
                    continue;
×
126
                }
×
127
            }
4✔
128
            for (auto& user : dead_users) {
4,109✔
129
                user.remove();
4✔
130
            }
4✔
131
        }
4,109✔
132
    }
4,109✔
133
    {
4,109✔
134
        util::CheckedLockGuard lock(m_user_mutex);
4,109✔
135
        m_users.insert(m_users.end(), users_to_add.begin(), users_to_add.end());
4,109✔
136
    }
4,109✔
137
}
4,109✔
138

139
bool SyncManager::immediately_run_file_actions(const std::string& realm_path)
140
{
10✔
141
    util::CheckedLockGuard lock(m_file_system_mutex);
10✔
142
    if (!m_metadata_manager) {
10✔
143
        return false;
×
144
    }
×
145
    if (auto metadata = m_metadata_manager->get_file_action_metadata(realm_path)) {
10✔
146
        if (run_file_action(*metadata)) {
6✔
147
            metadata->remove();
6✔
148
            return true;
6✔
149
        }
6✔
150
    }
4✔
151
    return false;
4✔
152
}
4✔
153

154
// Perform a file action. Returns whether or not the file action can be removed.
155
bool SyncManager::run_file_action(SyncFileActionMetadata& md)
156
{
56✔
157
    switch (md.action()) {
56✔
158
        case SyncFileActionMetadata::Action::DeleteRealm:
14✔
159
            // Delete all the files for the given Realm.
7✔
160
            return m_file_manager->remove_realm(md.original_name());
14✔
161
        case SyncFileActionMetadata::Action::BackUpThenDeleteRealm:
42✔
162
            // Copy the primary Realm file to the recovery dir, and then delete the Realm.
21✔
163
            auto new_name = md.new_name();
42✔
164
            auto original_name = md.original_name();
42✔
165
            if (!util::File::exists(original_name)) {
42✔
166
                // The Realm file doesn't exist anymore.
9✔
167
                return true;
18✔
168
            }
18✔
169
            if (new_name && !util::File::exists(*new_name) &&
24✔
170
                m_file_manager->copy_realm_file(original_name, *new_name)) {
23✔
171
                // We successfully copied the Realm file to the recovery directory.
11✔
172
                bool did_remove = m_file_manager->remove_realm(original_name);
22✔
173
                // if the copy succeeded but not the delete, then running BackupThenDelete
11✔
174
                // a second time would fail, so change this action to just delete the original file.
11✔
175
                if (did_remove) {
22✔
176
                    return true;
20✔
177
                }
20✔
178
                md.set_action(SyncFileActionMetadata::Action::DeleteRealm);
2✔
179
                return false;
2✔
180
            }
2✔
181
            return false;
2✔
182
    }
×
183
    return false;
×
184
}
×
185

186
void SyncManager::reset_for_testing()
187
{
4,067✔
188
    {
4,067✔
189
        util::CheckedLockGuard lock(m_file_system_mutex);
4,067✔
190
        m_metadata_manager = nullptr;
4,067✔
191
    }
4,067✔
192

1,998✔
193
    {
4,067✔
194
        // Destroy all the users.
1,998✔
195
        util::CheckedLockGuard lock(m_user_mutex);
4,067✔
196
        for (auto& user : m_users) {
4,504✔
197
            user->detach_from_sync_manager();
4,504✔
198
        }
4,504✔
199
        m_users.clear();
4,067✔
200
        m_current_user = nullptr;
4,067✔
201
    }
4,067✔
202

1,998✔
203
    {
4,067✔
204
        util::CheckedLockGuard lock(m_mutex);
4,067✔
205
        // Stop the client. This will abort any uploads that inactive sessions are waiting for.
1,998✔
206
        if (m_sync_client)
4,067✔
207
            m_sync_client->stop();
4,067✔
208
    }
4,067✔
209

1,998✔
210
    {
4,067✔
211
        util::CheckedUniqueLock lock(m_session_mutex);
4,067✔
212

1,998✔
213
        bool no_sessions = !do_has_existing_sessions();
4,067✔
214
        // There's a race between this function and sessions tearing themselves down waiting for m_session_mutex.
1,998✔
215
        // So we give up to a 5 second grace period for any sessions being torn down to unregister themselves.
1,998✔
216
        auto since_poll_start = [start = std::chrono::steady_clock::now()] {
1,998✔
217
            return std::chrono::steady_clock::now() - start;
×
218
        };
×
219
        for (; !no_sessions && since_poll_start() < std::chrono::seconds(5);
4,067!
220
             no_sessions = !do_has_existing_sessions()) {
1,998✔
221
            lock.unlock();
×
222
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
×
223
            lock.lock();
×
224
        }
×
225
        // Callers of `SyncManager::reset_for_testing` should ensure there are no existing sessions
1,998✔
226
        // prior to calling `reset_for_testing`.
1,998✔
227
        if (!no_sessions) {
4,067✔
228
            util::CheckedLockGuard lock(m_mutex);
×
229
            for (auto session : m_sessions) {
×
230
                m_logger_ptr->error("open session at path '%1'", session.first);
×
231
            }
×
232
        }
×
233
        REALM_ASSERT_RELEASE(no_sessions);
4,067✔
234

1,998✔
235
        // Destroy any inactive sessions.
1,998✔
236
        // FIXME: We shouldn't have any inactive sessions at this point! Sessions are expected to
1,998✔
237
        // remain inactive until their final upload completes, at which point they are unregistered
1,998✔
238
        // and destroyed. Our call to `sync::Client::stop` above aborts all uploads, so all sessions
1,998✔
239
        // should have already been destroyed.
1,998✔
240
        m_sessions.clear();
4,067✔
241
    }
4,067✔
242

1,998✔
243
    {
4,067✔
244
        util::CheckedLockGuard lock(m_mutex);
4,067✔
245
        // Destroy the client now that we have no remaining sessions.
1,998✔
246
        m_sync_client = nullptr;
4,067✔
247

1,998✔
248
        // Reset even more state.
1,998✔
249
        m_config = {};
4,067✔
250
        m_logger_ptr.reset();
4,067✔
251
        m_sync_route = "";
4,067✔
252
    }
4,067✔
253

1,998✔
254
    {
4,067✔
255
        util::CheckedLockGuard lock(m_file_system_mutex);
4,067✔
256
        if (m_file_manager)
4,067✔
257
            util::try_remove_dir_recursive(m_file_manager->base_path());
4,067✔
258
        m_file_manager = nullptr;
4,067✔
259
    }
4,067✔
260
}
4,067✔
261

262
/// Records `level` in the stored configuration and, when a logger has already
/// been created, applies it to that logger as its level threshold.
void SyncManager::set_log_level(util::Logger::Level level) noexcept
{
    util::CheckedLockGuard lock(m_mutex);
    m_config.log_level = level;

    // No logger yet: the stored level is all there is to update.
    if (!m_logger_ptr)
        return;

    // Update the level threshold in the already created logger
    m_logger_ptr->set_level_threshold(level);
}
×
271

272
/// Installs a new logger factory and immediately creates a logger from it.
///
/// @param factory  The factory to store in the configuration.
/// @throws std::logic_error if the sync client has already been created. In that
///         case the stored configuration is left untouched.
void SyncManager::set_logger_factory(SyncClientConfig::LoggerFactory factory)
{
    util::CheckedLockGuard lock(m_mutex);

    // Validate before mutating any state so that a rejected call has no side
    // effects (mirrors the check-then-assign order of set_session_multiplexing()).
    if (m_sync_client)
        throw std::logic_error("Cannot set the logger factory after creating the sync client");

    m_config.logger_factory = std::move(factory);

    // Create a new logger using the new factory
    do_make_logger();
}
×
283

284
void SyncManager::do_make_logger()
285
{
4,169✔
286
    if (m_config.logger_factory) {
4,169✔
287
        m_logger_ptr = m_config.logger_factory(m_config.log_level);
×
288
    }
×
289
    else {
4,169✔
290
        m_logger_ptr = util::Logger::get_default_logger();
4,169✔
291
    }
4,169✔
292
}
4,169✔
293

294
const std::shared_ptr<util::Logger>& SyncManager::get_logger() const
295
{
584✔
296
    util::CheckedLockGuard lock(m_mutex);
584✔
297
    return m_logger_ptr;
584✔
298
}
584✔
299

300
void SyncManager::set_user_agent(std::string user_agent)
301
{
×
302
    util::CheckedLockGuard lock(m_mutex);
×
303
    m_config.user_agent_application_info = std::move(user_agent);
×
304
}
×
305

306
/// Replaces the timeouts stored in the configuration with `timeouts`.
void SyncManager::set_timeouts(SyncClientTimeouts timeouts)
{
    util::CheckedLockGuard guard(m_mutex);
    m_config.timeouts = timeouts;
}
×
311

312
void SyncManager::reconnect() const
313
{
2✔
314
    util::CheckedLockGuard lock(m_session_mutex);
2✔
315
    for (auto& it : m_sessions) {
1✔
316
        it.second->handle_reconnect();
×
317
    }
×
318
}
2✔
319

320
/// Returns the log level currently stored in the configuration.
util::Logger::Level SyncManager::log_level() const noexcept
{
    util::CheckedLockGuard guard(m_mutex);
    return m_config.log_level;
}
×
325

326
/// Invokes `update_function` with the metadata manager while holding `m_file_system_mutex`.
///
/// @return false (without invoking the function) when there is no metadata manager,
///         true after the function has been invoked.
bool SyncManager::perform_metadata_update(util::FunctionRef<void(SyncMetadataManager&)> update_function) const
{
    util::CheckedLockGuard fs_lock(m_file_system_mutex);
    if (m_metadata_manager) {
        update_function(*m_metadata_manager);
        return true;
    }
    return false;
}
9,734✔
335

336
std::shared_ptr<SyncUser> SyncManager::get_user(const std::string& user_id, const std::string& refresh_token,
337
                                                const std::string& access_token, const std::string& device_id)
338
{
7,886✔
339
    util::CheckedLockGuard lock(m_user_mutex);
7,886✔
340
    auto it = std::find_if(m_users.begin(), m_users.end(), [&](const auto& user) {
6,081✔
341
        return user->identity() == user_id && user->state() != SyncUser::State::Removed;
4,347✔
342
    });
4,347✔
343
    if (it == m_users.end()) {
7,886✔
344
        // No existing user.
2,240✔
345
        auto new_user = std::make_shared<SyncUser>(refresh_token, user_id, access_token, device_id, this);
4,556✔
346
        m_users.emplace(m_users.begin(), new_user);
4,556✔
347
        {
4,556✔
348
            util::CheckedLockGuard lock(m_file_system_mutex);
4,556✔
349
            // m_current_user is normally set very indirectly via the metadata manger
2,240✔
350
            if (!m_metadata_manager)
4,556✔
351
                m_current_user = new_user;
44✔
352
        }
4,556✔
353
        return new_user;
4,556✔
354
    }
4,556✔
355
    else { // LoggedOut => LoggedIn
3,330✔
356
        auto user = *it;
3,330✔
357
        REALM_ASSERT(user->state() != SyncUser::State::Removed);
3,330✔
358
        user->log_in(access_token, refresh_token);
3,330✔
359
        return user;
3,330✔
360
    }
3,330✔
361
}
7,886✔
362

363
std::vector<std::shared_ptr<SyncUser>> SyncManager::all_users()
364
{
250✔
365
    util::CheckedLockGuard lock(m_user_mutex);
250✔
366
    m_users.erase(std::remove_if(m_users.begin(), m_users.end(),
250✔
367
                                 [](auto& user) {
245✔
368
                                     bool should_remove = (user->state() == SyncUser::State::Removed);
240✔
369
                                     if (should_remove) {
240✔
370
                                         user->detach_from_sync_manager();
22✔
371
                                     }
22✔
372
                                     return should_remove;
240✔
373
                                 }),
240✔
374
                  m_users.end());
250✔
375
    return m_users;
250✔
376
}
250✔
377

378
std::shared_ptr<SyncUser> SyncManager::get_user_for_identity(std::string const& identity) const noexcept
379
{
1,111✔
380
    auto is_active_user = [identity](auto& el) {
1,119✔
381
        return el->identity() == identity;
1,119✔
382
    };
1,119✔
383
    auto it = std::find_if(m_users.begin(), m_users.end(), is_active_user);
1,111✔
384
    return it == m_users.end() ? nullptr : *it;
1,098✔
385
}
1,111✔
386

387
std::shared_ptr<SyncUser> SyncManager::get_current_user() const
388
{
987✔
389
    util::CheckedLockGuard lock(m_user_mutex);
987✔
390

484✔
391
    if (m_current_user)
987✔
392
        return m_current_user;
927✔
393
    util::CheckedLockGuard fs_lock(m_file_system_mutex);
60✔
394
    if (!m_metadata_manager)
60✔
395
        return nullptr;
×
396

30✔
397
    auto cur_user_ident = m_metadata_manager->get_current_user_identity();
60✔
398
    return cur_user_ident ? get_user_for_identity(*cur_user_ident) : nullptr;
49✔
399
}
60✔
400

401
void SyncManager::log_out_user(const SyncUser& user)
402
{
99✔
403
    util::CheckedLockGuard lock(m_user_mutex);
99✔
404

49✔
405
    // Move this user to the end of the vector
49✔
406
    auto user_pos = std::partition(m_users.begin(), m_users.end(), [&](auto& u) {
130✔
407
        return u.get() != &user;
130✔
408
    });
130✔
409

49✔
410
    auto active_user = std::find_if(m_users.begin(), user_pos, [](auto& u) {
64✔
411
        return u->state() == SyncUser::State::LoggedIn;
29✔
412
    });
29✔
413

49✔
414
    util::CheckedLockGuard fs_lock(m_file_system_mutex);
99✔
415
    bool was_active = m_current_user.get() == &user ||
99✔
416
                      (m_metadata_manager && m_metadata_manager->get_current_user_identity() == user.identity());
66✔
417
    if (!was_active)
99✔
418
        return;
3✔
419

48✔
420
    // Set the current active user to the next logged in user, or null if none
48✔
421
    if (active_user != user_pos) {
96✔
422
        m_current_user = *active_user;
16✔
423
        if (m_metadata_manager)
16✔
424
            m_metadata_manager->set_current_user_identity((*active_user)->identity());
16✔
425
    }
16✔
426
    else {
80✔
427
        m_current_user = nullptr;
80✔
428
        if (m_metadata_manager)
80✔
429
            m_metadata_manager->set_current_user_identity("");
72✔
430
    }
80✔
431
}
96✔
432

433
void SyncManager::set_current_user(const std::string& user_id)
434
{
1,042✔
435
    util::CheckedLockGuard lock(m_user_mutex);
1,042✔
436

510✔
437
    m_current_user = get_user_for_identity(user_id);
1,042✔
438
    util::CheckedLockGuard fs_lock(m_file_system_mutex);
1,042✔
439
    if (m_metadata_manager)
1,042✔
440
        m_metadata_manager->set_current_user_identity(user_id);
1,040✔
441
}
1,042✔
442

443
void SyncManager::remove_user(const std::string& user_id)
444
{
25✔
445
    util::CheckedLockGuard lock(m_user_mutex);
25✔
446
    if (auto user = get_user_for_identity(user_id))
25✔
447
        user->invalidate();
25✔
448
}
25✔
449

450
void SyncManager::delete_user(const std::string& user_id)
451
{
10✔
452
    util::CheckedLockGuard lock(m_user_mutex);
10✔
453
    // Avoid iterating over m_users twice by not calling `get_user_for_identity`.
5✔
454
    auto it = std::find_if(m_users.begin(), m_users.end(), [&user_id](auto& user) {
10✔
455
        return user->identity() == user_id;
10✔
456
    });
10✔
457
    auto user = it == m_users.end() ? nullptr : *it;
10✔
458

5✔
459
    if (!user)
10✔
460
        return;
×
461

5✔
462
    // Deletion should happen immediately, not when we do the cleanup
5✔
463
    // task on next launch.
5✔
464
    m_users.erase(it);
10✔
465
    user->detach_from_sync_manager();
10✔
466

5✔
467
    if (m_current_user && m_current_user->identity() == user->identity())
10✔
468
        m_current_user = nullptr;
10✔
469

5✔
470
    util::CheckedLockGuard fs_lock(m_file_system_mutex);
10✔
471
    if (!m_metadata_manager)
10✔
472
        return;
×
473

5✔
474
    auto users = m_metadata_manager->all_unmarked_users();
10✔
475
    for (size_t i = 0; i < users.size(); i++) {
12✔
476
        auto metadata = users.get(i);
12✔
477
        if (user->identity() == metadata.identity()) {
12✔
478
            m_file_manager->remove_user_realms(metadata.identity(), metadata.realm_file_paths());
10✔
479
            metadata.remove();
10✔
480
            break;
10✔
481
        }
10✔
482
    }
12✔
483
}
10✔
484

485
/// Detaches all sessions and users from this manager and stops the sync client.
SyncManager::~SyncManager() NO_THREAD_SAFETY_ANALYSIS
{
    // Grab the current sessions under a lock so we can shut them down. We have to
    // release the lock before calling them as shutdown_and_wait() will call
    // back into us.
    decltype(m_sessions) detached_sessions;
    {
        util::CheckedLockGuard session_lock(m_session_mutex);
        m_sessions.swap(detached_sessions);
    }

    for (auto& [_, session] : detached_sessions) {
        session->detach_from_sync_manager();
    }

    {
        util::CheckedLockGuard user_lock(m_user_mutex);
        for (auto& user : m_users) {
            user->detach_from_sync_manager();
        }
    }

    {
        util::CheckedLockGuard client_lock(m_mutex);
        // Stop the client. This will abort any uploads that inactive sessions are waiting for.
        if (m_sync_client)
            m_sync_client->stop();
    }
}
4,171✔
514

515
std::shared_ptr<SyncUser> SyncManager::get_existing_logged_in_user(const std::string& user_id) const
516
{
6✔
517
    util::CheckedLockGuard lock(m_user_mutex);
6✔
518
    auto user = get_user_for_identity(user_id);
6✔
519
    return user && user->state() == SyncUser::State::LoggedIn ? user : nullptr;
6✔
520
}
6✔
521

522
/// Thrown when a partition value has a BSON type that cannot be turned into a
/// file name. Carries the message passed to the constructor as `what()`.
struct UnsupportedBsonPartition : public std::logic_error {
    // `explicit` so a string is never silently converted into an exception object;
    // taken by const reference because std::logic_error copies the message anyway.
    explicit UnsupportedBsonPartition(const std::string& msg)
        : std::logic_error(msg)
    {
    }
};
528

529
static std::string string_from_partition(const std::string& partition)
530
{
54✔
531
    bson::Bson partition_value = bson::parse(partition);
54✔
532
    switch (partition_value.type()) {
54✔
533
        case bson::Bson::Type::Int32:
2✔
534
            return util::format("i_%1", static_cast<int32_t>(partition_value));
2✔
535
        case bson::Bson::Type::Int64:
2✔
536
            return util::format("l_%1", static_cast<int64_t>(partition_value));
2✔
537
        case bson::Bson::Type::String:
44✔
538
            return util::format("s_%1", static_cast<std::string>(partition_value));
44✔
539
        case bson::Bson::Type::ObjectId:
2✔
540
            return util::format("o_%1", static_cast<ObjectId>(partition_value).to_string());
2✔
541
        case bson::Bson::Type::Uuid:
2✔
542
            return util::format("u_%1", static_cast<UUID>(partition_value).to_string());
2✔
543
        case bson::Bson::Type::Null:
2✔
544
            return "null";
2✔
545
        default:
✔
546
            throw UnsupportedBsonPartition(util::format("Unsupported partition key value: '%1'. Only int, string "
×
547
                                                        "UUID and ObjectId types are currently supported.",
×
548
                                                        partition_value.to_string()));
×
549
    }
54✔
550
}
54✔
551

552
std::string SyncManager::path_for_realm(const SyncConfig& config, util::Optional<std::string> custom_file_name) const
553
{
74✔
554
    auto user = config.user;
74✔
555
    REALM_ASSERT(user);
74✔
556
    std::string path;
74✔
557
    {
74✔
558
        util::CheckedLockGuard lock(m_file_system_mutex);
74✔
559
        REALM_ASSERT(m_file_manager);
74✔
560

37✔
561
        // Attempt to make a nicer filename which will ease debugging when
37✔
562
        // locating files in the filesystem.
37✔
563
        auto file_name = [&]() -> std::string {
74✔
564
            if (custom_file_name) {
74✔
565
                return *custom_file_name;
18✔
566
            }
18✔
567
            if (config.flx_sync_requested) {
56✔
568
                REALM_ASSERT_DEBUG(config.partition_value.empty());
2✔
569
                return "flx_sync_default";
2✔
570
            }
2✔
571
            return string_from_partition(config.partition_value);
54✔
572
        }();
54✔
573
        path = m_file_manager->realm_file_path(user->identity(), user->legacy_identities(), file_name,
74✔
574
                                               config.partition_value);
74✔
575
    }
74✔
576
    // Report the use of a Realm for this user, so the metadata can track it for clean up.
37✔
577
    perform_metadata_update([&](const auto& manager) {
60✔
578
        auto metadata = manager.get_or_make_user_metadata(user->identity());
46✔
579
        metadata->add_realm_file_path(path);
46✔
580
    });
46✔
581
    return path;
74✔
582
}
74✔
583

584
/// Forwards to the file manager's `recovery_directory_path()` while holding
/// `m_file_system_mutex`. Asserts that the file manager exists.
std::string SyncManager::recovery_directory_path(util::Optional<std::string> const& custom_dir_name) const
{
    util::CheckedLockGuard fs_lock(m_file_system_mutex);
    REALM_ASSERT(m_file_manager);
    return m_file_manager->recovery_directory_path(custom_dir_name);
}
76✔
590

591
std::vector<std::shared_ptr<SyncSession>> SyncManager::get_all_sessions() const
592
{
52✔
593
    util::CheckedLockGuard lock(m_session_mutex);
52✔
594
    std::vector<std::shared_ptr<SyncSession>> sessions;
52✔
595
    for (auto& [_, session] : m_sessions) {
102✔
596
        if (auto external_reference = session->existing_external_reference())
102✔
597
            sessions.push_back(std::move(external_reference));
100✔
598
    }
102✔
599
    return sessions;
52✔
600
}
52✔
601

602
std::shared_ptr<SyncSession> SyncManager::get_existing_active_session(const std::string& path) const
603
{
×
604
    util::CheckedLockGuard lock(m_session_mutex);
×
605
    if (auto session = get_existing_session_locked(path)) {
×
606
        if (auto external_reference = session->existing_external_reference())
×
607
            return external_reference;
×
608
    }
×
609
    return nullptr;
×
610
}
×
611

612
std::shared_ptr<SyncSession> SyncManager::get_existing_session_locked(const std::string& path) const
613
{
2,792✔
614
    auto it = m_sessions.find(path);
2,792✔
615
    return it == m_sessions.end() ? nullptr : it->second;
2,683✔
616
}
2,792✔
617

618
std::shared_ptr<SyncSession> SyncManager::get_existing_session(const std::string& path) const
619
{
1,411✔
620
    util::CheckedLockGuard lock(m_session_mutex);
1,411✔
621
    if (auto session = get_existing_session_locked(path))
1,411✔
622
        return session->external_reference();
209✔
623

522✔
624
    return nullptr;
1,202✔
625
}
1,202✔
626

627
std::shared_ptr<SyncSession> SyncManager::get_session(std::shared_ptr<DB> db, const RealmConfig& config)
628
{
1,381✔
629
    auto& client = get_sync_client(); // Throws
1,381✔
630
#ifndef __EMSCRIPTEN__
1,381✔
631
    auto path = db->get_path();
1,381✔
632
    REALM_ASSERT_EX(path == config.path, path, config.path);
1,381✔
633
#else
634
    auto path = config.path;
635
#endif
636
    REALM_ASSERT(config.sync_config);
1,381✔
637

604✔
638
    util::CheckedUniqueLock lock(m_session_mutex);
1,381✔
639
    if (auto session = get_existing_session_locked(path)) {
1,381✔
640
        config.sync_config->user->register_session(session);
8✔
641
        return session->external_reference();
8✔
642
    }
8✔
643

600✔
644
    auto shared_session = SyncSession::create(client, std::move(db), config, this);
1,373✔
645
    m_sessions[path] = shared_session;
1,373✔
646

600✔
647
    // Create the external reference immediately to ensure that the session will become
600✔
648
    // inactive if an exception is thrown in the following code.
600✔
649
    auto external_reference = shared_session->external_reference();
1,373✔
650
    // unlocking m_session_mutex here prevents a deadlock for synchronous network
600✔
651
    // transports such as the unit test suite, in the case where the log in request is
600✔
652
    // denied by the server: Active -> WaitingForAccessToken -> handle_refresh(401
600✔
653
    // error) -> user.log_out() -> unregister_session (locks m_session_mutex again)
600✔
654
    lock.unlock();
1,373✔
655
    config.sync_config->user->register_session(std::move(shared_session));
1,373✔
656

600✔
657
    return external_reference;
1,373✔
658
}
1,373✔
659

660
bool SyncManager::has_existing_sessions()
661
{
12✔
662
    util::CheckedLockGuard lock(m_session_mutex);
12✔
663
    return do_has_existing_sessions();
12✔
664
}
12✔
665

666
bool SyncManager::do_has_existing_sessions()
667
{
4,079✔
668
    return std::any_of(m_sessions.begin(), m_sessions.end(), [](auto& element) {
2,021✔
669
        return element.second->existing_external_reference();
38✔
670
    });
38✔
671
}
4,079✔
672

673
void SyncManager::wait_for_sessions_to_terminate()
674
{
58✔
675
    auto& client = get_sync_client(); // Throws
58✔
676
    client.wait_for_session_terminations();
58✔
677
}
58✔
678

679
void SyncManager::unregister_session(const std::string& path)
680
{
2,088✔
681
    util::CheckedUniqueLock lock(m_session_mutex);
2,088✔
682
    auto it = m_sessions.find(path);
2,088✔
683
    if (it == m_sessions.end()) {
2,088✔
684
        // The session may already be unregistered. This always happens in the
685
        // SyncManager destructor, and can also happen due to multiple threads
686
        // tearing things down at once.
687
        return;
1✔
688
    }
1✔
689

890✔
690
    // Sync session teardown calls this function, so we need to be careful with
890✔
691
    // locking here. We need to unlock `m_session_mutex` before we do anything
890✔
692
    // which could result in a re-entrant call or we'll deadlock, which in this
890✔
693
    // function means unlocking before we destroy a `shared_ptr<SyncSession>`
890✔
694
    // (either the external reference or internal reference versions).
890✔
695
    // The external reference version will only be the final reference if
890✔
696
    // another thread drops a reference while we're in this function.
890✔
697
    // Dropping the final internal reference does not appear to ever actually
890✔
698
    // result in a recursive call to this function at the time this comment was
890✔
699
    // written, but releasing the lock in that case as well is still safer.
890✔
700

890✔
701
    if (auto existing_session = it->second->existing_external_reference()) {
2,087✔
702
        // We got here because the session entered the inactive state, but
309✔
703
        // there's still someone referencing it so we should leave it be. This
309✔
704
        // can happen if the user was logged out, or if all Realms using the
309✔
705
        // session were destroyed but the SDK user is holding onto the session.
309✔
706

309✔
707
        // Explicit unlock so that `existing_session`'s destructor runs after
309✔
708
        // the unlock for the reasons noted above
309✔
709
        lock.unlock();
750✔
710
        return;
750✔
711
    }
750✔
712

581✔
713
    // Remove the session from the map while holding the lock, but then defer
581✔
714
    // destroying it until after we unlock the mutex for the reasons noted above.
581✔
715
    auto session = m_sessions.extract(it);
1,337✔
716
    lock.unlock();
1,337✔
717
}
1,337✔
718

719
void SyncManager::set_session_multiplexing(bool allowed)
720
{
4✔
721
    util::CheckedLockGuard lock(m_mutex);
4✔
722
    if (m_config.multiplex_sessions == allowed)
4✔
723
        return; // Already enabled, we can ignore
2✔
724

1✔
725
    if (m_sync_client)
2✔
726
        throw std::logic_error("Cannot enable session multiplexing after creating the sync client");
×
727

1✔
728
    m_config.multiplex_sessions = allowed;
2✔
729
}
2✔
730

731
/// Returns the sync client, creating it on first use while holding `m_mutex`.
SyncClient& SyncManager::get_sync_client() const
{
    util::CheckedLockGuard guard(m_mutex);
    if (m_sync_client)
        return *m_sync_client;

    m_sync_client = create_sync_client(); // Throws
    return *m_sync_client;
}
5,548✔
738

739
std::unique_ptr<SyncClient> SyncManager::create_sync_client() const
740
{
4,107✔
741
    return std::make_unique<SyncClient>(m_logger_ptr, m_config, weak_from_this());
4,107✔
742
}
4,107✔
743

744
/// Returns the metadata manager's app metadata, or `util::none` when there is no
/// metadata manager.
util::Optional<SyncAppMetadata> SyncManager::app_metadata() const
{
    util::CheckedLockGuard fs_lock(m_file_system_mutex);
    if (m_metadata_manager) {
        return m_metadata_manager->get_app_metadata();
    }
    return util::none;
}
5,181✔
752

753
void SyncManager::close_all_sessions()
754
{
×
755
    // log_out() will call unregister_session(), which requires m_session_mutex,
756
    // so we need to iterate over them without holding the lock.
757
    decltype(m_sessions) sessions;
×
758
    {
×
759
        util::CheckedLockGuard lk(m_session_mutex);
×
760
        m_sessions.swap(sessions);
×
761
    }
×
762

763
    for (auto& [_, session] : sessions) {
×
764
        session->force_close();
×
765
    }
×
766

767
    get_sync_client().wait_for_session_terminations();
×
768
}
×
769

770
/// Test-only hook: forwards to the sync client's
/// `voluntary_disconnect_all_connections()` for the given manager.
void SyncManager::OnlyForTesting::voluntary_disconnect_all_connections(SyncManager& mgr)
{
    auto& client = mgr.get_sync_client();
    client.voluntary_disconnect_all_connections();
}
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc