• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2211

09 Apr 2024 03:41PM UTC coverage: 92.633% (+0.5%) from 92.106%
2211

push

Evergreen

web-flow
Merge pull request #7300 from realm/tg/rework-metadata-storage

Rework sync user handling and metadata storage

102820 of 195548 branches covered (52.58%)

3165 of 3247 new or added lines in 46 files covered. (97.47%)

31 existing lines in 8 files now uncovered.

249584 of 269432 relevant lines covered (92.63%)

49986309.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.96
/src/realm/object-store/impl/realm_coordinator.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2015 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/impl/realm_coordinator.hpp>
20

21
#include <realm/object-store/impl/collection_notifier.hpp>
22
#include <realm/object-store/impl/external_commit_helper.hpp>
23
#include <realm/object-store/impl/transact_log_handler.hpp>
24
#include <realm/object-store/impl/weak_realm_notifier.hpp>
25
#include <realm/object-store/audit.hpp>
26
#include <realm/object-store/binding_context.hpp>
27
#include <realm/object-store/object_schema.hpp>
28
#include <realm/object-store/object_store.hpp>
29
#include <realm/object-store/property.hpp>
30
#include <realm/object-store/schema.hpp>
31
#include <realm/object-store/thread_safe_reference.hpp>
32
#include <realm/object-store/util/scheduler.hpp>
33

34
#if REALM_ENABLE_SYNC
35
#include <realm/object-store/sync/impl/sync_file.hpp>
36
#include <realm/object-store/sync/async_open_task.hpp>
37
#include <realm/object-store/sync/sync_manager.hpp>
38
#include <realm/object-store/sync/sync_session.hpp>
39
#include <realm/object-store/sync/sync_user.hpp>
40
#include <realm/sync/history.hpp>
41
#include <realm/sync/noinst/client_history_impl.hpp>
42
#endif
43

44
#include <realm/db.hpp>
45
#include <realm/history.hpp>
46
#include <realm/string_data.hpp>
47
#include <realm/util/fifo_helper.hpp>
48
#include <realm/sync/config.hpp>
49

50
#include <algorithm>
51
#include <unordered_map>
52

53
using namespace realm;
54
using namespace realm::_impl;
55

56
static auto& s_coordinator_mutex = *new std::mutex;
57
static auto& s_coordinators_per_path = *new std::unordered_map<std::string, std::weak_ptr<RealmCoordinator>>;
58

59
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(StringData path)
52,765✔
60
{
473,820✔
61
    std::lock_guard<std::mutex> lock(s_coordinator_mutex);
446,377✔
62

259,507✔
63
    auto& weak_coordinator = s_coordinators_per_path[path];
473,820✔
64
    if (auto coordinator = weak_coordinator.lock()) {
446,999✔
65
        return coordinator;
248,662✔
66
    }
235,953✔
67

124,805✔
68
    auto coordinator = std::make_shared<RealmCoordinator>(Private());
225,158✔
69
    weak_coordinator = coordinator;
225,158✔
70
    return coordinator;
225,158✔
71
}
198,337✔
72

73
std::shared_ptr<RealmCoordinator>
74
RealmCoordinator::get_coordinator(const Realm::Config& config) NO_THREAD_SAFETY_ANALYSIS
110✔
75
{
990✔
76
    auto coordinator = get_coordinator(config.path);
990✔
77
    util::CheckedLockGuard lock(coordinator->m_realm_mutex);
990✔
78
    coordinator->set_config(config);
990✔
79
    coordinator->open_db();
990✔
80
    return coordinator;
990✔
81
}
880✔
82

83
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_existing_coordinator(StringData path)
14✔
84
{
126✔
85
    std::lock_guard<std::mutex> lock(s_coordinator_mutex);
126✔
86
    return s_coordinators_per_path[path].lock();
126✔
87
}
112✔
88

89
void RealmCoordinator::set_config(const Realm::Config& config)
33,229✔
90
{
283,669✔
91
    if (config.encryption_key.data() && config.encryption_key.size() != 64)
250,442✔
92
        throw InvalidEncryptionKey();
33,243✔
93
    if (config.schema_mode == SchemaMode::Immutable && config.sync_config)
250,424✔
94
        throw InvalidArgument(ErrorCodes::IllegalCombination,
×
95
                              "Synchronized Realms cannot be opened in immutable mode");
33,227✔
96
    if ((config.schema_mode == SchemaMode::AdditiveDiscovered ||
283,606✔
97
         config.schema_mode == SchemaMode::AdditiveExplicit) &&
270,969✔
98
        config.migration_function)
159,884✔
99
        throw InvalidArgument(ErrorCodes::IllegalCombination,
36✔
100
                              "Realms opened in Additive-only schema mode do not use a migration function");
33,255✔
101
    if (config.schema_mode == SchemaMode::Immutable && config.migration_function)
250,394✔
102
        throw InvalidArgument(ErrorCodes::IllegalCombination,
18✔
103
                              "Realms opened in immutable mode do not use a migration function");
33,237✔
104
    if (config.schema_mode == SchemaMode::ReadOnly && config.migration_function)
250,378✔
105
        throw InvalidArgument(ErrorCodes::IllegalCombination,
18✔
106
                              "Realms opened in read-only mode do not use a migration function");
33,235✔
107
    if (config.schema_mode == SchemaMode::Immutable && config.initialization_function)
250,362✔
108
        throw InvalidArgument(ErrorCodes::IllegalCombination,
18✔
109
                              "Realms opened in immutable mode do not use an initialization function");
33,233✔
110
    if (config.schema_mode == SchemaMode::ReadOnly && config.initialization_function)
250,346✔
111
        throw InvalidArgument(ErrorCodes::IllegalCombination,
18✔
112
                              "Realms opened in read-only mode do not use an initialization function");
33,231✔
113
    if (config.schema && config.schema_version == ObjectStore::NotVersioned)
250,330✔
114
        throw InvalidArgument(ErrorCodes::IllegalCombination,
18✔
115
                              "A schema version must be specified when the schema is specified");
33,229✔
116
    if (!config.realm_data.is_null() && (!config.immutable() || !config.in_memory))
250,314✔
117
        throw InvalidArgument(
18✔
118
            ErrorCodes::IllegalCombination,
18✔
119
            "In-memory realms initialized from memory buffers can only be opened in read-only mode");
33,227✔
120
    if (!config.realm_data.is_null() && !config.path.empty())
250,298✔
121
        throw InvalidArgument(ErrorCodes::IllegalCombination, "Specifying both memory buffer and path is invalid");
33,225✔
122
    if (!config.realm_data.is_null() && !config.encryption_key.empty())
250,282✔
123
        throw InvalidArgument(ErrorCodes::IllegalCombination, "Memory buffers do not support encryption");
33,223✔
124
    if (config.in_memory && !config.encryption_key.empty()) {
250,266✔
125
        throw InvalidArgument(ErrorCodes::IllegalCombination, "Encryption is not supported for in-memory realms");
18✔
126
    }
16,071✔
127
    // ResetFile also won't use the migration function, but specifying one is
137,023✔
128
    // allowed to simplify temporarily switching modes during development
137,023✔
129

154,173✔
130
#if REALM_ENABLE_SYNC
283,453✔
131
    if (config.sync_config) {
252,021✔
132
        if (config.sync_config->flx_sync_requested && !config.sync_config->partition_value.empty()) {
14,410✔
133
            throw InvalidArgument(ErrorCodes::IllegalCombination,
18✔
134
                                  "Cannot specify a partition value when flexible sync is enabled");
18✔
135
        }
1,787✔
136
    }
250,232✔
137
#endif
250,232✔
138

120,960✔
139
    bool no_existing_realm =
283,435✔
140
        std::all_of(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers), [](auto& notifier) {
183,015✔
141
            return notifier.expired();
67,826✔
142
        });
84,975✔
143
    if (no_existing_realm) {
269,840✔
144
        m_config = config;
204,830✔
145
        m_config.scheduler = nullptr;
204,830✔
146
    }
231,667✔
147
    else {
78,606✔
148
        if (m_config.immutable() != config.immutable()) {
78,606✔
149
            throw LogicError(
26,838✔
150
                ErrorCodes::MismatchedConfig,
6,365✔
151
                util::format("Realm at path '%1' already opened with different read permissions.", config.path));
6,365✔
152
        }
×
153
        if (m_config.in_memory != config.in_memory) {
51,768✔
154
            throw LogicError(
16✔
155
                ErrorCodes::MismatchedConfig,
16✔
156
                util::format("Realm at path '%1' already opened with different inMemory settings.", config.path));
6,381✔
157
        }
18✔
158
        if (m_config.encryption_key != config.encryption_key) {
51,754✔
159
            throw LogicError(
2✔
160
                ErrorCodes::MismatchedConfig,
2✔
161
                util::format("Realm at path '%1' already opened with a different encryption key.", config.path));
6,363✔
162
        }
×
163
        if (m_config.schema_mode != config.schema_mode) {
51,752✔
164
            throw LogicError(
16✔
165
                ErrorCodes::MismatchedConfig,
16✔
166
                util::format("Realm at path '%1' already opened with a different schema mode.", config.path));
6,379✔
167
        }
18✔
168
        util::CheckedLockGuard lock(m_schema_cache_mutex);
51,738✔
169
        if (config.schema && m_schema_version != ObjectStore::NotVersioned &&
51,738✔
170
            m_schema_version != config.schema_version) {
48,914✔
171
            throw LogicError(
6,377✔
172
                ErrorCodes::MismatchedConfig,
6,377✔
173
                util::format("Realm at path '%1' already opened with different schema version.", config.path));
6,024✔
174
        }
18✔
175

22,898✔
176
#if REALM_ENABLE_SYNC
51,722✔
177
        if (bool(m_config.sync_config) != bool(config.sync_config)) {
51,722✔
178
            throw LogicError(
2,809✔
179
                ErrorCodes::MismatchedConfig,
6,359✔
180
                util::format("Realm at path '%1' already opened with different sync configurations.", config.path));
6,359✔
181
        }
×
182

22,896✔
183
        if (config.sync_config) {
51,720✔
184
            auto old_user = m_config.sync_config->user;
2,448✔
185
            auto new_user = config.sync_config->user;
5,257✔
186
            if (old_user != new_user) {
8,807✔
187
                throw LogicError(
306✔
188
                    ErrorCodes::MismatchedConfig,
306✔
189
                    util::format("Realm at path '%1' already opened with different sync user.", config.path));
306✔
190
            }
×
191

1,208✔
192
            if (m_config.sync_config->partition_value != config.sync_config->partition_value) {
2,448✔
193
                throw LogicError(
×
194
                    ErrorCodes::MismatchedConfig,
306✔
UNCOV
195
                    util::format("Realm at path '%1' already opened with different partition value.", config.path));
×
196
            }
×
197
            if (m_config.sync_config->flx_sync_requested != config.sync_config->flx_sync_requested) {
2,448✔
198
                throw LogicError(ErrorCodes::MismatchedConfig,
×
199
                                 util::format("Realm at path '%1' already opened in a different synchronization mode",
306✔
200
                                              config.path));
×
201
            }
×
202
        }
51,720✔
203
#endif
51,720✔
204
        // Mixing cached and uncached Realms is allowed
29,255✔
205
        m_config.cache = config.cache;
58,079✔
206

25,705✔
207
        // Realm::update_schema() handles complaining about schema mismatches
29,255✔
208
    }
54,529✔
209
}
253,041✔
210

6,359✔
211
std::shared_ptr<Realm> RealmCoordinator::get_cached_realm(Realm::Config const& config,
33,203✔
212
                                                          std::shared_ptr<util::Scheduler> scheduler)
213
{
1,536✔
214
    if (!config.cache)
1,536✔
215
        return nullptr;
1,664✔
216
    util::CheckedUniqueLock lock(m_realm_mutex);
256✔
217
    return do_get_cached_realm(config, scheduler);
248✔
218
}
72✔
219

8✔
220
std::shared_ptr<Realm> RealmCoordinator::do_get_cached_realm(Realm::Config const& config,
8✔
221
                                                             std::shared_ptr<util::Scheduler> scheduler)
222
{
260,984✔
223
    if (!config.cache)
260,984✔
224
        return nullptr;
289,015✔
225

34,783✔
226
    if (!scheduler) {
40,241✔
227
        scheduler = config.scheduler;
6,414✔
228
    }
7,198✔
229

1,038✔
230
    if (!scheduler)
7,310✔
231
        return nullptr;
30✔
232

1,054✔
233
    for (auto& cached_realm : m_weak_realm_notifiers) {
33,903✔
234
        if (!cached_realm.is_cached_for_scheduler(scheduler))
33,917✔
235
            continue;
33,127✔
236
        // can be null if we jumped in between ref count hitting zero and
4,310✔
237
        // unregister_realm() getting the lock
3,686✔
238
        if (auto realm = cached_realm.realm()) {
5,002✔
239
            // If the file is uninitialized and was opened without a schema,
90✔
240
            // do the normal schema init
704✔
241
            if (realm->schema_version() == ObjectStore::NotVersioned)
5,002✔
242
                break;
42✔
243

688✔
244
            // Otherwise if we have a realm schema it needs to be an exact
68✔
245
            // match (even having the same properties but in different
72✔
246
            // orders isn't good enough)
72✔
247
            if (config.schema && realm->schema() != *config.schema)
4,968✔
248
                throw LogicError(
8✔
249
                    ErrorCodes::MismatchedConfig,
620✔
250
                    util::format("Realm at path '%1' already opened on current thread with different schema.",
×
251
                                 config.path));
×
252

64✔
253
            return realm;
4,960✔
254
        }
4,968✔
255
    }
5,612✔
256
    return nullptr;
2,236✔
257
}
7,136✔
258

202✔
259
std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config, util::Optional<VersionID> version)
814✔
260
{
248,311✔
261
    REALM_ASSERT(!version || *version != VersionID());
248,311✔
262
    if (!config.scheduler)
281,274✔
263
        config.scheduler = version ? util::Scheduler::make_frozen(*version) : util::Scheduler::make_default();
222,741✔
264
    // realm must be declared before lock so that the mutex is released before
152,963✔
265
    // we release the strong reference to realm, as Realm's destructor may want
143,626✔
266
    // to acquire the same lock
135,934✔
267
    std::shared_ptr<Realm> realm;
264,245✔
268
    util::CheckedUniqueLock lock(m_realm_mutex);
264,245✔
269
    set_config(config);
281,274✔
270
    if ((realm = do_get_cached_realm(config))) {
281,274✔
271
        REALM_ASSERT(!version || realm->read_transaction_version() == *version);
37,891✔
272
        return realm;
37,891✔
273
    }
5,544✔
274
    do_get_realm(std::move(config), realm, version, lock);
243,999✔
275
    if (version) {
243,999✔
276
        realm->read_group();
35,163✔
277
    }
35,163✔
278
    return realm;
243,735✔
279
}
243,735✔
280

32,347✔
281
std::shared_ptr<Realm> RealmCoordinator::get_realm(std::shared_ptr<util::Scheduler> scheduler, bool first_time_open)
32,347✔
282
{
7,760✔
283
    std::shared_ptr<Realm> realm;
7,760✔
284
    util::CheckedUniqueLock lock(m_realm_mutex);
8,730✔
285
    auto config = m_config;
8,730✔
286
    config.scheduler = scheduler ? scheduler : util::Scheduler::make_default();
8,458✔
287
    if ((realm = do_get_cached_realm(config))) {
8,730✔
288
        return realm;
936✔
289
    }
970✔
290
    do_get_realm(std::move(config), realm, none, lock, first_time_open);
7,760✔
291
    return realm;
7,760✔
292
}
8,730✔
293

970✔
294
std::shared_ptr<Realm> RealmCoordinator::freeze_realm(const Realm& source_realm)
970✔
295
{
5,104✔
296
    std::shared_ptr<Realm> realm;
5,104✔
297
    util::CheckedUniqueLock lock(m_realm_mutex);
5,738✔
298

3,178✔
299
    auto version = source_realm.read_transaction_version();
5,738✔
300
    auto scheduler = util::Scheduler::make_frozen(version);
5,419✔
301
    if ((realm = do_get_cached_realm(source_realm.config(), scheduler))) {
5,738✔
302
        return realm;
650✔
303
    }
650✔
304

2,538✔
305
    auto config = source_realm.config();
5,090✔
306
    config.scheduler = scheduler;
5,402✔
307
    realm = Realm::make_shared_realm(std::move(config), version, shared_from_this());
5,720✔
308
    Realm::Internal::copy_schema(*realm, source_realm);
5,720✔
309
    m_weak_realm_notifiers.emplace_back(realm, config.cache);
5,720✔
310
    return realm;
5,720✔
311
}
5,720✔
312

632✔
313
ThreadSafeReference RealmCoordinator::get_unbound_realm()
632✔
314
{
1,888✔
315
    std::shared_ptr<Realm> realm;
1,888✔
316
    util::CheckedUniqueLock lock(m_realm_mutex);
2,124✔
317
    do_get_realm(RealmConfig(m_config), realm, none, lock);
2,124✔
318
    return ThreadSafeReference(realm);
2,124✔
319
}
2,124✔
320

236✔
321
void RealmCoordinator::do_get_realm(RealmConfig&& config, std::shared_ptr<Realm>& realm,
236✔
322
                                    util::Optional<VersionID> version, util::CheckedUniqueLock& realm_lock,
323
                                    bool first_time_open)
324
{
252,776✔
325
    const auto db_created = open_db();
252,776✔
326
#ifdef REALM_ENABLE_SYNC
286,297✔
327
    SyncConfig::SubscriptionInitializerCallback subscription_function = nullptr;
286,297✔
328
    bool rerun_on_open = false;
286,297✔
329
    if (config.sync_config && config.sync_config->flx_sync_requested &&
286,297✔
330
        config.sync_config->subscription_initializer) {
160,801✔
331
        subscription_function = config.sync_config->subscription_initializer;
34,673✔
332
        rerun_on_open = config.sync_config->rerun_init_subscription_on_open;
17,996✔
333
    }
1,296✔
334
#else
144✔
335
    static_cast<void>(first_time_open);
144✔
336
    static_cast<void>(db_created);
337
#endif
338

124,648✔
339
    auto schema = std::move(config.schema);
252,776✔
340
    auto migration_function = std::move(config.migration_function);
269,291✔
341
    auto initialization_function = std::move(config.initialization_function);
286,297✔
342
    config.schema = {};
286,297✔
343

158,169✔
344
    realm = Realm::make_shared_realm(std::move(config), version, shared_from_this());
286,297✔
345
    m_weak_realm_notifiers.emplace_back(realm, config.cache);
269,291✔
346

158,169✔
347
#ifdef REALM_ENABLE_SYNC
286,297✔
348
    if (m_sync_session && m_sync_session->user()->is_logged_in())
269,291✔
349
        m_sync_session->revive_if_needed();
48,017✔
350

158,169✔
351
    if (realm->config().audit_config) {
254,560✔
352
        if (m_audit_context)
17,139✔
353
            m_audit_context->update_metadata(realm->config().audit_config->metadata);
33,553✔
354
        else
670✔
355
            m_audit_context = make_audit_context(m_db, realm->config());
596✔
356
    }
698✔
357
#else
74✔
358
    if (realm->config().audit_config)
78✔
359
        REALM_TERMINATE("Cannot use Audit interface if Realm Core is built without Sync");
360
#endif
361

124,648✔
362
    // Cached frozen Realms need to initialize their schema before releasing
124,648✔
363
    // the lock as otherwise they could be read from the cache on another thread
141,163✔
364
    // before the schema initialization happens. They'll never perform a write
141,163✔
365
    // transaction, so unlike with live Realms this is safe to do.
141,163✔
366
    if (config.cache && version && schema) {
269,291!
367
        realm->update_schema(std::move(*schema));
16,515✔
368
        schema.reset();
33,521!
369
    }
×
370

124,648✔
371
    realm_lock.unlock_unchecked();
252,776✔
372
    if (schema) {
269,291✔
373
        realm->update_schema(std::move(*schema), config.schema_version, std::move(migration_function),
235,241✔
374
                             std::move(initialization_function));
235,241✔
375
    }
228,927✔
376

151,855✔
377
#ifdef REALM_ENABLE_SYNC
279,983✔
378
    // run subscription initializer if the SDK has instructed core to do so. The subscription callback will be run if:
141,163✔
379
    // 1. this is the first time we are creating the realm file
158,169✔
380
    // 2. the database was already created, but this is the first time we are opening the db and the flag
141,163✔
381
    // rerun_on_open was set
141,163✔
382
    if (subscription_function) {
269,291✔
383
        const auto current_subscription = realm->get_latest_subscription_set();
17,635✔
384
        const auto subscription_version = current_subscription.version();
34,641✔
385
        // in case we are hitting this check while during a normal open, we need to take in
700✔
386
        // consideration if the db was created during this call. Since this may be the first time
700✔
387
        // we are actually creating a realm. For async open this does not apply, infact db_created
630✔
388
        // will always be false.
630✔
389
        if (!first_time_open)
1,190✔
390
            first_time_open = db_created;
878✔
391
        if (subscription_version == 0 || (first_time_open && rerun_on_open)) {
1,260✔
392
            bool was_in_read = realm->is_in_read_transaction();
693✔
393
            subscription_function(realm);
732✔
394
            if (!was_in_read)
666✔
395
                realm->invalidate();
666✔
396
        }
666✔
397
    }
1,194✔
398
#endif
252,850✔
399
}
252,916✔
400

33,521✔
401
void RealmCoordinator::bind_to_context(Realm& realm)
33,521✔
402
{
1,520✔
403
    util::CheckedLockGuard lock(m_realm_mutex);
1,520✔
404
    for (auto& cached_realm : m_weak_realm_notifiers) {
2,270✔
405
        if (!cached_realm.is_for_realm(&realm))
2,270✔
406
            continue;
820✔
407
        cached_realm.bind_to_scheduler();
1,780✔
408
        return;
1,590✔
409
    }
1,710✔
410
    REALM_TERMINATE("Invalid Realm passed to bind_to_context()");
190✔
411
}
190✔
412

413
#if REALM_ENABLE_SYNC
414
std::shared_ptr<AsyncOpenTask> RealmCoordinator::get_synchronized_realm(Realm::Config config)
415
{
1,248✔
416
    if (!config.sync_config)
1,248✔
417
        throw LogicError(ErrorCodes::IllegalOperation,
156✔
418
                         "This method is only available for fully synchronized Realms.");
156✔
419

624✔
420
    util::CheckedLockGuard lock(m_realm_mutex);
1,248✔
421
    set_config(config);
1,326✔
422
    const auto db_open_first_time = open_db();
1,404✔
423
    return std::make_shared<AsyncOpenTask>(AsyncOpenTask::Private(), shared_from_this(), m_sync_session,
1,404✔
424
                                           db_open_first_time);
1,404✔
425
}
1,404✔
426

156✔
427
#endif
156✔
428

429
bool RealmCoordinator::open_db()
430
{
255,064✔
431
    if (m_db)
255,064✔
432
        return false;
90,676✔
433

131,719✔
434
#if REALM_ENABLE_SYNC
205,198✔
435
    if (m_config.sync_config) {
211,421✔
436
        // If we previously opened this Realm, we may have a lingering sync
31,932✔
437
        // session which outlived its RealmCoordinator. If that happens we
31,932✔
438
        // want to reuse it instead of creating a new DB.
6,539✔
439
        m_sync_session = m_config.sync_config->user->sync_manager()->get_existing_session(m_config.path);
12,142✔
440
        if (m_sync_session) {
12,142✔
441
            m_db = SyncSession::Internal::get_db(*m_sync_session);
835✔
442
            init_external_helpers();
1,619✔
443
            return false;
1,614✔
444
        }
1,614✔
445
    }
199,398✔
446
#endif
198,013✔
447

97,837✔
448
    bool server_synchronization_mode = m_config.sync_config || m_config.force_sync_history;
198,013✔
449
    bool schema_mode_reset_file =
198,013✔
450
        m_config.schema_mode == SchemaMode::SoftResetFile || m_config.schema_mode == SchemaMode::HardResetFile;
224,765✔
451
    try {
224,765✔
452
        if (m_config.immutable() && m_config.realm_data) {
211,200✔
453
            m_db = DB::create(m_config.realm_data, false);
26,794✔
454
            return true;
26,794✔
455
        }
26,794✔
456
        std::unique_ptr<Replication> history;
224,749✔
457
        if (server_synchronization_mode) {
224,749✔
458
#if REALM_ENABLE_SYNC
68,181✔
459
            bool apply_server_changes = !m_config.sync_config || m_config.sync_config->apply_server_changes;
68,181✔
460
            history = std::make_unique<sync::ClientReplication>(apply_server_changes);
68,181✔
461
#else
26,776✔
462
            REALM_TERMINATE("Realm was not built with sync enabled");
26,776✔
463
#endif
8,499✔
464
        }
76,678✔
465
        else if (!m_config.immutable()) {
138,291✔
466
            history = make_in_realm_history();
129,272✔
467
        }
129,272✔
468

97,803✔
469
        DBOptions options;
206,470✔
470
#ifndef __EMSCRIPTEN__
216,248✔
471
        options.enable_async_writes = true;
216,188✔
472
#endif
216,188✔
473
        options.durability = m_config.in_memory ? DBOptions::Durability::MemOnly : DBOptions::Durability::Full;
174,279✔
474
        options.is_immutable = m_config.immutable();
224,747✔
475
        options.logger = util::Logger::get_default_logger();
224,747✔
476

124,579✔
477
        if (!m_config.fifo_files_fallback_path.empty()) {
224,747✔
478
            options.temp_dir = util::normalize_dir(m_config.fifo_files_fallback_path);
21,152✔
479
        }
26,808✔
480
        options.encryption_key = m_config.encryption_key.data();
224,747✔
481
        options.allow_file_format_upgrade = !m_config.disable_format_upgrade && !schema_mode_reset_file;
211,183✔
482
        options.clear_on_invalid_file = m_config.clear_on_invalid_file;
224,747✔
483
        if (history) {
197,975✔
484
            options.backup_at_file_format_change = m_config.backup_at_file_format_change;
197,455✔
485
#ifdef __EMSCRIPTEN__
26,776✔
486
            // Force the DB to be created in memory-only mode, ignoring the filesystem path supplied in the config.
26,776✔
487
            // This is so we can run an SDK on top without having to solve the browser persistence problem yet,
26,776✔
488
            // or teach RealmConfig and SDKs about pure in-memory realms.
26,776✔
489
            m_db = DB::create_in_memory(std::move(history), m_config.path, options);
26,716✔
490
#else
491
            if (m_config.path.size()) {
197,451✔
492
                m_db = DB::create(std::move(history), m_config.path, options);
129,691✔
493
            }
129,691✔
494
            else {
67,760✔
495
                m_db = DB::create(std::move(history), options);
67,760✔
496
            }
94,476✔
497
#endif
215,697✔
498
        }
215,697✔
499
        else {
8,990✔
500
            options.no_create = true;
8,990✔
501
            m_db = DB::create(m_config.path, options);
8,990✔
502
        }
27,236✔
503
    }
224,687✔
504
    catch (realm::FileFormatUpgradeRequired const&) {
97,871✔
505
        if (!schema_mode_reset_file) {
60!
506
            throw;
60✔
507
        }
60✔
508
        util::File::remove(m_config.path);
26,776✔
509
        return open_db();
13,213✔
510
    }
×
511
    catch (UnsupportedFileFormatVersion const&) {
×
512
        if (!schema_mode_reset_file) {
×
513
            throw;
×
514
        }
×
515
        util::File::remove(m_config.path);
×
516
        return open_db();
×
517
    }
×
518

97,763✔
519
    if (m_config.should_compact_on_launch_function) {
197,883✔
520
        size_t free_space = 0;
128✔
521
        size_t used_space = 0;
128✔
522
        if (auto tr = m_db->start_write(true)) {
128✔
523
            tr->commit();
13,334✔
524
            m_db->get_stats(free_space, used_space);
26,892✔
525
        }
144✔
526
        if (free_space > 0 && m_config.should_compact_on_launch_function(free_space + used_space, used_space))
144✔
527
            m_db->compact();
80✔
528
    }
144✔
529

97,779✔
530
    init_external_helpers();
197,899✔
531
    return true;
197,899✔
532
}
197,891✔
533

16✔
534
void RealmCoordinator::init_external_helpers()
13,206✔
535
{
224,855✔
536
    // There's a circular dependency between SyncSession and ExternalCommitHelper
124,628✔
537
    // where sync commits notify ECH and other commits notify sync via ECH. This
124,628✔
538
    // happens on background threads, so to avoid needing locking on every access
97,864✔
539
    // we have to wire things up in a specific order.
97,864✔
540
#if REALM_ENABLE_SYNC
224,881✔
541
    // We may have reused an existing sync session that outlived its original
111,083✔
542
    // RealmCoordinator. If not, we need to create a new one now.
111,083✔
543
    if (m_config.sync_config && !m_sync_session)
211,310✔
544
        m_sync_session = m_config.sync_config->user->sync_manager()->get_session(m_db, m_config);
24,486✔
545
#endif
224,881✔
546

111,083✔
547
    if (!m_notifier && !m_config.immutable() && m_config.automatic_change_notifications) {
211,310✔
548
        try {
76,491✔
549
            m_notifier = std::make_unique<ExternalCommitHelper>(*this, m_config);
51,086✔
550
        }
49,706✔
551
        catch (std::system_error const& ex) {
24,782✔
552
            throw FileAccessError(ErrorCodes::FileOperationFailed,
5✔
553
                                  util::format("Failed to create ExternalCommitHelper: %1", ex.what()), get_path(),
5✔
554
                                  ex.code().value());
5✔
555
        }
1,380✔
556
    }
199,455✔
557
    m_db->add_commit_listener(this);
199,455✔
558
}
199,455✔
559

26,790✔
560
void RealmCoordinator::close()
13,219✔
561
{
26,947✔
562
    m_db->close();
6,248✔
563
    m_db = nullptr;
6,248✔
564
}
6,248✔
565

3,035✔
566
void RealmCoordinator::delete_and_reopen()
567
{
160✔
568
    util::CheckedLockGuard lock(m_realm_mutex);
160✔
569
    close();
160✔
570
    util::File::remove(m_config.path);
26,943✔
571
    open_db();
26,943✔
572
}
26,943✔
573

574
TransactionRef RealmCoordinator::begin_read(VersionID version, bool frozen_transaction)
575
{
682,605✔
576
    REALM_ASSERT(m_db);
682,605✔
577
    return frozen_transaction ? m_db->start_frozen(version) : m_db->start_read(version);
679,613✔
578
}
682,605✔
579

580
uint64_t RealmCoordinator::get_schema_version() const noexcept
581
{
20✔
582
    util::CheckedLockGuard lock(m_schema_cache_mutex);
20✔
583
    return m_schema_version;
20✔
584
}
20✔
585

20✔
586
bool RealmCoordinator::get_cached_schema(Schema& schema, uint64_t& schema_version,
20✔
587
                                         uint64_t& transaction) const noexcept
588
{
407,656✔
589
    util::CheckedLockGuard lock(m_schema_cache_mutex);
499,016✔
590
    if (!m_cached_schema)
499,016✔
591
        return false;
438,174✔
592
    schema = *m_cached_schema;
151,829✔
593
    schema_version = m_schema_version;
60,469✔
594
    transaction = m_schema_transaction_version_max;
60,469✔
595
    return true;
60,469✔
596
}
60,469✔
597

598
void RealmCoordinator::cache_schema(Schema const& new_schema, uint64_t new_schema_version,
599
                                    uint64_t transaction_version)
600
{
390,917✔
601
    util::CheckedLockGuard lock(m_schema_cache_mutex);
390,917✔
602
    if (transaction_version < m_schema_transaction_version_max)
443,626✔
603
        return;
55,045✔
604
    if (new_schema.empty() || new_schema_version == ObjectStore::NotVersioned)
441,290✔
605
        return;
197,580✔
606

124,379✔
607
    m_cached_schema = new_schema;
243,710✔
608
    m_schema_version = new_schema_version;
243,710✔
609
    m_schema_transaction_version_min = transaction_version;
243,710✔
610
    m_schema_transaction_version_max = transaction_version;
243,710✔
611
}
236,257✔
612

613
void RealmCoordinator::clear_schema_cache_and_set_schema_version(uint64_t new_schema_version)
614
{
203,506✔
615
    util::CheckedLockGuard lock(m_schema_cache_mutex);
203,506✔
616
    m_cached_schema = util::none;
203,506✔
617
    m_schema_version = new_schema_version;
153,084✔
618
}
203,214✔
619

18,781✔
620
void RealmCoordinator::advance_schema_cache(uint64_t previous, uint64_t next)
15,643✔
621
{
1,086,868✔
622
    util::CheckedLockGuard lock(m_schema_cache_mutex);
1,086,868✔
623
    if (!m_cached_schema)
1,086,868✔
624
        return;
181,757✔
625
    REALM_ASSERT(previous <= m_schema_transaction_version_max);
936,752✔
626
    if (next < m_schema_transaction_version_min)
905,111✔
627
        return;
16✔
628
    m_schema_transaction_version_min = std::min(previous, m_schema_transaction_version_min);
924,028✔
629
    m_schema_transaction_version_max = std::max(next, m_schema_transaction_version_max);
924,028✔
630
}
924,028✔
631

18,933✔
632
RealmCoordinator::RealmCoordinator(Private) {}
217,272✔
633

634
RealmCoordinator::~RealmCoordinator()
635
{
321,035✔
636
    {
321,035✔
637
        std::lock_guard<std::mutex> coordinator_lock(s_coordinator_mutex);
321,035✔
638
        for (auto it = s_coordinators_per_path.begin(); it != s_coordinators_per_path.end();) {
449,966✔
639
            if (it->second.expired()) {
337,123✔
640
                it = s_coordinators_per_path.erase(it);
302,082✔
641
            }
197,988✔
642
            else {
139,135✔
643
                ++it;
139,135✔
644
            }
139,135✔
645
        }
233,027✔
646
    }
225,161✔
647

97,984✔
648
    if (m_db) {
198,339✔
649
        m_db->remove_commit_listener(this);
224,769✔
650
    }
224,769✔
651

124,806✔
652
    // Waits for the worker thread to join
156,105✔
653
    m_notifier.reset();
229,638✔
654

124,761✔
655
    // If there's any active NotificationTokens they'll keep the notifiers alive,
124,761✔
656
    // so tell the notifiers to release their Transactions so that the DB can
102,506✔
657
    // be closed immediately.
102,506✔
658
    // No locking needed here because the worker thread is gone
102,506✔
659
    for (auto& notifier : m_new_notifiers)
229,638✔
660
        notifier->release_data();
27,254✔
661
    for (auto& notifier : m_notifiers)
211,574✔
662
        notifier->release_data();
27,463✔
663
}
225,111✔
664

26,772✔
665
void RealmCoordinator::unregister_realm(Realm* realm)
13,235✔
666
{
272,675✔
667
    util::CheckedLockGuard lock(m_realm_mutex);
286,262✔
668
    // Normally results notifiers are cleaned up by the background worker thread
141,211✔
669
    // but if that's disabled we need to ensure that any notifiers from this
141,211✔
670
    // Realm get cleaned up
141,211✔
671
    if (!m_config.automatic_change_notifications) {
272,675✔
672
        util::CheckedLockGuard lock(m_notifier_mutex);
208,041✔
673
        clean_up_dead_notifiers();
221,628✔
674
    }
194,860✔
675
    {
286,262✔
676
        auto new_end = remove_if(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers), [=](auto& notifier) {
371,322✔
677
            return notifier.expired() || notifier.is_for_realm(realm);
398,062✔
678
        });
371,240✔
679
        m_weak_realm_notifiers.erase(new_end, end(m_weak_realm_notifiers));
259,440✔
680
    }
293,784✔
681
}
293,784✔
682

16,925✔
683
// Thread-safety analysis doesn't reasonably handle calling functions on different
16,925✔
684
// instances of this type
16,925✔
685
void RealmCoordinator::clear_cache() NO_THREAD_SAFETY_ANALYSIS
34,344✔
686
{
26,773✔
687
    std::vector<std::shared_ptr<RealmCoordinator>> coordinators;
26,773✔
688
    {
26,773✔
689
        std::lock_guard<std::mutex> lock(s_coordinator_mutex);
34,728✔
690
        for (auto& weak_coordinator : s_coordinators_per_path) {
48,332✔
691
            if (auto coordinator = weak_coordinator.second.lock()) {
48,332✔
692
                coordinators.push_back(coordinator);
48,332✔
693
            }
34,728✔
694
        }
34,728✔
695
        s_coordinators_per_path.clear();
34,728✔
696
    }
384✔
697

192✔
698
    for (auto& coordinator : coordinators) {
384✔
699
        coordinator->m_notifier = nullptr;
384✔
700

240✔
701
        std::vector<std::shared_ptr<Realm>> realms_to_close;
432✔
702
        {
432✔
703
            // Gather a list of all of the realms which will be removed
240✔
704
            util::CheckedLockGuard lock(coordinator->m_realm_mutex);
432✔
705
            for (auto& weak_realm_notifier : coordinator->m_weak_realm_notifiers) {
432✔
706
                if (auto realm = weak_realm_notifier.realm()) {
432✔
707
                    realms_to_close.push_back(realm);
432✔
708
                }
432✔
709
            }
432✔
710
        }
432✔
711

216✔
712
        // Close all of the previously cached Realms. This can't be done while
240✔
713
        // locks are held as it may try to re-lock them.
240✔
714
        for (auto& realm : realms_to_close)
408✔
715
            realm->close();
432✔
716
    }
432✔
717
}
408✔
718

48✔
719
void RealmCoordinator::clear_all_caches()
48✔
720
{
896✔
721
    std::vector<std::weak_ptr<RealmCoordinator>> to_clear;
896✔
722
    {
896✔
723
        std::lock_guard<std::mutex> lock(s_coordinator_mutex);
896✔
724
        for (auto iter : s_coordinators_per_path) {
664✔
725
            to_clear.push_back(iter.second);
408✔
726
        }
408✔
727
    }
872✔
728
    for (auto weak_coordinator : to_clear) {
664✔
729
        if (auto coordinator = weak_coordinator.lock()) {
432✔
730
            coordinator->clear_cache();
432✔
731
        }
432✔
732
    }
384✔
733
}
848✔
734

106✔
735
void RealmCoordinator::assert_no_open_realms() noexcept
106✔
736
{
8,226✔
737
#ifdef REALM_DEBUG
8,226✔
738
    std::lock_guard<std::mutex> lock(s_coordinator_mutex);
8,197✔
739
    REALM_ASSERT(s_coordinators_per_path.empty());
8,168✔
740
#endif
8,168✔
741
}
8,226✔
742

77✔
743
void RealmCoordinator::wake_up_notifier_worker()
48✔
744
{
85,586✔
745
    if (m_notifier) {
85,586✔
746
        // FIXME: this wakes up the notification workers for all processes and
529✔
747
        // not just us. This might be worth optimizing in the future.
587✔
748
        m_notifier->notify_others();
946✔
749
    }
946✔
750
}
86,553✔
751

1,015✔
752
void RealmCoordinator::commit_write(Realm& realm, bool commit_to_disk)
1,015✔
753
{
527,553✔
754
    REALM_ASSERT(!m_config.immutable());
527,553✔
755
    REALM_ASSERT(realm.is_in_transaction());
527,553✔
756

257,865✔
757
    Transaction& tr = Realm::Internal::get_transaction(realm);
526,538✔
758
    VersionID new_version;
537,236✔
759
    {
537,236✔
760
        // Need to acquire this lock before committing or another process could
257,929✔
761
        // perform a write and notify us before we get the chance to set the
257,929✔
762
        // skip version
257,989✔
763
        util::CheckedLockGuard l(m_notifier_mutex);
526,662✔
764
        new_version = tr.commit_and_continue_as_read(commit_to_disk);
537,236✔
765

257,865✔
766
        // The skip version must always be the notifier transaction's current
257,865✔
767
        // version plus one, as we can only skip a prefix and not intermediate
320,325✔
768
        // transactions. If we have a notifier for the current Realm, then we
320,325✔
769
        // waited until it finished running in begin_transaction() and this
320,325✔
770
        // invariant holds. If we don't have any notifiers then we don't need
288,440✔
771
        // to set the skip version, but more importantly *can't* because we
320,325✔
772
        // didn't block when starting the write and the notifier transaction
320,325✔
773
        // may still be on an older version.
320,325✔
774
        //
288,440✔
775
        // Note that this relies on the fact that callbacks cannot be added from
288,440✔
776
        // within write transactions. If they could be, we could hit this point
288,440✔
777
        // with an implicit-created notifier which ran (and so is in m_notifiers
320,325✔
778
        // and not m_new_notifiers) but didn't have a callback at the start of
320,325✔
779
        // the write so we didn't block for it then, but does now have a callback.
288,440✔
780
        // If we add support for that, we'll need to update this logic.
288,440✔
781
        bool have_notifiers = std::any_of(m_notifiers.begin(), m_notifiers.end(), [&](auto&& notifier) {
331,153✔
782
            return notifier->is_for_realm(realm) && notifier->have_callbacks();
116,399✔
783
        });
116,399✔
784
        if (have_notifiers) {
557,113✔
785
            REALM_ASSERT(!m_notifier_skip_version);
49,825✔
786
            REALM_ASSERT(m_notifier_transaction);
49,825✔
787
            REALM_ASSERT_3(m_notifier_transaction->get_transact_stage(), ==, DB::transact_Reading);
49,825✔
788
            REALM_ASSERT_3(m_notifier_transaction->get_version() + 1, ==, new_version.version);
49,825✔
789
            m_notifier_skip_version = tr.duplicate();
49,825✔
790
        }
49,825✔
791
    }
557,113✔
792

288,440✔
793
    if (realm.m_binding_context) {
557,113✔
794
        realm.m_binding_context->did_change({}, {});
30,783✔
795
    }
36,080✔
796
    // note: no longer safe to access `realm` or `this` after this point as
268,668✔
797
    // did_change() may have closed the Realm.
268,668✔
798
}
588,998✔
799

2,408✔
800
void RealmCoordinator::enable_wait_for_change()
2,408✔
801
{
2,408✔
802
    m_db->enable_wait_for_change();
2,408✔
803
}
2,408✔
804

2,408✔
805
bool RealmCoordinator::wait_for_change(std::shared_ptr<Transaction> tr)
62,460✔
806
{
30,575✔
807
    return m_db->wait_for_change(tr);
62,460✔
808
}
26✔
809

26✔
810
void RealmCoordinator::wait_for_change_release()
30,575✔
811
{
30,575✔
812
    m_db->wait_for_change_release();
62,460✔
813
}
814

815
bool RealmCoordinator::can_advance(Realm& realm)
816
{
201,589✔
817
    return realm.last_seen_transaction_version() != m_db->get_version_of_latest_snapshot();
201,589✔
818
}
201,589✔
819

820
// Thread-safety analysis doesn't reasonably handle calling functions on different
821
// instances of this type
822
void RealmCoordinator::register_notifier(std::shared_ptr<CollectionNotifier> notifier) NO_THREAD_SAFETY_ANALYSIS
823
{
83,906✔
824
    auto& self = Realm::Internal::get_coordinator(*notifier->get_realm());
83,906✔
825
    {
83,906✔
826
        util::CheckedLockGuard lock(self.m_notifier_mutex);
83,906✔
827
        notifier->set_initial_transaction(self.m_new_notifiers);
83,906✔
828
        self.m_new_notifiers.push_back(std::move(notifier));
83,906✔
829
    }
83,906✔
830
}
109,121✔
831

25,215✔
832
void RealmCoordinator::clean_up_dead_notifiers()
25,215✔
833
{
693,875✔
834
    auto swap_remove = [&](auto& container) {
1,387,750✔
835
        bool did_remove = false;
1,387,750✔
836
        for (size_t i = 0; i < container.size(); ++i) {
1,950,600✔
837
            if (container[i]->is_alive())
573,344✔
838
                continue;
490,511✔
839

51,918✔
840
            // Ensure the notifier is destroyed here even if there's lingering refs
51,918✔
841
            // to the async notifier elsewhere
51,918✔
842
            container[i]->release_data();
93,327✔
843

51,918✔
844
            if (container.size() > i + 1)
93,327✔
845
                container[i] = std::move(container.back());
42,528✔
846
            container.pop_back();
82,833✔
847
            --i;
169,205✔
848
            did_remove = true;
255,577✔
849
        }
255,577✔
850
        return did_remove;
1,628,722✔
851
    };
1,455,978✔
852

414,168✔
853
    if (swap_remove(m_notifiers) && m_notifiers.empty()) {
699,056✔
854
        m_notifier_transaction = nullptr;
45,169✔
855
        m_notifier_handover_transaction = nullptr;
45,169✔
856
        m_notifier_skip_version.reset();
50,346✔
857
    }
45,169✔
858
    swap_remove(m_new_notifiers);
704,233✔
859
}
699,196✔
860

10,358✔
861
void RealmCoordinator::on_commit(DB::version_type)
10,358✔
862
{
690,225✔
863
    if (m_notifier) {
690,225✔
864
        m_notifier->notify_others();
401,237✔
865
    }
401,237✔
866
}
723,413✔
867

86,372✔
868
void RealmCoordinator::on_change()
4,995✔
869
{
377,983✔
870
#if REALM_ENABLE_SYNC
377,983✔
871
    // Invoke realm sync if another process has notified for a change
195,442✔
872
    if (m_sync_session) {
459,360✔
873
        auto version = m_db->get_version_of_latest_snapshot();
160,237✔
874
        SyncSession::Internal::nonsync_transact_notify(*m_sync_session, version);
73,865✔
875
    }
73,865✔
876
#endif
454,501✔
877

271,960✔
878
    {
401,412✔
879
        util::CheckedUniqueLock lock(m_running_notifiers_mutex);
401,412✔
880
        run_async_notifiers();
454,501✔
881
    }
372,988✔
882

190,447✔
883
    util::CheckedLockGuard lock(m_realm_mutex);
417,387✔
884
    for (auto& realm : m_weak_realm_notifiers) {
573,006✔
885
        realm.notify();
550,619✔
886
    }
573,006✔
887
}
382,070✔
888

9,082✔
889
void RealmCoordinator::run_async_notifiers()
9,082✔
890
{
418,602✔
891
    util::CheckedUniqueLock lock(m_notifier_mutex);
396,215✔
892

235,782✔
893
    clean_up_dead_notifiers();
418,602✔
894

235,782✔
895
    if (m_notifiers.empty() && m_new_notifiers.empty()) {
418,602✔
896
        REALM_ASSERT(!m_notifier_skip_version);
246,060✔
897
        return;
268,447✔
898
    }
287,439✔
899

144,946✔
900
    if (!m_notifier_transaction) {
213,546✔
901
        REALM_ASSERT(m_notifiers.empty());
84,980✔
902
        REALM_ASSERT(!m_notifier_skip_version);
40,581✔
903
        m_notifier_transaction = m_db->start_read();
40,581✔
904
    }
85,146✔
905

126,120✔
906
    // We need to pick the final version to advance to while the lock is held
103,695✔
907
    // as otherwise if a commit is made while new notifiers are being advanced
126,120✔
908
    // we could end up advancing over the skip version. We create a transaction
103,695✔
909
    // object for it to make sure the version stays valid
126,120✔
910
    TransactionRef newest_transaction = m_db->start_read();
177,917✔
911
    VersionID version = newest_transaction->get_version_of_current_transaction();
177,917✔
912

109,317✔
913
    auto skip_version = std::move(m_notifier_skip_version);
158,702✔
914

98,358✔
915
    // Make a copy of the notifiers vector and then release the lock to avoid
86,626✔
916
    // blocking other threads trying to register or unregister notifiers while we run them
86,626✔
917
    decltype(m_notifiers) notifiers;
155,226✔
918
    if (version != m_notifier_transaction->get_version_of_current_transaction()) {
155,226✔
919
        // We only want to rerun the existing notifiers if the version has changed.
56,593✔
920
        // This is both a minor optimization and required for notification
56,593✔
921
        // skipping to work. The skip logic assumes that the notifier can't be
56,593✔
922
        // running when suppress_next() is called because it can only be called
56,593✔
923
        // from within a write transaction, and starting the write transaction
56,593✔
924
        // would have blocked until the notifier is done running. However, if we
64,849✔
925
        // run the notifiers at a point where the version isn't changing, that
64,849✔
926
        // could happen concurrently with a call to suppress_next(), and we
56,593✔
927
        // could unset skip_next on a callback from that zero-version run
64,849✔
928
        // rather than the intended one.
56,593✔
929
        //
56,593✔
930
        // Spurious wakeups can happen in a few ways: adding a new notifier,
56,593✔
931
        // adding a new notifier in a different process sharing this Realm file,
64,849✔
932
        // closing the Realm in a different process, and possibly some other cases.
64,849✔
933
        notifiers = m_notifiers;
89,506✔
934
    }
89,506✔
935
    else {
72,255✔
936
        REALM_ASSERT(!skip_version);
72,255✔
937
        if (m_new_notifiers.empty()) {
72,255✔
938
            // We were spuriously woken up and there isn't actually anything to do
18,722✔
939
            return;
31,092✔
940
        }
31,092✔
941
    }
130,669✔
942

74,439✔
943
    auto new_notifiers = std::move(m_new_notifiers);
130,669✔
944
    m_new_notifiers.clear();
130,669✔
945
    m_notifiers.insert(m_notifiers.end(), new_notifiers.begin(), new_notifiers.end());
130,669✔
946
    lock.unlock();
130,669✔
947

78,903✔
948
    // Advance all of the new notifiers to the most recent version, if any
78,903✔
949
    std::vector<TransactionChangeInfo> new_notifier_change_info;
131,402✔
950
    if (!new_notifiers.empty()) {
131,402✔
951
        new_notifier_change_info.reserve(new_notifiers.size());
47,716✔
952
        for (auto& notifier : new_notifiers) {
83,496✔
953
            if (notifier->version() == version)
84,715✔
954
                continue;
81,835✔
955
            new_notifier_change_info.emplace_back();
18,298✔
956
            notifier->add_required_change_info(new_notifier_change_info.back());
11,261✔
957
            transaction::parse(*newest_transaction, new_notifier_change_info.back(), notifier->version().version,
18,298✔
958
                               version.version);
18,298✔
959
        }
18,298✔
960
    }
56,598✔
961

77,017✔
962
    // If the skip version is set and we have more than one version to process,
77,017✔
963
    // we need to start with just the skip version so that any suppressed
84,054✔
964
    // callbacks can ignore the changes from it without missing changes from
84,054✔
965
    // later versions. If the skip version is set and there aren't any more
73,789✔
966
    // versions after it, we just want to process with normal processing. See
79,058✔
967
    // the above note about spurious wakeups for why this is required for
79,058✔
968
    // correctness and not just a very minor optimization.
78,698✔
969
    if (skip_version && skip_version->get_version_of_current_transaction() != version) {
125,226✔
970
        REALM_ASSERT(!notifiers.empty());
392✔
971
        REALM_ASSERT(version >= skip_version->get_version_of_current_transaction());
392✔
972
        TransactionChangeInfo info;
392✔
973
        for (auto& notifier : notifiers)
392✔
974
            notifier->add_required_change_info(info);
5,185✔
975
        transaction::advance(*m_notifier_transaction, info, skip_version->get_version_of_current_transaction());
8,413✔
976
        for (auto& notifier : notifiers)
8,413✔
977
            notifier->run();
8,413✔
978

8,397✔
979
        util::CheckedLockGuard lock(m_notifier_mutex);
8,413✔
980
        for (auto& notifier : notifiers)
8,413✔
981
            notifier->prepare_handover();
8,413✔
982
    }
8,413✔
983

84,054✔
984
    // Advance the non-new notifiers to the same version as we advanced the new
68,640✔
985
    // ones to (or the latest if there were no new ones)
68,640✔
986
    TransactionChangeInfo change_info;
124,870✔
987
    for (auto& notifier : notifiers) {
139,693✔
988
        notifier->add_required_change_info(change_info);
139,693✔
989
    }
139,693✔
990
    transaction::advance(*m_notifier_transaction, change_info, version);
124,870✔
991

68,640✔
992
    {
124,868✔
993
        // If there's multiple notifiers for a single collection, we only populate
68,640✔
994
        // the data for the first one during parsing and need to copy it to the
68,640✔
995
        // others. This is a reverse scan where each collection looks for the
68,640✔
996
        // first collection with the same id. It is O(N^2), but typically the
68,640✔
997
        // number of collections observed will be very small.
77,017✔
998
        auto id = [](auto const& c) {
97,721✔
999
            return std::tie(c.table_key, c.path, c.obj_key);
49,789✔
1000
        };
56,826✔
1001
        auto& collections = change_info.collections;
142,140✔
1002
        for (size_t i = collections.size(); i > 0; --i) {
174,015✔
1003
            for (size_t j = 0; j < i - 1; ++j) {
51,309✔
1004
                if (id(collections[i - 1]) == id(collections[j])) {
36,122✔
1005
                    collections[i - 1].changes->merge(CollectionChangeBuilder{*collections[j].changes});
26,925✔
1006
                    break;
33,962✔
1007
                }
26,925✔
1008
            }
29,085✔
1009
        }
40,256✔
1010
    }
133,247✔
1011

77,017✔
1012
    // Now that they're at the same version, switch the new notifiers over to
79,605✔
1013
    // the main Transaction used for background work rather than the temporary one
73,812✔
1014
    for (auto& notifier : new_notifiers) {
115,469✔
1015
        notifier->attach_to(m_notifier_transaction);
98,748✔
1016
        notifier->run();
102,734✔
1017
    }
87,586✔
1018

71,224✔
1019
    // Change info is now all ready, so the notifiers can now perform their
70,954✔
1020
    // background work
70,954✔
1021
    for (auto& notifier : notifiers) {
142,007✔
1022
        notifier->run();
142,277✔
1023
    }
143,675✔
1024

84,054✔
1025
    // Reacquire the lock while updating the fields that are actually read on
77,017✔
1026
    // other threads
77,017✔
1027
    util::CheckedLockGuard lock2(m_notifier_mutex);
133,247✔
1028
    for (auto& notifier : new_notifiers) {
123,883✔
1029
        notifier->prepare_handover();
93,752✔
1030
    }
93,752✔
1031
    for (auto& notifier : notifiers) {
150,111✔
1032
        notifier->prepare_handover();
148,070✔
1033
    }
148,070✔
1034
    clean_up_dead_notifiers();
133,247✔
1035
    if (!m_notifiers.empty())
142,140✔
1036
        m_notifier_handover_transaction = m_db->start_read(version);
142,091✔
1037
}
142,140✔
1038

8,381✔
1039
void RealmCoordinator::advance_to_ready(Realm& realm)
8,381✔
1040
{
66,208✔
1041
    // If callbacks close the Realm the last external reference may go away
39,020✔
1042
    // while we're in this function
37,192✔
1043
    auto self = shared_from_this();
68,249✔
1044
    auto tr = Realm::Internal::get_transaction_ref(realm);
68,249✔
1045
    auto current_version = tr->get_version_of_current_transaction();
75,101✔
1046

40,876✔
1047
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> notifiers;
75,101✔
1048

39,020✔
1049
    // Transaction which will pin the version we're packaging for deliver to,
39,020✔
1050
    // to ensure it's not cleaned up between when we release the mutex and when
39,013✔
1051
    // we actually advance (which is not done while holding a lock).
39,020✔
1052
    std::shared_ptr<Transaction> handover_version_tr;
57,827✔
1053
    {
57,827✔
1054
        util::CheckedLockGuard lock(m_notifier_mutex);
65,062✔
1055

26,553✔
1056
        // If there are any new notifiers for this Realm then by definition they
26,553✔
1057
        // haven't run yet and aren't ready
30,837✔
1058
        for (auto& notifier : m_new_notifiers) {
30,837✔
1059
            if (notifier->is_for_realm(realm))
7,235!
1060
                return;
2,951✔
1061
        }
7,235✔
1062

26,553✔
1063
        for (auto& notifier : m_notifiers) {
71,753✔
1064
            if (!notifier->is_for_realm(realm))
66,680✔
1065
                continue;
3,319✔
1066
            // If the notifier hasn't run it isn't ready and we should do nothing
41,628✔
1067
            if (!notifier->has_run())
70,596✔
1068
                return;
7,235✔
1069
            // package_for_delivery() returning false indicates that it's been
37,344✔
1070
            // unregistered but not yet cleaned up, so it effectively doesn't exist
37,344✔
1071
            if (!notifier->package_for_delivery())
66,312✔
1072
                continue;
2,951✔
1073
            notifiers.push_back(notifier);
63,361!
1074
        }
63,361✔
1075

23,602✔
1076
        handover_version_tr = m_notifier_handover_transaction;
60,778✔
1077
    }
66,435✔
1078

31,583✔
1079
    if (notifiers.empty()) {
57,873✔
1080
        // If we have no notifiers for this Realm, just advance to latest
7,406✔
1081
        return transaction::advance(tr, realm.m_binding_context.get(), {});
30,196✔
1082
    }
22,261✔
1083

24,798✔
1084
    // If we have notifiers but no transaction, then they've never run before.
24,798✔
1085
    if (!handover_version_tr)
43,501✔
1086
        return;
×
1087

28,432✔
1088
    auto notifier_version = handover_version_tr->get_version_of_current_transaction();
43,501✔
1089
    // If the most recent write was performed via the Realm instance being
23,448✔
1090
    // advanced, the notifiers can be at an older version than the Realm.
27,732✔
1091
    // This means that there's no advancing to do
27,732✔
1092
    if (notifier_version < current_version)
38,517✔
1093
        return;
7,235✔
1094

20,884✔
1095
    // We can have notifications for the current version if it's the initial
23,272✔
1096
    // notification for a newly added callback or if the write was performed
23,272✔
1097
    // on this Realm instance. There might also be a newer version but we ignore
23,061✔
1098
    // it if so.
23,061✔
1099
    if (notifier_version == current_version) {
40,026✔
1100
        if (realm.m_binding_context)
117✔
1101
            realm.m_binding_context->will_send_notifications();
2,564✔
1102
        if (realm.is_closed())
4,577✔
1103
            return;
2,564✔
1104
        for (auto& notifier : notifiers)
2,681✔
1105
            notifier->after_advance();
2,681✔
1106
        if (realm.is_closed())
4,577✔
1107
            return;
×
1108
        if (realm.m_binding_context)
2,681✔
1109
            realm.m_binding_context->did_send_notifications();
2,564✔
1110
        return;
2,681✔
1111
    }
2,681✔
1112

22,987✔
1113
    // We have notifiers for a newer version, so advance to that
24,883✔
1114
    transaction::advance(tr, realm.m_binding_context.get(),
35,461✔
1115
                         _impl::NotifierPackage(std::move(notifiers), handover_version_tr));
35,449✔
1116
}
35,461✔
1117

1118
std::vector<std::shared_ptr<_impl::CollectionNotifier>> RealmCoordinator::notifiers_for_realm(Realm& realm)
12✔
1119
{
520,904✔
1120
    auto pred = [&](auto& notifier) {
322,144✔
1121
        return notifier->is_for_realm(realm);
134,797✔
1122
    };
134,809✔
1123
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> ret;
520,892✔
1124
    std::copy_if(m_new_notifiers.begin(), m_new_notifiers.end(), std::back_inserter(ret), pred);
520,904✔
1125
    std::copy_if(m_notifiers.begin(), m_notifiers.end(), std::back_inserter(ret), pred);
520,904✔
1126
    return ret;
523,450✔
1127
}
523,450✔
1128

4,448✔
1129
bool RealmCoordinator::advance_to_latest(Realm& realm)
4,448✔
1130
{
103,254✔
1131
    // If callbacks close the Realm the last external reference may go away
49,195✔
1132
    // while we're in this function
49,195✔
1133
    auto self = shared_from_this();
158,059✔
1134
    auto tr = Realm::Internal::get_transaction_ref(realm);
136,560✔
1135

66,495✔
1136
    NotifierVector notifiers;
116,106✔
1137
    {
158,059✔
1138
        util::CheckedUniqueLock lock(m_notifier_mutex);
158,059✔
1139
        notifiers = notifiers_for_realm(realm);
158,059✔
1140
    }
158,059✔
1141
    auto pin_tr = package_notifiers(notifiers, m_db->get_version_of_latest_snapshot());
158,059✔
1142

49,195✔
1143
    auto prev_version = tr->get_version_of_current_transaction();
98,806✔
1144
    transaction::advance(tr, realm.m_binding_context.get(), _impl::NotifierPackage(std::move(notifiers), pin_tr));
108,477✔
1145
    return !realm.is_closed() && prev_version != tr->get_version_of_current_transaction();
103,971✔
1146
}
103,971✔
1147

9,671✔
1148
void RealmCoordinator::promote_to_write(Realm& realm)
9,671✔
1149
{
427,251✔
1150
    REALM_ASSERT(!realm.is_in_transaction());
431,757✔
1151
    // If callbacks close the Realm the last external reference may go away
215,873✔
1152
    // while we're in this function
215,873✔
1153
    auto self = shared_from_this();
431,757✔
1154

215,873✔
1155
    util::CheckedUniqueLock lock(m_notifier_mutex);
431,757✔
1156
    auto notifiers = notifiers_for_realm(realm);
427,251✔
1157
    lock.unlock();
431,757✔
1158

215,873✔
1159
    transaction::begin(Realm::Internal::get_transaction_ref(realm), realm.m_binding_context.get(),
431,757✔
1160
                       {std::move(notifiers), this});
431,757✔
1161
}
422,086✔
1162

1163
void RealmCoordinator::process_available_async(Realm& realm)
49,582✔
1164
{
479,157✔
1165
    REALM_ASSERT(!realm.is_in_transaction());
453,782✔
1166
    // If callbacks close the Realm the last external reference may go away
229,995✔
1167
    // while we're in this function
255,370✔
1168
    auto self = shared_from_this();
453,782✔
1169

255,370✔
1170
    auto current_version = realm.current_transaction_version();
479,157✔
1171
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> notifiers;
479,157✔
1172

229,995✔
1173
    {
479,157✔
1174
        util::CheckedLockGuard lock(m_notifier_mutex);
479,157✔
1175
        // No handover transaction means there can't be anything waiting to deliver
255,370✔
1176
        if (!m_notifier_handover_transaction)
429,575✔
1177
            return;
371,233✔
1178
        // If we have a read transaction, it needs to be an exact match in version
78,641✔
1179
        // to the notifications as we're only delivering initial notifications
78,641✔
1180
        // and not advancing.
52,796✔
1181
        if (current_version &&
82,000✔
1182
            current_version != m_notifier_handover_transaction->get_version_of_current_transaction())
107,805✔
1183
            return;
23,972✔
1184

78,506✔
1185
        for (auto& notifier : m_notifiers) {
171,280✔
1186
            if (!notifier->is_for_realm(realm) || !notifier->has_run() || !notifier->package_for_delivery())
145,435✔
1187
                continue;
50,288✔
1188
            notifiers.push_back(notifier);
170,495✔
1189
        }
144,650✔
1190
    }
107,531✔
1191
    if (notifiers.empty())
100,229✔
1192
        return;
4,012✔
1193

32,437✔
1194
    if (realm.m_binding_context)
61,312✔
1195
        realm.m_binding_context->will_send_notifications();
7,334✔
1196
    if (realm.is_closed()) // i.e. the Realm was closed in the callback above
64,961✔
1197
        return;
53✔
1198
    for (auto& notifier : notifiers)
61,276✔
1199
        notifier->after_advance();
136,214✔
1200
    if (realm.is_closed()) // i.e. the Realm was closed in the callback above
72,886✔
1201
        return;
128✔
1202
    if (realm.m_binding_context)
72,758✔
1203
        realm.m_binding_context->did_send_notifications();
15,126✔
1204
}
64,897✔
1205

7,265✔
1206
TransactionRef RealmCoordinator::package_notifiers(NotifierVector& notifiers, VersionID::version_type target_version)
55✔
1207
{
154,437✔
1208
    auto ready = [&] {
169,386✔
1209
        util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
162,180✔
1210
        bool up_to_date =
169,386✔
1211
            m_notifier_handover_transaction &&
162,178✔
1212
            m_notifier_handover_transaction->get_version_of_current_transaction().version >= target_version;
122,117✔
1213
        return std::all_of(begin(notifiers), end(notifiers), [&](auto const& n) {
141,680✔
1214
            return !n->have_callbacks() || (n->has_run() && up_to_date);
99,448✔
1215
        });
92,242✔
1216
    };
169,382✔
1217

75,132✔
1218
    if (!ready()) {
158,044✔
1219
        util::CheckedUniqueLock lock(m_running_notifiers_mutex);
11,338✔
1220
        // The worker thread may have run the notifiers we need while we were
10,397✔
1221
        // waiting for the lock, so re-check
26,574✔
1222
        if (!ready())
29,194✔
1223
            run_async_notifiers();
19,071✔
1224
    }
29,194✔
1225

92,988✔
1226
    util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
164,566✔
1227
    // If the notifiers are still out of date, that means none of them have callbacks
90,317✔
1228
    // so we don't want to block the calling thread to run them.
87,293✔
1229
    if (!m_notifier_handover_transaction ||
162,999✔
1230
        m_notifier_handover_transaction->get_version_of_current_transaction().version < target_version) {
139,923✔
1231
        notifiers.clear();
100,974✔
1232
        return nullptr;
108,743✔
1233
    }
94,245✔
1234

31,080✔
1235
    auto package = [&](auto& notifier) {
82,411✔
1236
        return !notifier->has_run() || !notifier->package_for_delivery();
82,511✔
1237
    };
80,998✔
1238
    notifiers.erase(std::remove_if(begin(notifiers), end(notifiers), package), end(notifiers));
59,951✔
1239
    return notifiers.empty() ? nullptr : m_notifier_handover_transaction;
66,584✔
1240
}
74,449✔
1241

8,408✔
1242
bool RealmCoordinator::compact()
8,408✔
1243
{
16,193✔
1244
    return m_db->compact();
12,510✔
1245
}
8,540✔
1246

8,524✔
1247
void RealmCoordinator::write_copy(StringData path, const char* key)
8,524✔
1248
{
4,130✔
1249
    m_db->write_copy(path, key);
10,633✔
1250
}
10,633✔
1251

10,473✔
1252
void RealmCoordinator::async_request_write_mutex(Realm& realm)
7,653✔
1253
{
11,945✔
1254
    auto tr = Realm::Internal::get_transaction_ref(realm);
11,957✔
1255
    m_db->async_request_write_mutex(tr, [realm = realm.shared_from_this()]() mutable {
4,021✔
1256
        auto& scheduler = *realm->scheduler();
3,957✔
1257
        scheduler.invoke([realm = std::move(realm)] {
3,959✔
1258
            Realm::Internal::run_writes(*realm);
3,959✔
1259
        });
3,959✔
1260
    });
3,957✔
1261
}
4,304✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc