• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2210

09 Apr 2024 03:41PM UTC coverage: 92.601% (+0.5%) from 92.106%
2210

push

Evergreen

web-flow
Merge pull request #7300 from realm/tg/rework-metadata-storage

Rework sync user handling and metadata storage

102800 of 195548 branches covered (52.57%)

3051 of 3153 new or added lines in 46 files covered. (96.76%)

41 existing lines in 11 files now uncovered.

249129 of 269035 relevant lines covered (92.6%)

46864217.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.74
/src/realm/object-store/impl/realm_coordinator.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2015 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/impl/realm_coordinator.hpp>
20

21
#include <realm/object-store/impl/collection_notifier.hpp>
22
#include <realm/object-store/impl/external_commit_helper.hpp>
23
#include <realm/object-store/impl/transact_log_handler.hpp>
24
#include <realm/object-store/impl/weak_realm_notifier.hpp>
25
#include <realm/object-store/audit.hpp>
26
#include <realm/object-store/binding_context.hpp>
27
#include <realm/object-store/object_schema.hpp>
28
#include <realm/object-store/object_store.hpp>
29
#include <realm/object-store/property.hpp>
30
#include <realm/object-store/schema.hpp>
31
#include <realm/object-store/thread_safe_reference.hpp>
32
#include <realm/object-store/util/scheduler.hpp>
33

34
#if REALM_ENABLE_SYNC
35
#include <realm/object-store/sync/impl/sync_file.hpp>
36
#include <realm/object-store/sync/async_open_task.hpp>
37
#include <realm/object-store/sync/sync_manager.hpp>
38
#include <realm/object-store/sync/sync_session.hpp>
39
#include <realm/object-store/sync/sync_user.hpp>
40
#include <realm/sync/history.hpp>
41
#include <realm/sync/noinst/client_history_impl.hpp>
42
#endif
43

44
#include <realm/db.hpp>
45
#include <realm/history.hpp>
46
#include <realm/string_data.hpp>
47
#include <realm/util/fifo_helper.hpp>
48
#include <realm/sync/config.hpp>
49

50
#include <algorithm>
51
#include <unordered_map>
52

53
using namespace realm;
54
using namespace realm::_impl;
55

56
// Process-wide mutex taken around every access to s_coordinators_per_path.
// Allocated with `new` and never deleted, so its destructor never runs
// (NOTE(review): presumably to keep it usable during static teardown - confirm).
static auto& s_coordinator_mutex = *new std::mutex;
57
// Registry mapping a Realm file path to a weak reference to the coordinator
// created for that path. Entries are weak, so the registry does not keep a
// coordinator alive. Like the mutex above, the map is allocated with `new`
// and never deleted.
static auto& s_coordinators_per_path = *new std::unordered_map<std::string, std::weak_ptr<RealmCoordinator>>;
58

59
// Returns the coordinator registered for `path`, creating and registering a
// new one if there is no live coordinator for that path yet.
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(StringData path)
{
    std::lock_guard<std::mutex> registry_guard(s_coordinator_mutex);

    // operator[] creates an empty slot for paths which have not been seen yet
    std::weak_ptr<RealmCoordinator>& slot = s_coordinators_per_path[path];
    std::shared_ptr<RealmCoordinator> result = slot.lock();
    if (!result) {
        result = std::make_shared<RealmCoordinator>(Private());
        slot = result;
    }
    return result;
}
198,337✔
72

73
std::shared_ptr<RealmCoordinator>
74
RealmCoordinator::get_coordinator(const Realm::Config& config) NO_THREAD_SAFETY_ANALYSIS
55✔
75
{
935✔
76
    auto coordinator = get_coordinator(config.path);
935✔
77
    util::CheckedLockGuard lock(coordinator->m_realm_mutex);
935✔
78
    coordinator->set_config(config);
935✔
79
    coordinator->open_db();
935✔
80
    return coordinator;
935✔
81
}
880✔
82

83
// Returns the live coordinator registered for `path`, or nullptr if there is
// none. Unlike get_coordinator(), this never creates a coordinator.
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_existing_coordinator(StringData path)
{
    std::lock_guard<std::mutex> lock(s_coordinator_mutex);
    // Use find() rather than operator[] so that querying a path which was
    // never opened does not insert an empty entry into the registry.
    auto it = s_coordinators_per_path.find(path);
    if (it == s_coordinators_per_path.end())
        return nullptr;
    return it->second.lock();
}
112✔
88

89
void RealmCoordinator::set_config(const Realm::Config& config)
17,162✔
90
{
267,602✔
91
    if (config.encryption_key.data() && config.encryption_key.size() != 64)
250,441✔
92
        throw InvalidEncryptionKey();
17,177✔
93
    if (config.schema_mode == SchemaMode::Immutable && config.sync_config)
250,424✔
94
        throw InvalidArgument(ErrorCodes::IllegalCombination,
×
95
                              "Synchronized Realms cannot be opened in immutable mode");
17,161✔
96
    if ((config.schema_mode == SchemaMode::AdditiveDiscovered ||
267,540✔
97
         config.schema_mode == SchemaMode::AdditiveExplicit) &&
254,903✔
98
        config.migration_function)
159,882✔
99
        throw InvalidArgument(ErrorCodes::IllegalCombination,
34✔
100
                              "Realms opened in Additive-only schema mode do not use a migration function");
17,191✔
101
    if (config.schema_mode == SchemaMode::Immutable && config.migration_function)
250,393✔
102
        throw InvalidArgument(ErrorCodes::IllegalCombination,
17✔
103
                              "Realms opened in immutable mode do not use a migration function");
17,174✔
104
    if (config.schema_mode == SchemaMode::ReadOnly && config.migration_function)
250,377✔
105
        throw InvalidArgument(ErrorCodes::IllegalCombination,
17✔
106
                              "Realms opened in read-only mode do not use a migration function");
17,173✔
107
    if (config.schema_mode == SchemaMode::Immutable && config.initialization_function)
250,361✔
108
        throw InvalidArgument(ErrorCodes::IllegalCombination,
17✔
109
                              "Realms opened in immutable mode do not use an initialization function");
17,172✔
110
    if (config.schema_mode == SchemaMode::ReadOnly && config.initialization_function)
250,345✔
111
        throw InvalidArgument(ErrorCodes::IllegalCombination,
17✔
112
                              "Realms opened in read-only mode do not use an initialization function");
17,171✔
113
    if (config.schema && config.schema_version == ObjectStore::NotVersioned)
250,329✔
114
        throw InvalidArgument(ErrorCodes::IllegalCombination,
17✔
115
                              "A schema version must be specified when the schema is specified");
17,170✔
116
    if (!config.realm_data.is_null() && (!config.immutable() || !config.in_memory))
250,313✔
117
        throw InvalidArgument(
17✔
118
            ErrorCodes::IllegalCombination,
17✔
119
            "In-memory realms initialized from memory buffers can only be opened in read-only mode");
17,169✔
120
    if (!config.realm_data.is_null() && !config.path.empty())
250,297✔
121
        throw InvalidArgument(ErrorCodes::IllegalCombination, "Specifying both memory buffer and path is invalid");
17,168✔
122
    if (!config.realm_data.is_null() && !config.encryption_key.empty())
250,281✔
123
        throw InvalidArgument(ErrorCodes::IllegalCombination, "Memory buffers do not support encryption");
17,167✔
124
    if (config.in_memory && !config.encryption_key.empty()) {
250,265✔
125
        throw InvalidArgument(ErrorCodes::IllegalCombination, "Encryption is not supported for in-memory realms");
17✔
126
    }
16✔
127
    // ResetFile also won't use the migration function, but specifying one is
120,968✔
128
    // allowed to simplify temporarily switching modes during development
120,968✔
129

138,118✔
130
#if REALM_ENABLE_SYNC
267,398✔
131
    if (config.sync_config) {
251,216✔
132
        if (config.sync_config->flx_sync_requested && !config.sync_config->partition_value.empty()) {
14,409✔
133
            throw InvalidArgument(ErrorCodes::IllegalCombination,
17✔
134
                                  "Cannot specify a partition value when flexible sync is enabled");
17✔
135
        }
983✔
136
    }
250,232✔
137
#endif
250,232✔
138

120,960✔
139
    bool no_existing_realm =
267,381✔
140
        std::all_of(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers), [](auto& notifier) {
166,961✔
141
            return notifier.expired();
51,772✔
142
        });
68,921✔
143
    if (no_existing_realm) {
253,785✔
144
        m_config = config;
202,017✔
145
        m_config.scheduler = nullptr;
202,017✔
146
    }
215,613✔
147
    else {
65,364✔
148
        if (m_config.immutable() != config.immutable()) {
65,364✔
149
            throw LogicError(
13,596✔
150
                ErrorCodes::MismatchedConfig,
3,553✔
151
                util::format("Realm at path '%1' already opened with different read permissions.", config.path));
3,553✔
152
        }
×
153
        if (m_config.in_memory != config.in_memory) {
51,768✔
154
            throw LogicError(
16✔
155
                ErrorCodes::MismatchedConfig,
16✔
156
                util::format("Realm at path '%1' already opened with different inMemory settings.", config.path));
3,569✔
157
        }
17✔
158
        if (m_config.encryption_key != config.encryption_key) {
51,753✔
159
            throw LogicError(
1✔
160
                ErrorCodes::MismatchedConfig,
1✔
161
                util::format("Realm at path '%1' already opened with a different encryption key.", config.path));
3,552✔
162
        }
×
163
        if (m_config.schema_mode != config.schema_mode) {
51,752✔
164
            throw LogicError(
16✔
165
                ErrorCodes::MismatchedConfig,
16✔
166
                util::format("Realm at path '%1' already opened with a different schema mode.", config.path));
3,568✔
167
        }
17✔
168
        util::CheckedLockGuard lock(m_schema_cache_mutex);
51,737✔
169
        if (config.schema && m_schema_version != ObjectStore::NotVersioned &&
51,737✔
170
            m_schema_version != config.schema_version) {
48,913✔
171
            throw LogicError(
3,567✔
172
                ErrorCodes::MismatchedConfig,
3,567✔
173
                util::format("Realm at path '%1' already opened with different schema version.", config.path));
3,214✔
174
        }
17✔
175

22,897✔
176
#if REALM_ENABLE_SYNC
51,721✔
177
        if (bool(m_config.sync_config) != bool(config.sync_config)) {
51,721✔
178
            throw LogicError(
179
                ErrorCodes::MismatchedConfig,
3,550✔
180
                util::format("Realm at path '%1' already opened with different sync configurations.", config.path));
3,550✔
181
        }
×
182

22,896✔
183
        if (config.sync_config) {
51,720✔
184
            auto old_user = m_config.sync_config->user;
2,448✔
185
            auto new_user = config.sync_config->user;
2,448✔
186
            if (old_user != new_user) {
5,998✔
187
                throw LogicError(
155✔
188
                    ErrorCodes::MismatchedConfig,
155✔
189
                    util::format("Realm at path '%1' already opened with different sync user.", config.path));
155✔
190
            }
×
191

1,208✔
192
            if (m_config.sync_config->partition_value != config.sync_config->partition_value) {
2,448✔
193
                throw LogicError(
×
194
                    ErrorCodes::MismatchedConfig,
155✔
UNCOV
195
                    util::format("Realm at path '%1' already opened with different partition value.", config.path));
×
196
            }
×
197
            if (m_config.sync_config->flx_sync_requested != config.sync_config->flx_sync_requested) {
2,448✔
198
                throw LogicError(ErrorCodes::MismatchedConfig,
×
199
                                 util::format("Realm at path '%1' already opened in a different synchronization mode",
155✔
200
                                              config.path));
×
201
            }
×
202
        }
51,720✔
203
#endif
51,720✔
204
        // Mixing cached and uncached Realms is allowed
26,446✔
205
        m_config.cache = config.cache;
55,270✔
206

22,896✔
207
        // Realm::update_schema() handles complaining about schema mismatches
26,446✔
208
    }
51,720✔
209
}
250,232✔
210

3,550✔
211
std::shared_ptr<Realm> RealmCoordinator::get_cached_realm(Realm::Config const& config,
17,149✔
212
                                                          std::shared_ptr<util::Scheduler> scheduler)
213
{
1,536✔
214
    if (!config.cache)
1,536✔
215
        return nullptr;
1,568✔
216
    util::CheckedUniqueLock lock(m_realm_mutex);
160✔
217
    return do_get_cached_realm(config, scheduler);
156✔
218
}
68✔
219

4✔
220
std::shared_ptr<Realm> RealmCoordinator::do_get_cached_realm(Realm::Config const& config,
4✔
221
                                                             std::shared_ptr<util::Scheduler> scheduler)
222
{
260,984✔
223
    if (!config.cache)
260,984✔
224
        return nullptr;
272,293✔
225

18,061✔
226
    if (!scheduler) {
23,549✔
227
        scheduler = config.scheduler;
6,384✔
228
    }
7,168✔
229

1,016✔
230
    if (!scheduler)
7,288✔
231
        return nullptr;
232

1,024✔
233
    for (auto& cached_realm : m_weak_realm_notifiers) {
33,903✔
234
        if (!cached_realm.is_cached_for_scheduler(scheduler))
33,887✔
235
            continue;
33,107✔
236
        // can be null if we jumped in between ref count hitting zero and
4,292✔
237
        // unregister_realm() getting the lock
3,678✔
238
        if (auto realm = cached_realm.realm()) {
4,992✔
239
            // If the file is uninitialized and was opened without a schema,
80✔
240
            // do the normal schema init
694✔
241
            if (realm->schema_version() == ObjectStore::NotVersioned)
4,992✔
242
                break;
32✔
243

678✔
244
            // Otherwise if we have a realm schema it needs to be an exact
66✔
245
            // match (even having the same properties but in different
64✔
246
            // orders isn't good enough)
64✔
247
            if (config.schema && realm->schema() != *config.schema)
4,960✔
248
                throw LogicError(
249
                    ErrorCodes::MismatchedConfig,
612✔
250
                    util::format("Realm at path '%1' already opened on current thread with different schema.",
×
251
                                 config.path));
×
252

64✔
253
            return realm;
4,960✔
254
        }
4,960✔
255
    }
5,604✔
256
    return nullptr;
2,228✔
257
}
7,126✔
258

172✔
259
// Opens (or fetches from the cache) a Realm for `config`. When `version` is
// supplied the Realm is opened on a frozen scheduler for that version and its
// read transaction is started before returning.
std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config, util::Optional<VersionID> version)
{
    REALM_ASSERT(!version || *version != VersionID());
    if (!config.scheduler) {
        if (version)
            config.scheduler = util::Scheduler::make_frozen(*version);
        else
            config.scheduler = util::Scheduler::make_default();
    }

    // The result must be declared before the lock so that the mutex is
    // released before we release the strong reference to the Realm, as
    // Realm's destructor may want to acquire the same lock
    std::shared_ptr<Realm> result;
    util::CheckedUniqueLock lock(m_realm_mutex);
    set_config(config);

    result = do_get_cached_realm(config);
    if (result) {
        REALM_ASSERT(!version || result->read_transaction_version() == *version);
        return result;
    }

    do_get_realm(std::move(config), result, version, lock);
    if (version)
        result->read_group();
    return result;
}
243,559✔
280

16,418✔
281
// Opens (or fetches from the cache) a Realm using the coordinator's stored
// config, bound to `scheduler` or to the default scheduler when it is null.
// `first_time_open` is forwarded to do_get_realm().
std::shared_ptr<Realm> RealmCoordinator::get_realm(std::shared_ptr<util::Scheduler> scheduler, bool first_time_open)
{
    std::shared_ptr<Realm> result;
    util::CheckedUniqueLock lock(m_realm_mutex);

    auto settings = m_config;
    if (scheduler)
        settings.scheduler = scheduler;
    else
        settings.scheduler = util::Scheduler::make_default();

    result = do_get_cached_realm(settings);
    if (result)
        return result;

    do_get_realm(std::move(settings), result, none, lock, first_time_open);
    return result;
}
8,245✔
293

485✔
294
std::shared_ptr<Realm> RealmCoordinator::freeze_realm(const Realm& source_realm)
485✔
295
{
5,104✔
296
    std::shared_ptr<Realm> realm;
5,104✔
297
    util::CheckedUniqueLock lock(m_realm_mutex);
5,423✔
298

2,863✔
299
    auto version = source_realm.read_transaction_version();
5,423✔
300
    auto scheduler = util::Scheduler::make_frozen(version);
5,104✔
301
    if ((realm = do_get_cached_realm(source_realm.config(), scheduler))) {
5,423✔
302
        return realm;
335✔
303
    }
335✔
304

2,537✔
305
    auto config = source_realm.config();
5,089✔
306
    config.scheduler = scheduler;
5,088✔
307
    realm = Realm::make_shared_realm(std::move(config), version, shared_from_this());
5,406✔
308
    Realm::Internal::copy_schema(*realm, source_realm);
5,406✔
309
    m_weak_realm_notifiers.emplace_back(realm, config.cache);
5,406✔
310
    return realm;
5,406✔
311
}
5,406✔
312

318✔
313
// Opens a new Realm from a copy of the stored config (bypassing the cache
// lookup) and hands it back wrapped in a ThreadSafeReference.
ThreadSafeReference RealmCoordinator::get_unbound_realm()
{
    std::shared_ptr<Realm> opened;
    util::CheckedUniqueLock lock(m_realm_mutex);
    RealmConfig config_copy(m_config);
    do_get_realm(std::move(config_copy), opened, none, lock);
    return ThreadSafeReference(opened);
}
2,006✔
320

118✔
321
void RealmCoordinator::do_get_realm(RealmConfig&& config, std::shared_ptr<Realm>& realm,
118✔
322
                                    util::Optional<VersionID> version, util::CheckedUniqueLock& realm_lock,
323
                                    bool first_time_open)
324
{
252,776✔
325
    const auto db_created = open_db();
252,776✔
326
#ifdef REALM_ENABLE_SYNC
269,782✔
327
    SyncConfig::SubscriptionInitializerCallback subscription_function = nullptr;
269,782✔
328
    bool rerun_on_open = false;
269,782✔
329
    if (config.sync_config && config.sync_config->flx_sync_requested &&
269,782✔
330
        config.sync_config->subscription_initializer) {
144,286✔
331
        subscription_function = config.sync_config->subscription_initializer;
18,158✔
332
        rerun_on_open = config.sync_config->rerun_init_subscription_on_open;
1,481✔
333
    }
1,224✔
334
#else
72✔
335
    static_cast<void>(first_time_open);
72✔
336
    static_cast<void>(db_created);
337
#endif
338

124,648✔
339
    auto schema = std::move(config.schema);
252,776✔
340
    auto migration_function = std::move(config.migration_function);
252,776✔
341
    auto initialization_function = std::move(config.initialization_function);
269,782✔
342
    config.schema = {};
269,782✔
343

141,654✔
344
    realm = Realm::make_shared_realm(std::move(config), version, shared_from_this());
269,782✔
345
    m_weak_realm_notifiers.emplace_back(realm, config.cache);
252,776✔
346

141,654✔
347
#ifdef REALM_ENABLE_SYNC
269,782✔
348
    if (m_sync_session && m_sync_session->user()->is_logged_in())
252,776✔
349
        m_sync_session->revive_if_needed();
31,502✔
350

141,654✔
351
    if (realm->config().audit_config) {
253,749✔
352
        if (m_audit_context)
624✔
353
            m_audit_context->update_metadata(realm->config().audit_config->metadata);
17,038✔
354
        else
670✔
355
            m_audit_context = make_audit_context(m_db, realm->config());
596✔
356
    }
698✔
357
#else
74✔
358
    if (realm->config().audit_config)
78✔
359
        REALM_TERMINATE("Cannot use Audit interface if Realm Core is built without Sync");
360
#endif
361

124,648✔
362
    // Cached frozen Realms need to initialize their schema before releasing
124,648✔
363
    // the lock as otherwise they could be read from the cache on another thread
124,648✔
364
    // before the schema initialization happens. They'll never perform a write
124,648✔
365
    // transaction, so unlike with live Realms this is safe to do.
124,648✔
366
    if (config.cache && version && schema) {
252,776!
367
        realm->update_schema(std::move(*schema));
368
        schema.reset();
17,006!
369
    }
×
370

124,648✔
371
    realm_lock.unlock_unchecked();
252,776✔
372
    if (schema) {
252,776✔
373
        realm->update_schema(std::move(*schema), config.schema_version, std::move(migration_function),
218,726✔
374
                             std::move(initialization_function));
218,726✔
375
    }
215,564✔
376

138,492✔
377
#ifdef REALM_ENABLE_SYNC
266,620✔
378
    // run subscription initializer if the SDK has instructed core to do so. The subscription callback will be run if:
124,648✔
379
    // 1. this is the first time we are creating the realm file
141,654✔
380
    // 2. the database was already created, but this is the first time we are opening the db and the flag
124,648✔
381
    // rerun_on_open was set
124,648✔
382
    if (subscription_function) {
252,776✔
383
        const auto current_subscription = realm->get_latest_subscription_set();
1,120✔
384
        const auto subscription_version = current_subscription.version();
18,126✔
385
        // in case we are hitting this check while during a normal open, we need to take in
630✔
386
        // consideration if the db was created during this call. Since this may be the first time
630✔
387
        // we are actually creating a realm. For async open this does not apply, infact db_created
560✔
388
        // will always be false.
560✔
389
        if (!first_time_open)
1,120✔
390
            first_time_open = db_created;
808✔
391
        if (subscription_version == 0 || (first_time_open && rerun_on_open)) {
1,190✔
392
            bool was_in_read = realm->is_in_read_transaction();
643✔
393
            subscription_function(realm);
662✔
394
            if (!was_in_read)
629✔
395
                realm->invalidate();
629✔
396
        }
629✔
397
    }
1,157✔
398
#endif
252,813✔
399
}
252,846✔
400

17,006✔
401
// Binds the notifier registered for `realm` to its scheduler. Terminates the
// process if `realm` is not registered with this coordinator.
void RealmCoordinator::bind_to_context(Realm& realm)
{
    util::CheckedLockGuard realm_guard(m_realm_mutex);
    for (auto& notifier : m_weak_realm_notifiers) {
        if (notifier.is_for_realm(&realm)) {
            notifier.bind_to_scheduler();
            return;
        }
    }
    REALM_TERMINATE("Invalid Realm passed to bind_to_context()");
}
95✔
412

413
#if REALM_ENABLE_SYNC
414
// Applies `config`, makes sure the DB is open and returns an AsyncOpenTask for
// this coordinator's sync session. Throws LogicError (IllegalOperation) if
// `config` has no sync configuration.
std::shared_ptr<AsyncOpenTask> RealmCoordinator::get_synchronized_realm(Realm::Config config)
{
    if (!config.sync_config) {
        throw LogicError(ErrorCodes::IllegalOperation,
                         "This method is only available for fully synchronized Realms.");
    }

    util::CheckedLockGuard realm_guard(m_realm_mutex);
    set_config(config);
    const bool opened_now = open_db();
    auto task =
        std::make_shared<AsyncOpenTask>(AsyncOpenTask::Private(), shared_from_this(), m_sync_session, opened_now);
    return task;
}
1,326✔
426

78✔
427
#endif
78✔
428

429
bool RealmCoordinator::open_db()
430
{
255,064✔
431
    if (m_db)
255,064✔
432
        return false;
74,018✔
433

115,061✔
434
#if REALM_ENABLE_SYNC
201,767✔
435
    if (m_config.sync_config) {
198,195✔
436
        // If we previously opened this Realm, we may have a lingering sync
18,705✔
437
        // session which outlived its RealmCoordinator. If that happens we
18,705✔
438
        // want to reuse it instead of creating a new DB.
5,911✔
439
        m_sync_session = m_config.sync_config->user->sync_manager()->get_existing_session(m_config.path);
11,515✔
440
        if (m_sync_session) {
11,515✔
441
            m_db = SyncSession::Internal::get_db(*m_sync_session);
208✔
442
            init_external_helpers();
991✔
443
            return false;
988✔
444
        }
988✔
445
    }
198,770✔
446
#endif
197,999✔
447

97,823✔
448
    bool server_synchronization_mode = m_config.sync_config || m_config.force_sync_history;
197,999✔
449
    bool schema_mode_reset_file =
197,999✔
450
        m_config.schema_mode == SchemaMode::SoftResetFile || m_config.schema_mode == SchemaMode::HardResetFile;
211,552✔
451
    try {
211,552✔
452
        if (m_config.immutable() && m_config.realm_data) {
197,987✔
453
            m_db = DB::create(m_config.realm_data, false);
13,581✔
454
            return true;
13,581✔
455
        }
13,581✔
456
        std::unique_ptr<Replication> history;
211,536✔
457
        if (server_synchronization_mode) {
211,536✔
458
#if REALM_ENABLE_SYNC
68,180✔
459
            bool apply_server_changes = !m_config.sync_config || m_config.sync_config->apply_server_changes;
68,180✔
460
            history = std::make_unique<sync::ClientReplication>(apply_server_changes);
68,180✔
461
#else
13,564✔
462
            REALM_TERMINATE("Realm was not built with sync enabled");
13,564✔
463
#endif
4,328✔
464
        }
72,507✔
465
        else if (!m_config.immutable()) {
134,120✔
466
            history = make_in_realm_history();
129,272✔
467
        }
129,272✔
468

97,803✔
469
        DBOptions options;
202,299✔
470
#ifndef __EMSCRIPTEN__
207,207✔
471
        options.enable_async_writes = true;
207,177✔
472
#endif
207,177✔
473
        options.durability = m_config.in_memory ? DBOptions::Durability::MemOnly : DBOptions::Durability::Full;
161,067✔
474
        options.is_immutable = m_config.immutable();
211,535✔
475
        options.logger = util::Logger::get_default_logger();
211,535✔
476

111,367✔
477
        if (!m_config.fifo_files_fallback_path.empty()) {
211,535✔
478
            options.temp_dir = util::normalize_dir(m_config.fifo_files_fallback_path);
7,940✔
479
        }
13,596✔
480
        options.encryption_key = m_config.encryption_key.data();
211,535✔
481
        options.allow_file_format_upgrade = !m_config.disable_format_upgrade && !schema_mode_reset_file;
197,971✔
482
        options.clear_on_invalid_file = m_config.clear_on_invalid_file;
211,535✔
483
        if (history) {
197,973✔
484
            options.backup_at_file_format_change = m_config.backup_at_file_format_change;
197,453✔
485
#ifdef __EMSCRIPTEN__
13,564✔
486
            // Force the DB to be created in memory-only mode, ignoring the filesystem path supplied in the config.
13,564✔
487
            // This is so we can run an SDK on top without having to solve the browser persistence problem yet,
13,564✔
488
            // or teach RealmConfig and SDKs about pure in-memory realms.
13,564✔
489
            m_db = DB::create_in_memory(std::move(history), m_config.path, options);
13,534✔
490
#else
491
            if (m_config.path.size()) {
197,451✔
492
                m_db = DB::create(std::move(history), m_config.path, options);
129,691✔
493
            }
129,691✔
494
            else {
67,760✔
495
                m_db = DB::create(std::move(history), options);
67,760✔
496
            }
81,294✔
497
#endif
206,750✔
498
        }
206,750✔
499
        else {
4,755✔
500
            options.no_create = true;
4,755✔
501
            m_db = DB::create(m_config.path, options);
4,755✔
502
        }
14,054✔
503
    }
211,505✔
504
    catch (realm::FileFormatUpgradeRequired const&) {
97,841✔
505
        if (!schema_mode_reset_file) {
30!
506
            throw;
30✔
507
        }
30✔
508
        util::File::remove(m_config.path);
13,564✔
UNCOV
509
        return open_db();
×
510
    }
×
511
    catch (UnsupportedFileFormatVersion const&) {
×
512
        if (!schema_mode_reset_file) {
×
513
            throw;
×
514
        }
×
515
        util::File::remove(m_config.path);
×
516
        return open_db();
×
517
    }
×
518

97,763✔
519
    if (m_config.should_compact_on_launch_function) {
197,883✔
520
        size_t free_space = 0;
128✔
521
        size_t used_space = 0;
128✔
522
        if (auto tr = m_db->start_write(true)) {
128✔
523
            tr->commit();
128✔
524
            m_db->get_stats(free_space, used_space);
13,686✔
525
        }
136✔
526
        if (free_space > 0 && m_config.should_compact_on_launch_function(free_space + used_space, used_space))
136✔
527
            m_db->compact();
72✔
528
    }
136✔
529

97,771✔
530
    init_external_helpers();
197,891✔
531
    return true;
197,891✔
532
}
197,887✔
533

8✔
534
void RealmCoordinator::init_external_helpers()
535
{
211,649✔
536
    // There's a circular dependency between SyncSession and ExternalCommitHelper
111,422✔
537
    // where sync commits notify ECH and other commits notify sync via ECH. This
111,422✔
538
    // happens on background threads, so to avoid needing locking on every access
97,864✔
539
    // we have to wire things up in a specific order.
97,864✔
540
#if REALM_ENABLE_SYNC
211,661✔
541
    // We may have reused an existing sync session that outlived its original
97,864✔
542
    // RealmCoordinator. If not, we need to create a new one now.
97,864✔
543
    if (m_config.sync_config && !m_sync_session)
198,091✔
544
        m_sync_session = m_config.sync_config->user->sync_manager()->get_session(m_db, m_config);
11,267✔
545
#endif
211,661✔
546

97,864✔
547
    if (!m_notifier && !m_config.immutable() && m_config.automatic_change_notifications) {
198,091✔
548
        try {
63,271✔
549
            m_notifier = std::make_unique<ExternalCommitHelper>(*this, m_config);
50,472✔
550
        }
49,704✔
551
        catch (std::system_error const& ex) {
24,780✔
552
            throw FileAccessError(ErrorCodes::FileOperationFailed,
3✔
553
                                  util::format("Failed to create ExternalCommitHelper: %1", ex.what()), get_path(),
3✔
554
                                  ex.code().value());
3✔
555
        }
768✔
556
    }
198,843✔
557
    m_db->add_commit_listener(this);
198,843✔
558
}
198,843✔
559

13,570✔
560
void RealmCoordinator::close()
561
{
13,727✔
562
    m_db->close();
3,212✔
563
    m_db = nullptr;
3,212✔
564
}
3,212✔
565

566
void RealmCoordinator::delete_and_reopen()
567
{
160✔
568
    util::CheckedLockGuard lock(m_realm_mutex);
160✔
569
    close();
160✔
570
    util::File::remove(m_config.path);
13,726✔
571
    open_db();
13,726✔
572
}
13,726✔
573

574
// Starts a transaction on the DB at `version`: a frozen one when
// `frozen_transaction` is set, a regular read transaction otherwise.
// The DB must already be open.
TransactionRef RealmCoordinator::begin_read(VersionID version, bool frozen_transaction)
{
    REALM_ASSERT(m_db);
    if (frozen_transaction)
        return m_db->start_frozen(version);
    return m_db->start_read(version);
}
682,595✔
579

580
uint64_t RealmCoordinator::get_schema_version() const noexcept
581
{
10✔
582
    util::CheckedLockGuard lock(m_schema_cache_mutex);
10✔
583
    return m_schema_version;
10✔
584
}
10✔
585

10✔
586
bool RealmCoordinator::get_cached_schema(Schema& schema, uint64_t& schema_version,
10✔
587
                                         uint64_t& transaction) const noexcept
588
{
407,656✔
589
    util::CheckedLockGuard lock(m_schema_cache_mutex);
453,994✔
590
    if (!m_cached_schema)
453,994✔
591
        return false;
393,152✔
592
    schema = *m_cached_schema;
106,807✔
593
    schema_version = m_schema_version;
60,469✔
594
    transaction = m_schema_transaction_version_max;
60,469✔
595
    return true;
60,469✔
596
}
60,469✔
597

598
void RealmCoordinator::cache_schema(Schema const& new_schema, uint64_t new_schema_version,
599
                                    uint64_t transaction_version)
600
{
390,917✔
601
    util::CheckedLockGuard lock(m_schema_cache_mutex);
390,917✔
602
    if (transaction_version < m_schema_transaction_version_max)
417,603✔
603
        return;
29,022✔
604
    if (new_schema.empty() || new_schema_version == ObjectStore::NotVersioned)
415,267✔
605
        return;
175,213✔
606

120,723✔
607
    m_cached_schema = new_schema;
240,054✔
608
    m_schema_version = new_schema_version;
240,054✔
609
    m_schema_transaction_version_min = transaction_version;
240,054✔
610
    m_schema_transaction_version_max = transaction_version;
240,054✔
611
}
236,257✔
612

613
void RealmCoordinator::clear_schema_cache_and_set_schema_version(uint64_t new_schema_version)
614
{
178,429✔
615
    util::CheckedLockGuard lock(m_schema_cache_mutex);
178,429✔
616
    m_cached_schema = util::none;
178,429✔
617
    m_schema_version = new_schema_version;
152,938✔
618
}
178,283✔
619

9,482✔
620
void RealmCoordinator::advance_schema_cache(uint64_t previous, uint64_t next)
621
{
1,071,236✔
622
    util::CheckedLockGuard lock(m_schema_cache_mutex);
1,071,236✔
623
    if (!m_cached_schema)
1,071,236✔
624
        return;
166,125✔
625
    REALM_ASSERT(previous <= m_schema_transaction_version_max);
921,120✔
626
    if (next < m_schema_transaction_version_min)
905,111✔
627
        return;
16✔
628
    m_schema_transaction_version_min = std::min(previous, m_schema_transaction_version_min);
914,648✔
629
    m_schema_transaction_version_max = std::max(next, m_schema_transaction_version_max);
914,648✔
630
}
914,648✔
631

9,553✔
632
// Constructor taking an unnamed `Private` argument; the body is empty and
// performs no work of its own.
// NOTE(review): `Private` is presumably a tag type restricting who may
// construct a coordinator -- confirm against the header.
RealmCoordinator::RealmCoordinator(Private) {}
207,892✔
633

634
// Tears the coordinator down: prunes expired entries from the global
// per-path coordinator map, detaches from the DB's commit listeners, stops the
// notifier and makes any surviving collection notifiers release their data.
RealmCoordinator::~RealmCoordinator()
{
    {
        std::lock_guard<std::mutex> coordinator_lock(s_coordinator_mutex);
        auto it = s_coordinators_per_path.begin();
        while (it != s_coordinators_per_path.end()) {
            if (!it->second.expired()) {
                ++it;
                continue;
            }
            it = s_coordinators_per_path.erase(it);
        }
    }

    if (m_db) {
        m_db->remove_commit_listener(this);
    }

    // Waits for the worker thread to join
    m_notifier.reset();

    // Active NotificationTokens keep their notifiers alive, so ask every
    // notifier to release its Transaction; that lets the DB be closed
    // immediately. The worker thread is gone by now, so no lock is taken.
    auto release_all = [](auto& notifier_list) {
        for (auto& notifier : notifier_list)
            notifier->release_data();
    };
    release_all(m_new_notifiers);
    release_all(m_notifiers);
}
211,900✔
664

13,561✔
665
// Removes the weak notifier entries belonging to `realm` (and any entries that
// have already expired) from this coordinator.
void RealmCoordinator::unregister_realm(Realm* realm)
{
    util::CheckedLockGuard realm_lock(m_realm_mutex);

    // The background worker thread normally cleans up results notifiers. When
    // automatic change notifications are disabled that never happens, so
    // clean up the dead ones here instead.
    if (!m_config.automatic_change_notifications) {
        util::CheckedLockGuard notifier_lock(m_notifier_mutex);
        clean_up_dead_notifiers();
    }

    auto should_drop = [realm](auto& weak_notifier) {
        return weak_notifier.expired() || weak_notifier.is_for_realm(realm);
    };
    m_weak_realm_notifiers.erase(remove_if(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers), should_drop),
                                 end(m_weak_realm_notifiers));
}
276,859✔
682

683
// Thread-safety analysis doesn't reasonably handle calling functions on different
684
// instances of this type
685
void RealmCoordinator::clear_cache() NO_THREAD_SAFETY_ANALYSIS
17,419✔
686
{
13,748✔
687
    std::vector<std::shared_ptr<RealmCoordinator>> coordinators;
13,748✔
688
    {
13,748✔
689
        std::lock_guard<std::mutex> lock(s_coordinator_mutex);
17,803✔
690
        for (auto& weak_coordinator : s_coordinators_per_path) {
25,169✔
691
            if (auto coordinator = weak_coordinator.second.lock()) {
25,169✔
692
                coordinators.push_back(coordinator);
25,169✔
693
            }
17,803✔
694
        }
17,803✔
695
        s_coordinators_per_path.clear();
17,803✔
696
    }
384✔
697

192✔
698
    for (auto& coordinator : coordinators) {
384✔
699
        coordinator->m_notifier = nullptr;
384✔
700

216✔
701
        std::vector<std::shared_ptr<Realm>> realms_to_close;
408✔
702
        {
408✔
703
            // Gather a list of all of the realms which will be removed
216✔
704
            util::CheckedLockGuard lock(coordinator->m_realm_mutex);
408✔
705
            for (auto& weak_realm_notifier : coordinator->m_weak_realm_notifiers) {
408✔
706
                if (auto realm = weak_realm_notifier.realm()) {
408✔
707
                    realms_to_close.push_back(realm);
408✔
708
                }
408✔
709
            }
408✔
710
        }
408✔
711

192✔
712
        // Close all of the previously cached Realms. This can't be done while
216✔
713
        // locks are held as it may try to re-lock them.
216✔
714
        for (auto& realm : realms_to_close)
384✔
715
            realm->close();
408✔
716
    }
408✔
717
}
384✔
718

24✔
719
void RealmCoordinator::clear_all_caches()
24✔
720
{
872✔
721
    std::vector<std::weak_ptr<RealmCoordinator>> to_clear;
872✔
722
    {
872✔
723
        std::lock_guard<std::mutex> lock(s_coordinator_mutex);
872✔
724
        for (auto iter : s_coordinators_per_path) {
640✔
725
            to_clear.push_back(iter.second);
384✔
726
        }
384✔
727
    }
848✔
728
    for (auto weak_coordinator : to_clear) {
640✔
729
        if (auto coordinator = weak_coordinator.lock()) {
408✔
730
            coordinator->clear_cache();
408✔
731
        }
408✔
732
    }
384✔
733
}
848✔
734

53✔
735
// In REALM_DEBUG builds, asserts that the global per-path coordinator map is
// empty. Compiles to an empty function otherwise.
void RealmCoordinator::assert_no_open_realms() noexcept
{
#ifdef REALM_DEBUG
    std::lock_guard<std::mutex> coordinator_map_lock(s_coordinator_mutex);
    REALM_ASSERT(s_coordinators_per_path.empty());
#endif
}
8,173✔
742

24✔
743
void RealmCoordinator::wake_up_notifier_worker()
24✔
744
{
85,562✔
745
    if (m_notifier) {
85,562✔
746
        // FIXME: this wakes up the notification workers for all processes and
505✔
747
        // not just us. This might be worth optimizing in the future.
534✔
748
        m_notifier->notify_others();
946✔
749
    }
946✔
750
}
86,048✔
751

510✔
752
// Commits the write transaction currently open on `realm` and continues it as
// a read transaction, then informs the binding context (if any) of the change.
//
// `commit_to_disk` is forwarded unchanged to commit_and_continue_as_read().
void RealmCoordinator::commit_write(Realm& realm, bool commit_to_disk)
{
    REALM_ASSERT(!m_config.immutable());
    REALM_ASSERT(realm.is_in_transaction());

    Transaction& tr = Realm::Internal::get_transaction(realm);
    VersionID new_version;
    {
        // The lock has to be taken before the commit. Otherwise another
        // process could write and notify us before the skip version is set.
        util::CheckedLockGuard notifier_lock(m_notifier_mutex);
        new_version = tr.commit_and_continue_as_read(commit_to_disk);

        // Only a prefix of versions can be skipped, never an intermediate one,
        // so the skip version must be exactly one past the notifier
        // transaction's current version. When this Realm has a notifier with
        // callbacks, begin_transaction() waited for it to finish running, so
        // that invariant holds. When it has none, the skip version is not
        // needed -- and must not be set, since starting the write did not
        // block and the notifier transaction may still be on an older version.
        //
        // This depends on callbacks not being addable inside a write
        // transaction. If that were allowed, an implicitly-created notifier
        // could have run (and so live in m_notifiers rather than
        // m_new_notifiers) without a callback when the write began, so nothing
        // blocked on it, yet have one by now. Supporting that would require
        // revisiting this logic.
        const bool realm_has_active_notifier =
            std::any_of(m_notifiers.begin(), m_notifiers.end(), [&](auto&& candidate) {
                return candidate->is_for_realm(realm) && candidate->have_callbacks();
            });
        if (realm_has_active_notifier) {
            REALM_ASSERT(!m_notifier_skip_version);
            REALM_ASSERT(m_notifier_transaction);
            REALM_ASSERT_3(m_notifier_transaction->get_transact_stage(), ==, DB::transact_Reading);
            REALM_ASSERT_3(m_notifier_transaction->get_version() + 1, ==, new_version.version);
            m_notifier_skip_version = tr.duplicate();
        }
    }

    if (realm.m_binding_context) {
        realm.m_binding_context->did_change({}, {});
    }
    // note: neither `realm` nor `this` may be touched past this point, because
    // did_change() may have closed the Realm.
}
558,423✔
799

1,214✔
800
void RealmCoordinator::enable_wait_for_change()
1,214✔
801
{
1,214✔
802
    m_db->enable_wait_for_change();
1,214✔
803
}
1,214✔
804

1,214✔
805
bool RealmCoordinator::wait_for_change(std::shared_ptr<Transaction> tr)
31,885✔
806
{
807
    return m_db->wait_for_change(tr);
31,885✔
808
}
13✔
809

13✔
810
void RealmCoordinator::wait_for_change_release()
811
{
812
    m_db->wait_for_change_release();
31,885✔
813
}
814

815
// Returns true when the last transaction version seen by `realm` differs from
// the version of the DB's latest snapshot.
bool RealmCoordinator::can_advance(Realm& realm)
{
    const auto seen_version = realm.last_seen_transaction_version();
    const auto latest_version = m_db->get_version_of_latest_snapshot();
    return seen_version != latest_version;
}
201,589✔
819

820
// Thread-safety analysis doesn't reasonably handle calling functions on different
821
// instances of this type
822
void RealmCoordinator::register_notifier(std::shared_ptr<CollectionNotifier> notifier) NO_THREAD_SAFETY_ANALYSIS
823
{
83,906✔
824
    auto& self = Realm::Internal::get_coordinator(*notifier->get_realm());
83,906✔
825
    {
83,906✔
826
        util::CheckedLockGuard lock(self.m_notifier_mutex);
83,906✔
827
        notifier->set_initial_transaction(self.m_new_notifiers);
83,906✔
828
        self.m_new_notifiers.push_back(std::move(notifier));
83,906✔
829
    }
83,906✔
830
}
98,021✔
831

14,115✔
832
void RealmCoordinator::clean_up_dead_notifiers()
14,115✔
833
{
693,875✔
834
    auto swap_remove = [&](auto& container) {
1,387,750✔
835
        bool did_remove = false;
1,387,750✔
836
        for (size_t i = 0; i < container.size(); ++i) {
1,950,600✔
837
            if (container[i]->is_alive())
568,095✔
838
                continue;
485,262✔
839

46,669✔
840
            // Ensure the notifier is destroyed here even if there's lingering refs
46,669✔
841
            // to the async notifier elsewhere
46,669✔
842
            container[i]->release_data();
88,078✔
843

46,669✔
844
            if (container.size() > i + 1)
88,078✔
845
                container[i] = std::move(container.back());
42,528✔
846
            container.pop_back();
82,833✔
847
            --i;
126,083✔
848
            did_remove = true;
169,333✔
849
        }
169,333✔
850
        return did_remove;
1,507,856✔
851
    };
1,421,356✔
852

384,726✔
853
    if (swap_remove(m_notifiers) && m_notifiers.empty()) {
693,875✔
854
        m_notifier_transaction = nullptr;
39,988✔
855
        m_notifier_handover_transaction = nullptr;
39,988✔
856
        m_notifier_skip_version.reset();
45,166✔
857
    }
39,988✔
858
    swap_remove(m_new_notifiers);
699,053✔
859
}
696,536✔
860

5,178✔
861
// Commit callback: signals the notifier, if one exists, via notify_others().
// The committed version argument is ignored.
void RealmCoordinator::on_commit(DB::version_type)
{
    if (!m_notifier) {
        return;
    }
    m_notifier->notify_others();
}
679,867✔
867

43,250✔
868
void RealmCoordinator::on_change()
2,498✔
869
{
375,486✔
870
#if REALM_ENABLE_SYNC
375,486✔
871
    // Invoke realm sync if another process has notified for a change
192,945✔
872
    if (m_sync_session) {
416,238✔
873
        auto version = m_db->get_version_of_latest_snapshot();
117,115✔
874
        SyncSession::Internal::nonsync_transact_notify(*m_sync_session, version);
73,865✔
875
    }
73,865✔
876
#endif
414,651✔
877

232,110✔
878
    {
387,499✔
879
        util::CheckedUniqueLock lock(m_running_notifiers_mutex);
387,499✔
880
        run_async_notifiers();
414,651✔
881
    }
372,988✔
882

190,447✔
883
    util::CheckedLockGuard lock(m_realm_mutex);
395,802✔
884
    for (auto& realm : m_weak_realm_notifiers) {
551,421✔
885
        realm.notify();
528,607✔
886
    }
551,421✔
887
}
377,570✔
888

4,582✔
889
void RealmCoordinator::run_async_notifiers()
4,582✔
890
{
397,017✔
891
    util::CheckedUniqueLock lock(m_notifier_mutex);
374,203✔
892

214,197✔
893
    clean_up_dead_notifiers();
397,017✔
894

214,197✔
895
    if (m_notifiers.empty() && m_new_notifiers.empty()) {
397,017✔
896
        REALM_ASSERT(!m_notifier_skip_version);
224,048✔
897
        return;
246,862✔
898
    }
257,813✔
899

115,320✔
900
    if (!m_notifier_transaction) {
183,920✔
901
        REALM_ASSERT(m_notifiers.empty());
63,395✔
902
        REALM_ASSERT(!m_notifier_skip_version);
40,581✔
903
        m_notifier_transaction = m_db->start_read();
40,581✔
904
    }
63,432✔
905

104,406✔
906
    // We need to pick the final version to advance to while the lock is held
81,555✔
907
    // as otherwise if a commit is made while new notifiers are being advanced
104,406✔
908
    // we could end up advancing over the skip version. We create a transaction
81,555✔
909
    // object for it to make sure the version stays valid
104,406✔
910
    TransactionRef newest_transaction = m_db->start_read();
164,333✔
911
    VersionID version = newest_transaction->get_version_of_current_transaction();
164,333✔
912

95,733✔
913
    auto skip_version = std::move(m_notifier_skip_version);
150,155✔
914

90,228✔
915
    // Make a copy of the notifiers vector and then release the lock to avoid
84,090✔
916
    // blocking other threads trying to register or unregister notifiers while we run them
84,090✔
917
    decltype(m_notifiers) notifiers;
152,690✔
918
    if (version != m_notifier_transaction->get_version_of_current_transaction()) {
152,690✔
919
        // We only want to rerun the existing notifiers if the version has changed.
48,046✔
920
        // This is both a minor optimization and required for notification
48,046✔
921
        // skipping to work. The skip logic assumes that the notifier can't be
48,046✔
922
        // running when suppress_next() is called because it can only be called
48,046✔
923
        // from within a write transaction, and starting the write transaction
48,046✔
924
        // would have blocked until the notifier is done running. However, if we
56,719✔
925
        // run the notifiers at a point where the version isn't changing, that
56,719✔
926
        // could happen concurrently with a call to suppress_next(), and we
48,046✔
927
        // could unset skip_next on a callback from that zero-version run
56,719✔
928
        // rather than the intended one.
48,046✔
929
        //
48,046✔
930
        // Spurious wakeups can happen in a few ways: adding a new notifier,
48,046✔
931
        // adding a new notifier in a different process sharing this Realm file,
56,719✔
932
        // closing the Realm in a different process, and possibly some other cases.
56,719✔
933
        notifiers = m_notifiers;
83,703✔
934
    }
83,703✔
935
    else {
66,452✔
936
        REALM_ASSERT(!skip_version);
66,452✔
937
        if (m_new_notifiers.empty()) {
66,452✔
938
            // We were spuriously woken up and there isn't actually anything to do
12,919✔
939
            return;
25,289✔
940
        }
25,289✔
941
    }
124,866✔
942

68,636✔
943
    auto new_notifiers = std::move(m_new_notifiers);
124,866✔
944
    m_new_notifiers.clear();
124,866✔
945
    m_notifiers.insert(m_notifiers.end(), new_notifiers.begin(), new_notifiers.end());
124,866✔
946
    lock.unlock();
124,866✔
947

73,097✔
948
    // Advance all of the new notifiers to the most recent version, if any
73,097✔
949
    std::vector<TransactionChangeInfo> new_notifier_change_info;
129,078✔
950
    if (!new_notifiers.empty()) {
129,078✔
951
        new_notifier_change_info.reserve(new_notifiers.size());
45,392✔
952
        for (auto& notifier : new_notifiers) {
83,330✔
953
            if (notifier->version() == version)
84,968✔
954
                continue;
82,088✔
955
            new_notifier_change_info.emplace_back();
9,915✔
956
            notifier->add_required_change_info(new_notifier_change_info.back());
2,880✔
957
            transaction::parse(*newest_transaction, new_notifier_change_info.back(), notifier->version().version,
9,915✔
958
                               version.version);
9,915✔
959
        }
9,915✔
960
    }
48,215✔
961

68,636✔
962
    // If the skip version is set and we have more than one version to process,
68,636✔
963
    // we need to start with just the skip version so that any suppressed
75,671✔
964
    // callbacks can ignore the changes from it without missing changes from
75,671✔
965
    // later versions. If the skip version is set and there aren't any more
71,211✔
966
    // versions after it, we just want to process with normal processing. See
73,845✔
967
    // the above note about spurious wakeups for why this is required for
73,845✔
968
    // correctness and not just a very minor optimization.
73,665✔
969
    if (skip_version && skip_version->get_version_of_current_transaction() != version) {
125,046✔
970
        REALM_ASSERT(!notifiers.empty());
212✔
971
        REALM_ASSERT(version >= skip_version->get_version_of_current_transaction());
212✔
972
        TransactionChangeInfo info;
212✔
973
        for (auto& notifier : notifiers)
212✔
974
            notifier->add_required_change_info(info);
2,607✔
975
        transaction::advance(*m_notifier_transaction, info, skip_version->get_version_of_current_transaction());
32✔
976
        for (auto& notifier : notifiers)
32✔
977
            notifier->run();
32✔
978

16✔
979
        util::CheckedLockGuard lock(m_notifier_mutex);
32✔
980
        for (auto& notifier : notifiers)
32✔
981
            notifier->prepare_handover();
32✔
982
    }
32✔
983

75,671✔
984
    // Advance the non-new notifiers to the same version as we advanced the new
68,638✔
985
    // ones to (or the latest if there were no new ones)
68,638✔
986
    TransactionChangeInfo change_info;
124,868✔
987
    for (auto& notifier : notifiers) {
139,691✔
988
        notifier->add_required_change_info(change_info);
139,691✔
989
    }
139,691✔
990
    transaction::advance(*m_notifier_transaction, change_info, version);
124,868✔
991

68,638✔
992
    {
124,866✔
993
        // If there's multiple notifiers for a single collection, we only populate
68,638✔
994
        // the data for the first one during parsing and need to copy it to the
68,638✔
995
        // others. This is a reverse scan where each collection looks for the
68,638✔
996
        // first collection with the same id. It is O(N^2), but typically the
68,638✔
997
        // number of collections observed will be very small.
68,636✔
998
        auto id = [](auto const& c) {
89,340✔
999
            return std::tie(c.table_key, c.path, c.obj_key);
41,408✔
1000
        };
48,443✔
1001
        auto& collections = change_info.collections;
132,766✔
1002
        for (size_t i = collections.size(); i > 0; --i) {
164,641✔
1003
            for (size_t j = 0; j < i - 1; ++j) {
41,935✔
1004
                if (id(collections[i - 1]) == id(collections[j])) {
27,739✔
1005
                    collections[i - 1].changes->merge(CollectionChangeBuilder{*collections[j].changes});
18,544✔
1006
                    break;
25,579✔
1007
                }
18,544✔
1008
            }
20,704✔
1009
        }
31,875✔
1010
    }
124,866✔
1011

68,636✔
1012
    // Now that they're at the same version, switch the new notifiers over to
71,224✔
1013
    // the main Transaction used for background work rather than the temporary one
71,224✔
1014
    for (auto& notifier : new_notifiers) {
112,881✔
1015
        notifier->attach_to(m_notifier_transaction);
90,365✔
1016
        notifier->run();
92,358✔
1017
    }
85,458✔
1018

69,930✔
1019
    // Change info is now all ready, so the notifiers can now perform their
69,795✔
1020
    // background work
69,795✔
1021
    for (auto& notifier : notifiers) {
140,848✔
1022
        notifier->run();
140,983✔
1023
    }
141,682✔
1024

75,671✔
1025
    // Reacquire the lock while updating the fields that are actually read on
68,636✔
1026
    // other threads
68,636✔
1027
    util::CheckedLockGuard lock2(m_notifier_mutex);
124,866✔
1028
    for (auto& notifier : new_notifiers) {
115,502✔
1029
        notifier->prepare_handover();
88,539✔
1030
    }
88,539✔
1031
    for (auto& notifier : notifiers) {
144,898✔
1032
        notifier->prepare_handover();
139,689✔
1033
    }
139,689✔
1034
    clean_up_dead_notifiers();
124,866✔
1035
    if (!m_notifiers.empty())
132,766✔
1036
        m_notifier_handover_transaction = m_db->start_read(version);
132,717✔
1037
}
132,766✔
1038

1039
// Advances `realm` to the version its notifiers are ready to deliver.
//
// Does nothing if any notifier for this Realm has not run yet. If the Realm
// has no deliverable notifiers it is simply advanced to the latest version.
// If the notifiers are at the Realm's current version, only the notifications
// are delivered, without advancing.
void RealmCoordinator::advance_to_ready(Realm& realm)
{
    // Callbacks may close the Realm and with it drop the last external
    // reference to this coordinator, so keep ourselves alive for the call.
    auto self = shared_from_this();
    auto tr = Realm::Internal::get_transaction_ref(realm);
    const auto realm_version = tr->get_version_of_current_transaction();

    std::vector<std::shared_ptr<_impl::CollectionNotifier>> ready_notifiers;

    // Pins the version being packaged for delivery, so it cannot be cleaned up
    // between releasing the mutex and the actual advance (which happens
    // without holding a lock).
    std::shared_ptr<Transaction> pinned_handover;
    {
        util::CheckedLockGuard notifier_lock(m_notifier_mutex);

        // A new notifier for this Realm has by definition not run yet, so
        // nothing is ready
        for (auto& pending : m_new_notifiers) {
            if (pending->is_for_realm(realm)) {
                return;
            }
        }

        for (auto& candidate : m_notifiers) {
            if (candidate->is_for_realm(realm)) {
                // A notifier that hasn't run isn't ready; do nothing at all
                if (!candidate->has_run()) {
                    return;
                }
                // package_for_delivery() returns false for a notifier that
                // has been unregistered but not cleaned up yet; treat it as
                // if it did not exist
                if (candidate->package_for_delivery()) {
                    ready_notifiers.push_back(candidate);
                }
            }
        }

        pinned_handover = m_notifier_handover_transaction;
    }

    if (ready_notifiers.empty()) {
        // No notifiers for this Realm: just advance to latest
        transaction::advance(tr, realm.m_binding_context.get(), {});
        return;
    }

    // Notifiers without a handover transaction have never run before.
    if (!pinned_handover) {
        return;
    }

    const auto notifier_version = pinned_handover->get_version_of_current_transaction();

    // When the most recent write went through this very Realm instance, the
    // notifiers can be behind the Realm, in which case there is nothing to
    // advance to
    if (notifier_version < realm_version) {
        return;
    }

    // Notifications for the current version occur for the initial
    // notification of a newly added callback, or when the write was made on
    // this Realm instance. A newer version may exist too, but it is ignored.
    if (notifier_version == realm_version) {
        if (realm.m_binding_context) {
            realm.m_binding_context->will_send_notifications();
        }
        if (realm.is_closed()) {
            return;
        }
        for (auto& notifier : ready_notifiers) {
            notifier->after_advance();
        }
        if (realm.is_closed()) {
            return;
        }
        if (realm.m_binding_context) {
            realm.m_binding_context->did_send_notifications();
        }
        return;
    }

    // The notifiers are at a newer version, so advance to that
    transaction::advance(tr, realm.m_binding_context.get(),
                         _impl::NotifierPackage(std::move(ready_notifiers), pinned_handover));
}
35,455✔
1117

1118
// Returns every notifier that belongs to `realm`: first those from the list of
// new notifiers, then those from the list of running notifiers, each in list
// order.
std::vector<std::shared_ptr<_impl::CollectionNotifier>> RealmCoordinator::notifiers_for_realm(Realm& realm)
{
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> matching;
    auto collect_from = [&](auto& source) {
        for (auto& notifier : source) {
            if (notifier->is_for_realm(realm)) {
                matching.push_back(notifier);
            }
        }
    };
    collect_from(m_new_notifiers);
    collect_from(m_notifiers);
    return matching;
}
520,892✔
1128

1,867✔
1129
// Advances `realm` to the DB's latest snapshot version, delivering the
// notifications packaged for that version.
//
// Returns true when the Realm is still open afterwards and its transaction
// version actually changed.
bool RealmCoordinator::advance_to_latest(Realm& realm)
{
    // Callbacks may close the Realm and with it drop the last external
    // reference to this coordinator, so keep ourselves alive for the call.
    auto self = shared_from_this();
    auto tr = Realm::Internal::get_transaction_ref(realm);

    util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
    NotifierVector notifiers = notifiers_for_realm(realm);
    notifier_lock.unlock();

    auto pin_tr = package_notifiers(notifiers, m_db->get_version_of_latest_snapshot());

    const auto version_before = tr->get_version_of_current_transaction();
    transaction::advance(tr, realm.m_binding_context.get(), _impl::NotifierPackage(std::move(notifiers), pin_tr));

    if (realm.is_closed()) {
        return false;
    }
    return version_before != tr->get_version_of_current_transaction();
}
98,806✔
1147

4,937✔
1148
// Begins a write transaction on `realm`, handing transaction::begin() the
// notifiers that belong to this Realm along with this coordinator.
// The Realm must not already be in a transaction.
void RealmCoordinator::promote_to_write(Realm& realm)
{
    REALM_ASSERT(!realm.is_in_transaction());
    // Callbacks may close the Realm and with it drop the last external
    // reference to this coordinator, so keep ourselves alive for the call.
    auto self = shared_from_this();

    std::vector<std::shared_ptr<_impl::CollectionNotifier>> realm_notifiers;
    {
        util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
        realm_notifiers = notifiers_for_realm(realm);
    }

    auto tr = Realm::Internal::get_transaction_ref(realm);
    transaction::begin(tr, realm.m_binding_context.get(), {std::move(realm_notifiers), this});
}
422,086✔
1162

1163
// Delivers notifications that are ready for `realm` without advancing it.
//
// Nothing is delivered when there is no handover transaction, when the Realm
// has a current version that differs from the handover version, or when no
// notifier for this Realm is ready. The Realm must not be in a transaction.
void RealmCoordinator::process_available_async(Realm& realm)
{
    REALM_ASSERT(!realm.is_in_transaction());
    // Callbacks may close the Realm and with it drop the last external
    // reference to this coordinator, so keep ourselves alive for the call.
    auto self = shared_from_this();

    auto current_version = realm.current_transaction_version();
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> deliverable;

    {
        util::CheckedLockGuard notifier_lock(m_notifier_mutex);
        // Without a handover transaction nothing can be waiting for delivery
        if (!m_notifier_handover_transaction) {
            return;
        }
        // Only initial notifications are delivered here and nothing is
        // advanced, so an existing read transaction has to match the
        // notification version exactly.
        if (current_version &&
            current_version != m_notifier_handover_transaction->get_version_of_current_transaction()) {
            return;
        }

        for (auto& candidate : m_notifiers) {
            if (candidate->is_for_realm(realm) && candidate->has_run() && candidate->package_for_delivery()) {
                deliverable.push_back(candidate);
            }
        }
    }
    if (deliverable.empty()) {
        return;
    }

    if (realm.m_binding_context) {
        realm.m_binding_context->will_send_notifications();
    }
    if (realm.is_closed()) { // i.e. the Realm was closed in the callback above
        return;
    }
    for (auto& notifier : deliverable) {
        notifier->after_advance();
    }
    if (realm.is_closed()) { // i.e. the Realm was closed in the callback above
        return;
    }
    if (realm.m_binding_context) {
        realm.m_binding_context->did_send_notifications();
    }
}
61,265✔
1205

3,633✔
1206
// Prepares `notifiers` for delivery at `target_version`.
//
// If notifiers with callbacks are not yet up to date, the async notifiers are
// run on the calling thread first. On return `notifiers` holds only the
// notifiers that were packaged successfully; it is cleared entirely when the
// handover transaction is missing or older than `target_version`.
//
// Returns the handover transaction when at least one notifier was packaged,
// otherwise nullptr.
TransactionRef RealmCoordinator::package_notifiers(NotifierVector& notifiers, VersionID::version_type target_version)
{
    // True when every notifier with callbacks has run and the handover
    // transaction has reached the target version.
    auto all_ready = [&] {
        util::CheckedUniqueLock check_lock(m_notifier_mutex);
        const bool handover_current =
            m_notifier_handover_transaction &&
            m_notifier_handover_transaction->get_version_of_current_transaction().version >= target_version;
        return std::all_of(begin(notifiers), end(notifiers), [&](auto const& notifier) {
            return !notifier->have_callbacks() || (notifier->has_run() && handover_current);
        });
    };

    if (!all_ready()) {
        util::CheckedUniqueLock running_lock(m_running_notifiers_mutex);
        // While we waited for the lock the worker thread may already have run
        // the notifiers we need, so check again
        if (!all_ready()) {
            run_async_notifiers();
        }
    }

    util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
    // Notifiers still being out of date means none of them have callbacks, so
    // the calling thread should not be blocked just to run them.
    const bool handover_reached_target =
        m_notifier_handover_transaction &&
        !(m_notifier_handover_transaction->get_version_of_current_transaction().version < target_version);
    if (!handover_reached_target) {
        notifiers.clear();
        return nullptr;
    }

    // Packages a notifier as a side effect; true means it cannot be delivered
    // and has to be dropped.
    auto cannot_deliver = [&](auto& notifier) {
        return !notifier->has_run() || !notifier->package_for_delivery();
    };
    notifiers.erase(std::remove_if(begin(notifiers), end(notifiers), cannot_deliver), end(notifiers));

    if (notifiers.empty()) {
        return nullptr;
    }
    return m_notifier_handover_transaction;
}
66,472✔
1241

1242
bool RealmCoordinator::compact()
1243
{
8,216✔
1244
    return m_db->compact();
4,548✔
1245
}
4,548✔
1246

4,532✔
1247
// Forwards to write_copy() on the coordinator's DB with the given `path` and
// `key`.
// NOTE(review): `key` is presumably an encryption key for the copy -- confirm
// against DB::write_copy.
void RealmCoordinator::write_copy(StringData path, const char* key)
{
    auto& db = *m_db;
    db.write_copy(path, key);
}
5,238✔
1251

5,078✔
1252
// Asks the DB for the write mutex asynchronously on behalf of `realm`.
//
// The callback given to the DB holds a strong reference to the Realm and uses
// the Realm's scheduler to invoke Realm::Internal::run_writes() for it.
void RealmCoordinator::async_request_write_mutex(Realm& realm)
{
    auto tr = Realm::Internal::get_transaction_ref(realm);
    m_db->async_request_write_mutex(tr, [strong_realm = realm.shared_from_this()]() mutable {
        // Fetch the scheduler before the Realm reference is moved into the
        // scheduled task below.
        auto& realm_scheduler = *strong_realm->scheduler();
        realm_scheduler.invoke([strong_realm = std::move(strong_realm)] {
            Realm::Internal::run_writes(*strong_realm);
        });
    });
}
4,304✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc