• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_275

09 Apr 2024 03:33AM UTC coverage: 92.608% (+0.5%) from 92.088%
thomas.goyne_275

Pull #7300

Evergreen

tgoyne
Extract some duplicated code in PushClient
Pull Request #7300: Rework sync user handling and metadata storage

102672 of 194970 branches covered (52.66%)

3165 of 3247 new or added lines in 46 files covered. (97.47%)

34 existing lines in 9 files now uncovered.

249420 of 269329 relevant lines covered (92.61%)

45087511.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.96
/src/realm/object-store/impl/realm_coordinator.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2015 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/impl/realm_coordinator.hpp>
20

21
#include <realm/object-store/impl/collection_notifier.hpp>
22
#include <realm/object-store/impl/external_commit_helper.hpp>
23
#include <realm/object-store/impl/transact_log_handler.hpp>
24
#include <realm/object-store/impl/weak_realm_notifier.hpp>
25
#include <realm/object-store/audit.hpp>
26
#include <realm/object-store/binding_context.hpp>
27
#include <realm/object-store/object_schema.hpp>
28
#include <realm/object-store/object_store.hpp>
29
#include <realm/object-store/property.hpp>
30
#include <realm/object-store/schema.hpp>
31
#include <realm/object-store/thread_safe_reference.hpp>
32
#include <realm/object-store/util/scheduler.hpp>
33

34
#if REALM_ENABLE_SYNC
35
#include <realm/object-store/sync/impl/sync_file.hpp>
36
#include <realm/object-store/sync/async_open_task.hpp>
37
#include <realm/object-store/sync/sync_manager.hpp>
38
#include <realm/object-store/sync/sync_session.hpp>
39
#include <realm/object-store/sync/sync_user.hpp>
40
#include <realm/sync/history.hpp>
41
#include <realm/sync/noinst/client_history_impl.hpp>
42
#endif
43

44
#include <realm/db.hpp>
45
#include <realm/history.hpp>
46
#include <realm/string_data.hpp>
47
#include <realm/util/fifo_helper.hpp>
48
#include <realm/sync/config.hpp>
49

50
#include <algorithm>
51
#include <unordered_map>
52

53
using namespace realm;
54
using namespace realm::_impl;
55

56
static auto& s_coordinator_mutex = *new std::mutex;
57
static auto& s_coordinators_per_path = *new std::unordered_map<std::string, std::weak_ptr<RealmCoordinator>>;
58

59
// Returns the coordinator registered for `path`, creating and registering a
// new one if there is no live coordinator for that path yet. The registry
// only holds weak references, so an entry may exist but be expired.
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(StringData path)
{
    std::lock_guard<std::mutex> guard(s_coordinator_mutex);

    // operator[] creates an empty slot the first time a path is seen
    std::weak_ptr<RealmCoordinator>& slot = s_coordinators_per_path[path];
    std::shared_ptr<RealmCoordinator> result = slot.lock();
    if (!result) {
        result = std::make_shared<RealmCoordinator>(Private());
        slot = result;
    }
    return result;
}
173,543✔
72

73
std::shared_ptr<RealmCoordinator>
74
RealmCoordinator::get_coordinator(const Realm::Config& config) NO_THREAD_SAFETY_ANALYSIS
110✔
75
{
880✔
76
    auto coordinator = get_coordinator(config.path);
880✔
77
    util::CheckedLockGuard lock(coordinator->m_realm_mutex);
880✔
78
    coordinator->set_config(config);
880✔
79
    coordinator->open_db();
880✔
80
    return coordinator;
880✔
81
}
770✔
82

83
// Returns the live coordinator registered for `path`, or nullptr if there is
// none. This is a pure lookup: unlike operator[], find() does not insert an
// empty entry into the registry for paths which have never been opened.
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_existing_coordinator(StringData path)
{
    std::lock_guard<std::mutex> lock(s_coordinator_mutex);
    const auto it = s_coordinators_per_path.find(path);
    if (it == s_coordinators_per_path.end())
        return nullptr;
    // The entry may still be present but expired, in which case lock()
    // yields nullptr as well
    return it->second.lock();
}
98✔
88

89
void RealmCoordinator::set_config(const Realm::Config& config)
33,217✔
90
{
252,337✔
91
    if (config.encryption_key.data() && config.encryption_key.size() != 64)
219,122✔
92
        throw InvalidEncryptionKey();
33,229✔
93
    if (config.schema_mode == SchemaMode::Immutable && config.sync_config)
219,106✔
94
        throw InvalidArgument(ErrorCodes::IllegalCombination,
×
95
                              "Synchronized Realms cannot be opened in immutable mode");
33,215✔
96
    if ((config.schema_mode == SchemaMode::AdditiveDiscovered ||
252,276✔
97
         config.schema_mode == SchemaMode::AdditiveExplicit) &&
239,685✔
98
        config.migration_function)
139,891✔
99
        throw InvalidArgument(ErrorCodes::IllegalCombination,
32✔
100
                              "Realms opened in Additive-only schema mode do not use a migration function");
33,239✔
101
    if (config.schema_mode == SchemaMode::Immutable && config.migration_function)
219,080✔
102
        throw InvalidArgument(ErrorCodes::IllegalCombination,
16✔
103
                              "Realms opened in immutable mode do not use a migration function");
33,223✔
104
    if (config.schema_mode == SchemaMode::ReadOnly && config.migration_function)
219,066✔
105
        throw InvalidArgument(ErrorCodes::IllegalCombination,
16✔
106
                              "Realms opened in read-only mode do not use a migration function");
33,221✔
107
    if (config.schema_mode == SchemaMode::Immutable && config.initialization_function)
219,052✔
108
        throw InvalidArgument(ErrorCodes::IllegalCombination,
16✔
109
                              "Realms opened in immutable mode do not use an initialization function");
33,219✔
110
    if (config.schema_mode == SchemaMode::ReadOnly && config.initialization_function)
219,038✔
111
        throw InvalidArgument(ErrorCodes::IllegalCombination,
16✔
112
                              "Realms opened in read-only mode do not use an initialization function");
33,217✔
113
    if (config.schema && config.schema_version == ObjectStore::NotVersioned)
219,024✔
114
        throw InvalidArgument(ErrorCodes::IllegalCombination,
16✔
115
                              "A schema version must be specified when the schema is specified");
33,215✔
116
    if (!config.realm_data.is_null() && (!config.immutable() || !config.in_memory))
219,010✔
117
        throw InvalidArgument(
16✔
118
            ErrorCodes::IllegalCombination,
16✔
119
            "In-memory realms initialized from memory buffers can only be opened in read-only mode");
33,213✔
120
    if (!config.realm_data.is_null() && !config.path.empty())
218,996✔
121
        throw InvalidArgument(ErrorCodes::IllegalCombination, "Specifying both memory buffer and path is invalid");
33,211✔
122
    if (!config.realm_data.is_null() && !config.encryption_key.empty())
218,982✔
123
        throw InvalidArgument(ErrorCodes::IllegalCombination, "Memory buffers do not support encryption");
33,209✔
124
    if (config.in_memory && !config.encryption_key.empty()) {
218,968✔
125
        throw InvalidArgument(ErrorCodes::IllegalCombination, "Encryption is not supported for in-memory realms");
16✔
126
    }
16,058✔
127
    // ResetFile also won't use the migration function, but specifying one is
121,883✔
128
    // allowed to simplify temporarily switching modes during development
121,883✔
129

139,032✔
130
#if REALM_ENABLE_SYNC
252,145✔
131
    if (config.sync_config) {
220,725✔
132
        if (config.sync_config->flx_sync_requested && !config.sync_config->partition_value.empty()) {
12,609✔
133
            throw InvalidArgument(ErrorCodes::IllegalCombination,
16✔
134
                                  "Cannot specify a partition value when flexible sync is enabled");
16✔
135
        }
1,785✔
136
    }
218,938✔
137
#endif
218,938✔
138

105,832✔
139
    bool no_existing_realm =
252,129✔
140
        std::all_of(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers), [](auto& notifier) {
164,262✔
141
            return notifier.expired();
61,330✔
142
        });
78,478✔
143
    if (no_existing_realm) {
238,535✔
144
        m_config = config;
180,010✔
145
        m_config.scheduler = nullptr;
180,010✔
146
    }
206,845✔
147
    else {
72,120✔
148
        if (m_config.immutable() != config.immutable()) {
72,120✔
149
            throw LogicError(
26,836✔
150
                ErrorCodes::MismatchedConfig,
6,355✔
151
                util::format("Realm at path '%1' already opened with different read permissions.", config.path));
6,355✔
152
        }
×
153
        if (m_config.in_memory != config.in_memory) {
45,284✔
154
            throw LogicError(
14✔
155
                ErrorCodes::MismatchedConfig,
14✔
156
                util::format("Realm at path '%1' already opened with different inMemory settings.", config.path));
6,369✔
157
        }
16✔
158
        if (m_config.encryption_key != config.encryption_key) {
45,272✔
159
            throw LogicError(
2✔
160
                ErrorCodes::MismatchedConfig,
2✔
161
                util::format("Realm at path '%1' already opened with a different encryption key.", config.path));
6,353✔
162
        }
×
163
        if (m_config.schema_mode != config.schema_mode) {
45,270✔
164
            throw LogicError(
14✔
165
                ErrorCodes::MismatchedConfig,
14✔
166
                util::format("Realm at path '%1' already opened with a different schema mode.", config.path));
6,367✔
167
        }
16✔
168
        util::CheckedLockGuard lock(m_schema_cache_mutex);
45,258✔
169
        if (config.schema && m_schema_version != ObjectStore::NotVersioned &&
45,258✔
170
            m_schema_version != config.schema_version) {
42,787✔
171
            throw LogicError(
6,365✔
172
                ErrorCodes::MismatchedConfig,
6,365✔
173
                util::format("Realm at path '%1' already opened with different schema version.", config.path));
6,012✔
174
        }
16✔
175

20,029✔
176
#if REALM_ENABLE_SYNC
45,244✔
177
        if (bool(m_config.sync_config) != bool(config.sync_config)) {
45,244✔
178
            throw LogicError(
2,799✔
179
                ErrorCodes::MismatchedConfig,
6,349✔
180
                util::format("Realm at path '%1' already opened with different sync configurations.", config.path));
6,349✔
181
        }
×
182

20,027✔
183
        if (config.sync_config) {
45,242✔
184
            auto old_user = m_config.sync_config->user;
2,142✔
185
            auto new_user = config.sync_config->user;
4,941✔
186
            if (old_user != new_user) {
8,491✔
187
                throw LogicError(
306✔
188
                    ErrorCodes::MismatchedConfig,
306✔
189
                    util::format("Realm at path '%1' already opened with different sync user.", config.path));
306✔
190
            }
×
191

1,057✔
192
            if (m_config.sync_config->partition_value != config.sync_config->partition_value) {
2,142✔
193
                throw LogicError(
×
194
                    ErrorCodes::MismatchedConfig,
306✔
UNCOV
195
                    util::format("Realm at path '%1' already opened with different partition value.", config.path));
×
196
            }
×
197
            if (m_config.sync_config->flx_sync_requested != config.sync_config->flx_sync_requested) {
2,142✔
198
                throw LogicError(ErrorCodes::MismatchedConfig,
×
199
                                 util::format("Realm at path '%1' already opened in a different synchronization mode",
306✔
200
                                              config.path));
×
201
            }
×
202
        }
45,242✔
203
#endif
45,242✔
204
        // Mixing cached and uncached Realms is allowed
26,376✔
205
        m_config.cache = config.cache;
51,591✔
206

22,826✔
207
        // Realm::update_schema() handles complaining about schema mismatches
26,376✔
208
    }
48,041✔
209
}
221,737✔
210

6,349✔
211
std::shared_ptr<Realm> RealmCoordinator::get_cached_realm(Realm::Config const& config,
33,191✔
212
                                                          std::shared_ptr<util::Scheduler> scheduler)
213
{
1,344✔
214
    if (!config.cache)
1,344✔
215
        return nullptr;
1,480✔
216
    util::CheckedUniqueLock lock(m_realm_mutex);
248✔
217
    return do_get_cached_realm(config, scheduler);
240✔
218
}
64✔
219

8✔
220
std::shared_ptr<Realm> RealmCoordinator::do_get_cached_realm(Realm::Config const& config,
8✔
221
                                                             std::shared_ptr<util::Scheduler> scheduler)
222
{
228,359✔
223
    if (!config.cache)
228,359✔
224
        return nullptr;
257,202✔
225

34,751✔
226
    if (!scheduler) {
39,425✔
227
        scheduler = config.scheduler;
5,616✔
228
    }
6,400✔
229

1,008✔
230
    if (!scheduler)
6,496✔
231
        return nullptr;
30✔
232

1,024✔
233
    for (auto& cached_realm : m_weak_realm_notifiers) {
29,670✔
234
        if (!cached_realm.is_cached_for_scheduler(scheduler))
29,686✔
235
            continue;
29,532✔
236
        // can be null if we jumped in between ref count hitting zero and
4,312✔
237
        // unregister_realm() getting the lock
3,688✔
238
        if (auto realm = cached_realm.realm()) {
4,378✔
239
            // If the file is uninitialized and was opened without a schema,
80✔
240
            // do the normal schema init
694✔
241
            if (realm->schema_version() == ObjectStore::NotVersioned)
4,378✔
242
                break;
38✔
243

680✔
244
            // Otherwise if we have a realm schema it needs to be an exact
60✔
245
            // match (even having the same properties but in different
64✔
246
            // orders isn't good enough)
64✔
247
            if (config.schema && realm->schema() != *config.schema)
4,348✔
248
                throw LogicError(
8✔
249
                    ErrorCodes::MismatchedConfig,
620✔
250
                    util::format("Realm at path '%1' already opened on current thread with different schema.",
×
251
                                 config.path));
×
252

56✔
253
            return realm;
4,340✔
254
        }
4,348✔
255
    }
4,988✔
256
    return nullptr;
2,034✔
257
}
6,322✔
258

202✔
259
// Opens (or fetches from the cache) a Realm for `config`. If `version` is
// given it must not be a default-constructed VersionID; a frozen scheduler
// for that version is used when the config has no scheduler of its own, and
// the Realm's group is read before returning.
std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config, util::Optional<VersionID> version)
{
    REALM_ASSERT(!version || *version != VersionID());
    if (!config.scheduler) {
        if (version)
            config.scheduler = util::Scheduler::make_frozen(*version);
        else
            config.scheduler = util::Scheduler::make_default();
    }

    // The result must be declared before the lock so that the mutex is
    // released before we release the strong reference to the Realm, as
    // Realm's destructor may want to acquire the same lock
    std::shared_ptr<Realm> result;
    util::CheckedUniqueLock guard(m_realm_mutex);
    set_config(config);

    result = do_get_cached_realm(config);
    if (result) {
        REALM_ASSERT(!version || result->read_transaction_version() == *version);
        return result;
    }

    // Creates the Realm and releases `guard` part-way through
    do_get_realm(std::move(config), result, version, guard);
    if (version)
        result->read_group();
    return result;
}
213,298✔
280

32,335✔
281
// Opens (or fetches from the cache) a Realm using the coordinator's current
// config and the given scheduler, or the default scheduler when `scheduler`
// is null. `first_time_open` is forwarded to do_get_realm().
std::shared_ptr<Realm> RealmCoordinator::get_realm(std::shared_ptr<util::Scheduler> scheduler, bool first_time_open)
{
    // Declared ahead of the lock so that it is destroyed after the lock is
    // released
    std::shared_ptr<Realm> result;
    util::CheckedUniqueLock guard(m_realm_mutex);

    auto config = m_config;
    if (scheduler)
        config.scheduler = scheduler;
    else
        config.scheduler = util::Scheduler::make_default();

    result = do_get_cached_realm(config);
    if (result)
        return result;

    do_get_realm(std::move(config), result, none, guard, first_time_open);
    return result;
}
7,760✔
293

970✔
294
std::shared_ptr<Realm> RealmCoordinator::freeze_realm(const Realm& source_realm)
970✔
295
{
4,479✔
296
    std::shared_ptr<Realm> realm;
4,479✔
297
    util::CheckedUniqueLock lock(m_realm_mutex);
5,123✔
298

2,877✔
299
    auto version = source_realm.read_transaction_version();
5,123✔
300
    auto scheduler = util::Scheduler::make_frozen(version);
4,804✔
301
    if ((realm = do_get_cached_realm(source_realm.config(), scheduler))) {
5,123✔
302
        return realm;
658✔
303
    }
658✔
304

2,228✔
305
    auto config = source_realm.config();
4,467✔
306
    config.scheduler = scheduler;
4,789✔
307
    realm = Realm::make_shared_realm(std::move(config), version, shared_from_this());
5,107✔
308
    Realm::Internal::copy_schema(*realm, source_realm);
5,107✔
309
    m_weak_realm_notifiers.emplace_back(realm, config.cache);
5,107✔
310
    return realm;
5,107✔
311
}
5,107✔
312

642✔
313
// Creates a new Realm from a copy of the coordinator's current config and
// returns it wrapped in a ThreadSafeReference. The cache is not consulted.
ThreadSafeReference RealmCoordinator::get_unbound_realm()
{
    std::shared_ptr<Realm> unbound;
    util::CheckedUniqueLock guard(m_realm_mutex);

    RealmConfig config_copy(m_config);
    do_get_realm(std::move(config_copy), unbound, none, guard);
    return ThreadSafeReference(unbound);
}
1,888✔
320

236✔
321
void RealmCoordinator::do_get_realm(RealmConfig&& config, std::shared_ptr<Realm>& realm,
236✔
322
                                    util::Optional<VersionID> version, util::CheckedUniqueLock& realm_lock,
323
                                    bool first_time_open)
324
{
221,164✔
325
    const auto db_created = open_db();
221,164✔
326
#ifdef REALM_ENABLE_SYNC
254,673✔
327
    SyncConfig::SubscriptionInitializerCallback subscription_function = nullptr;
254,673✔
328
    bool rerun_on_open = false;
254,673✔
329
    if (config.sync_config && config.sync_config->flx_sync_requested &&
254,673✔
330
        config.sync_config->subscription_initializer) {
144,871✔
331
        subscription_function = config.sync_config->subscription_initializer;
34,517✔
332
        rerun_on_open = config.sync_config->rerun_init_subscription_on_open;
17,841✔
333
    }
1,152✔
334
#else
144✔
335
    static_cast<void>(first_time_open);
144✔
336
    static_cast<void>(db_created);
337
#endif
338

109,059✔
339
    auto schema = std::move(config.schema);
221,164✔
340
    auto migration_function = std::move(config.migration_function);
237,668✔
341
    auto initialization_function = std::move(config.initialization_function);
254,673✔
342
    config.schema = {};
254,673✔
343

142,568✔
344
    realm = Realm::make_shared_realm(std::move(config), version, shared_from_this());
254,673✔
345
    m_weak_realm_notifiers.emplace_back(realm, config.cache);
237,668✔
346

142,568✔
347
#ifdef REALM_ENABLE_SYNC
254,673✔
348
    if (m_sync_session && m_sync_session->user()->is_logged_in())
237,668✔
349
        m_sync_session->revive_if_needed();
46,193✔
350

142,568✔
351
    if (realm->config().audit_config) {
222,948✔
352
        if (m_audit_context)
17,050✔
353
            m_audit_context->update_metadata(realm->config().audit_config->metadata);
33,537✔
354
        else
596✔
355
            m_audit_context = make_audit_context(m_db, realm->config());
522✔
356
    }
620✔
357
#else
74✔
358
    if (realm->config().audit_config)
78✔
359
        REALM_TERMINATE("Cannot use Audit interface if Realm Core is built without Sync");
360
#endif
361

109,059✔
362
    // Cached frozen Realms need to initialize their schema before releasing
109,059✔
363
    // the lock as otherwise they could be read from the cache on another thread
125,563✔
364
    // before the schema initialization happens. They'll never perform a write
125,563✔
365
    // transaction, so unlike with live Realms this is safe to do.
125,563✔
366
    if (config.cache && version && schema) {
237,668!
367
        realm->update_schema(std::move(*schema));
16,504✔
368
        schema.reset();
33,509!
369
    }
×
370

109,059✔
371
    realm_lock.unlock_unchecked();
221,164✔
372
    if (schema) {
237,668✔
373
        realm->update_schema(std::move(*schema), config.schema_version, std::move(migration_function),
209,999✔
374
                             std::move(initialization_function));
209,999✔
375
    }
203,685✔
376

136,254✔
377
#ifdef REALM_ENABLE_SYNC
248,359✔
378
    // run subscription initializer if the SDK has instructed core to do so. The subscription callback will be run if:
125,563✔
379
    // 1. this is the first time we are creating the realm file
142,568✔
380
    // 2. the database was already created, but this is the first time we are opening the db and the flag
125,563✔
381
    // rerun_on_open was set
125,563✔
382
    if (subscription_function) {
237,668✔
383
        const auto current_subscription = realm->get_latest_subscription_set();
17,484✔
384
        const auto subscription_version = current_subscription.version();
34,489✔
385
        // in case we are hitting this check while during a normal open, we need to take in
630✔
386
        // consideration if the db was created during this call. Since this may be the first time
630✔
387
        // we are actually creating a realm. For async open this does not apply, infact db_created
560✔
388
        // will always be false.
560✔
389
        if (!first_time_open)
1,050✔
390
            first_time_open = db_created;
777✔
391
        if (subscription_version == 0 || (first_time_open && rerun_on_open)) {
1,120✔
392
            bool was_in_read = realm->is_in_read_transaction();
619✔
393
            subscription_function(realm);
658✔
394
            if (!was_in_read)
592✔
395
                realm->invalidate();
592✔
396
        }
592✔
397
    }
1,054✔
398
#endif
221,238✔
399
}
221,304✔
400

33,509✔
401
// Binds the notifier registered for `realm` to its scheduler. Terminates the
// process if `realm` is not registered with this coordinator.
void RealmCoordinator::bind_to_context(Realm& realm)
{
    util::CheckedLockGuard guard(m_realm_mutex);

    const auto last = m_weak_realm_notifiers.end();
    const auto match = std::find_if(m_weak_realm_notifiers.begin(), last, [&realm](auto& notifier) {
        return notifier.is_for_realm(&realm);
    });
    if (match == last)
        REALM_TERMINATE("Invalid Realm passed to bind_to_context()");
    match->bind_to_scheduler();
}
190✔
412

413
#if REALM_ENABLE_SYNC
414
// Applies `config`, opens the DB if needed and returns an AsyncOpenTask for
// this coordinator's sync session. Throws LogicError(IllegalOperation) if
// `config` has no sync configuration.
std::shared_ptr<AsyncOpenTask> RealmCoordinator::get_synchronized_realm(Realm::Config config)
{
    if (!config.sync_config) {
        throw LogicError(ErrorCodes::IllegalOperation,
                         "This method is only available for fully synchronized Realms.");
    }

    util::CheckedLockGuard guard(m_realm_mutex);
    set_config(config);
    // Whether this call is the one which actually opened the DB
    const bool db_open_first_time = open_db();
    return std::make_shared<AsyncOpenTask>(AsyncOpenTask::Private(), shared_from_this(), m_sync_session,
                                           db_open_first_time);
}
1,248✔
426

156✔
427
#endif
156✔
428

429
bool RealmCoordinator::open_db()
430
{
223,166✔
431
    if (m_db)
223,166✔
432
        return false;
83,543✔
433

119,467✔
434
#if REALM_ENABLE_SYNC
180,411✔
435
    if (m_config.sync_config) {
186,643✔
436
        // If we previously opened this Realm, we may have a lingering sync
31,289✔
437
        // session which outlived its RealmCoordinator. If that happens we
31,289✔
438
        // want to reuse it instead of creating a new DB.
5,898✔
439
        m_sync_session = m_config.sync_config->user->sync_manager()->get_existing_session(m_config.path);
10,702✔
440
        if (m_sync_session) {
10,702✔
441
            m_db = SyncSession::Internal::get_db(*m_sync_session);
808✔
442
            init_external_helpers();
1,592✔
443
            return false;
1,587✔
444
        }
1,587✔
445
    }
174,648✔
446
#endif
173,262✔
447

85,609✔
448
    bool server_synchronization_mode = m_config.sync_config || m_config.force_sync_history;
173,262✔
449
    bool schema_mode_reset_file =
173,262✔
450
        m_config.schema_mode == SchemaMode::SoftResetFile || m_config.schema_mode == SchemaMode::HardResetFile;
200,014✔
451
    try {
200,014✔
452
        if (m_config.immutable() && m_config.realm_data) {
186,450✔
453
            m_db = DB::create(m_config.realm_data, false);
26,791✔
454
            return true;
26,791✔
455
        }
26,791✔
456
        std::unique_ptr<Replication> history;
200,000✔
457
        if (server_synchronization_mode) {
200,000✔
458
#if REALM_ENABLE_SYNC
59,659✔
459
            bool apply_server_changes = !m_config.sync_config || m_config.sync_config->apply_server_changes;
59,659✔
460
            history = std::make_unique<sync::ClientReplication>(apply_server_changes);
59,659✔
461
#else
26,775✔
462
            REALM_TERMINATE("Realm was not built with sync enabled");
26,775✔
463
#endif
8,500✔
464
        }
68,157✔
465
        else if (!m_config.immutable()) {
122,066✔
466
            history = make_in_realm_history();
113,111✔
467
        }
113,111✔
468

85,577✔
469
        DBOptions options;
181,723✔
470
#ifndef __EMSCRIPTEN__
191,498✔
471
        options.enable_async_writes = true;
191,438✔
472
#endif
191,438✔
473
        options.durability = m_config.in_memory ? DBOptions::Durability::MemOnly : DBOptions::Durability::Full;
154,145✔
474
        options.is_immutable = m_config.immutable();
199,998✔
475
        options.logger = util::Logger::get_default_logger();
199,998✔
476

112,352✔
477
        if (!m_config.fifo_files_fallback_path.empty()) {
199,998✔
478
            options.temp_dir = util::normalize_dir(m_config.fifo_files_fallback_path);
21,148✔
479
        }
26,803✔
480
        options.encryption_key = m_config.encryption_key.data();
199,998✔
481
        options.allow_file_format_upgrade = !m_config.disable_format_upgrade && !schema_mode_reset_file;
186,435✔
482
        options.clear_on_invalid_file = m_config.clear_on_invalid_file;
199,998✔
483
        if (history) {
173,227✔
484
            options.backup_at_file_format_change = m_config.backup_at_file_format_change;
172,772✔
485
#ifdef __EMSCRIPTEN__
26,775✔
486
            // Force the DB to be created in memory-only mode, ignoring the filesystem path supplied in the config.
26,775✔
487
            // This is so we can run an SDK on top without having to solve the browser persistence problem yet,
26,775✔
488
            // or teach RealmConfig and SDKs about pure in-memory realms.
26,775✔
489
            m_db = DB::create_in_memory(std::move(history), m_config.path, options);
26,715✔
490
#else
491
            if (m_config.path.size()) {
172,768✔
492
                m_db = DB::create(std::move(history), m_config.path, options);
113,478✔
493
            }
113,478✔
494
            else {
59,290✔
495
                m_db = DB::create(std::move(history), options);
59,290✔
496
            }
86,005✔
497
#endif
191,013✔
498
        }
191,013✔
499
        else {
8,925✔
500
            options.no_create = true;
8,925✔
501
            m_db = DB::create(m_config.path, options);
8,925✔
502
        }
27,170✔
503
    }
199,938✔
504
    catch (realm::FileFormatUpgradeRequired const&) {
85,644✔
505
        if (!schema_mode_reset_file) {
60!
506
            throw;
60✔
507
        }
60✔
508
        util::File::remove(m_config.path);
26,775✔
509
        return open_db();
13,213✔
510
    }
×
511
    catch (UnsupportedFileFormatVersion const&) {
×
512
        if (!schema_mode_reset_file) {
×
513
            throw;
×
514
        }
×
515
        util::File::remove(m_config.path);
×
516
        return open_db();
×
517
    }
×
518

85,542✔
519
    if (m_config.should_compact_on_launch_function) {
173,146✔
520
        size_t free_space = 0;
112✔
521
        size_t used_space = 0;
112✔
522
        if (auto tr = m_db->start_write(true)) {
112✔
523
            tr->commit();
13,318✔
524
            m_db->get_stats(free_space, used_space);
26,875✔
525
        }
128✔
526
        if (free_space > 0 && m_config.should_compact_on_launch_function(free_space + used_space, used_space))
128✔
527
            m_db->compact();
72✔
528
    }
128✔
529

85,558✔
530
    init_external_helpers();
173,162✔
531
    return true;
173,162✔
532
}
173,154✔
533

16✔
534
void RealmCoordinator::init_external_helpers()
13,206✔
535
{
200,090✔
536
    // There's a circular dependency between SyncSession and ExternalCommitHelper
112,393✔
537
    // where sync commits notify ECH and other commits notify sync via ECH. This
112,393✔
538
    // happens on background threads, so to avoid needing locking on every access
85,630✔
539
    // we have to wire things up in a specific order.
85,630✔
540
#if REALM_ENABLE_SYNC
200,115✔
541
    // We may have reused an existing sync session that outlived its original
98,848✔
542
    // RealmCoordinator. If not, we need to create a new one now.
98,848✔
543
    if (m_config.sync_config && !m_sync_session)
186,545✔
544
        m_sync_session = m_config.sync_config->user->sync_manager()->get_session(m_db, m_config);
23,077✔
545
#endif
200,115✔
546

98,848✔
547
    if (!m_notifier && !m_config.immutable() && m_config.automatic_change_notifications) {
186,545✔
548
        try {
70,274✔
549
            m_notifier = std::make_unique<ExternalCommitHelper>(*this, m_config);
44,872✔
550
        }
43,491✔
551
        catch (std::system_error const& ex) {
21,684✔
552
            throw FileAccessError(ErrorCodes::FileOperationFailed,
5✔
553
                                  util::format("Failed to create ExternalCommitHelper: %1", ex.what()), get_path(),
5✔
554
                                  ex.code().value());
5✔
555
        }
1,381✔
556
    }
174,694✔
557
    m_db->add_commit_listener(this);
174,694✔
558
}
174,694✔
559

26,788✔
560
void RealmCoordinator::close()
13,218✔
561
{
26,925✔
562
    m_db->close();
6,226✔
563
    m_db = nullptr;
6,226✔
564
}
6,226✔
565

3,034✔
566
void RealmCoordinator::delete_and_reopen()
567
{
140✔
568
    util::CheckedLockGuard lock(m_realm_mutex);
140✔
569
    close();
140✔
570
    util::File::remove(m_config.path);
26,921✔
571
    open_db();
26,921✔
572
}
26,921✔
573

574
// Starts a transaction on the DB at `version`: a frozen one if
// `frozen_transaction` is set, otherwise a regular read transaction. The DB
// must already be open.
TransactionRef RealmCoordinator::begin_read(VersionID version, bool frozen_transaction)
{
    REALM_ASSERT(m_db);
    if (frozen_transaction)
        return m_db->start_frozen(version);
    return m_db->start_read(version);
}
597,262✔
579

580
uint64_t RealmCoordinator::get_schema_version() const noexcept
581
{
20✔
582
    util::CheckedLockGuard lock(m_schema_cache_mutex);
20✔
583
    return m_schema_version;
20✔
584
}
20✔
585

20✔
586
bool RealmCoordinator::get_cached_schema(Schema& schema, uint64_t& schema_version,
20✔
587
                                         uint64_t& transaction) const noexcept
588
{
356,682✔
589
    util::CheckedLockGuard lock(m_schema_cache_mutex);
448,027✔
590
    if (!m_cached_schema)
448,027✔
591
        return false;
394,756✔
592
    schema = *m_cached_schema;
144,243✔
593
    schema_version = m_schema_version;
52,898✔
594
    transaction = m_schema_transaction_version_max;
52,898✔
595
    return true;
52,898✔
596
}
52,898✔
597

598
void RealmCoordinator::cache_schema(Schema const& new_schema, uint64_t new_schema_version,
599
                                    uint64_t transaction_version)
600
{
342,045✔
601
    util::CheckedLockGuard lock(m_schema_cache_mutex);
342,045✔
602
    if (transaction_version < m_schema_transaction_version_max)
394,740✔
603
        return;
54,739✔
604
    if (new_schema.empty() || new_schema_version == ObjectStore::NotVersioned)
392,696✔
605
        return;
178,535✔
606

109,752✔
607
    m_cached_schema = new_schema;
214,161✔
608
    m_schema_version = new_schema_version;
214,161✔
609
    m_schema_transaction_version_min = transaction_version;
214,161✔
610
    m_schema_transaction_version_max = transaction_version;
214,161✔
611
}
206,718✔
612

613
void RealmCoordinator::clear_schema_cache_and_set_schema_version(uint64_t new_schema_version)
614
{
184,408✔
615
    util::CheckedLockGuard lock(m_schema_cache_mutex);
184,408✔
616
    m_cached_schema = util::none;
184,408✔
617
    m_schema_version = new_schema_version;
133,983✔
618
}
184,116✔
619

18,778✔
620
void RealmCoordinator::advance_schema_cache(uint64_t previous, uint64_t next)
15,643✔
621
{
954,686✔
622
    util::CheckedLockGuard lock(m_schema_cache_mutex);
954,686✔
623
    if (!m_cached_schema)
954,686✔
624
        return;
162,995✔
625
    REALM_ASSERT(previous <= m_schema_transaction_version_max);
823,338✔
626
    if (next < m_schema_transaction_version_min)
791,691✔
627
        return;
14✔
628
    m_schema_transaction_version_min = std::min(previous, m_schema_transaction_version_min);
810,608✔
629
    m_schema_transaction_version_max = std::max(next, m_schema_transaction_version_max);
810,608✔
630
}
810,608✔
631

18,931✔
632
// Takes an unnamed `Private` tag argument; the constructor body does no work.
RealmCoordinator::RealmCoordinator(Private)
{
}
192,475✔
633

634
// Tears the coordinator down: prunes expired entries from the global
// per-path registry, detaches from the DB's commit listeners, stops the
// notifier worker and makes all remaining notifiers release their data.
RealmCoordinator::~RealmCoordinator()
{
    {
        std::lock_guard<std::mutex> registry_lock(s_coordinator_mutex);
        auto entry = s_coordinators_per_path.begin();
        while (entry != s_coordinators_per_path.end()) {
            if (!entry->second.expired()) {
                ++entry;
                continue;
            }
            // erase() hands back the iterator following the removed entry
            entry = s_coordinators_per_path.erase(entry);
        }
    }

    if (m_db) {
        m_db->remove_commit_listener(this);
    }

    // Resetting the notifier waits for the worker thread to join
    m_notifier.reset();

    // Active NotificationTokens keep their notifiers alive, so ask every
    // notifier to release its Transaction; that lets the DB be closed
    // immediately. No locking is needed because the worker thread is gone.
    for (auto& pending : m_new_notifiers) {
        pending->release_data();
    }
    for (auto& active : m_notifiers) {
        active->release_data();
    }
}
200,314✔
664

26,770✔
665
// Removes from m_weak_realm_notifiers every entry that has expired or that
// belongs to `realm`. Runs with m_realm_mutex held.
void RealmCoordinator::unregister_realm(Realm* realm)
{
    util::CheckedLockGuard realm_lock(m_realm_mutex);

    // The background worker thread normally cleans up results notifiers. When
    // automatic change notifications are disabled that has to happen here so
    // that any notifiers from this Realm still get cleaned up.
    if (!m_config.automatic_change_notifications) {
        util::CheckedLockGuard notifier_lock(m_notifier_mutex);
        clean_up_dead_notifiers();
    }

    auto is_stale = [realm](auto& weak_notifier) {
        return weak_notifier.expired() || weak_notifier.is_for_realm(realm);
    };
    auto first_stale = remove_if(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers), is_stale);
    m_weak_realm_notifiers.erase(first_stale, end(m_weak_realm_notifiers));
}
261,350✔
682

16,924✔
683
// Thread-safety analysis doesn't reasonably handle calling functions on different
16,924✔
684
// instances of this type
16,924✔
685
void RealmCoordinator::clear_cache() NO_THREAD_SAFETY_ANALYSIS
34,342✔
686
{
26,725✔
687
    std::vector<std::shared_ptr<RealmCoordinator>> coordinators;
26,725✔
688
    {
26,725✔
689
        std::lock_guard<std::mutex> lock(s_coordinator_mutex);
34,678✔
690
        for (auto& weak_coordinator : s_coordinators_per_path) {
48,311✔
691
            if (auto coordinator = weak_coordinator.second.lock()) {
48,311✔
692
                coordinators.push_back(coordinator);
48,311✔
693
            }
34,678✔
694
        }
34,678✔
695
        s_coordinators_per_path.clear();
34,678✔
696
    }
336✔
697

168✔
698
    for (auto& coordinator : coordinators) {
336✔
699
        coordinator->m_notifier = nullptr;
336✔
700

216✔
701
        std::vector<std::shared_ptr<Realm>> realms_to_close;
384✔
702
        {
384✔
703
            // Gather a list of all of the realms which will be removed
216✔
704
            util::CheckedLockGuard lock(coordinator->m_realm_mutex);
384✔
705
            for (auto& weak_realm_notifier : coordinator->m_weak_realm_notifiers) {
384✔
706
                if (auto realm = weak_realm_notifier.realm()) {
384✔
707
                    realms_to_close.push_back(realm);
384✔
708
                }
384✔
709
            }
384✔
710
        }
384✔
711

192✔
712
        // Close all of the previously cached Realms. This can't be done while
216✔
713
        // locks are held as it may try to re-lock them.
216✔
714
        for (auto& realm : realms_to_close)
360✔
715
            realm->close();
384✔
716
    }
384✔
717
}
360✔
718

48✔
719
void RealmCoordinator::clear_all_caches()
48✔
720
{
790✔
721
    std::vector<std::weak_ptr<RealmCoordinator>> to_clear;
790✔
722
    {
790✔
723
        std::lock_guard<std::mutex> lock(s_coordinator_mutex);
790✔
724
        for (auto iter : s_coordinators_per_path) {
587✔
725
            to_clear.push_back(iter.second);
360✔
726
        }
360✔
727
    }
766✔
728
    for (auto weak_coordinator : to_clear) {
587✔
729
        if (auto coordinator = weak_coordinator.lock()) {
384✔
730
            coordinator->clear_cache();
384✔
731
        }
384✔
732
    }
336✔
733
}
742✔
734

106✔
735
// In REALM_DEBUG builds, asserts that the global per-path coordinator
// registry is empty. Compiles to an empty function otherwise.
void RealmCoordinator::assert_no_open_realms() noexcept
{
#ifdef REALM_DEBUG
    std::lock_guard<std::mutex> registry_lock(s_coordinator_mutex);
    REALM_ASSERT(s_coordinators_per_path.empty());
#endif
}
7,211✔
742

77✔
743
void RealmCoordinator::wake_up_notifier_worker()
48✔
744
{
74,879✔
745
    if (m_notifier) {
74,879✔
746
        // FIXME: this wakes up the notification workers for all processes and
461✔
747
        // not just us. This might be worth optimizing in the future.
519✔
748
        m_notifier->notify_others();
813✔
749
    }
813✔
750
}
75,846✔
751

1,015✔
752
// Commits the write transaction currently open on `realm`, continuing it as a
// read transaction, records the notifier skip version when applicable and
// finally tells the Realm's binding context (if any) that a change happened.
//
// `commit_to_disk` is forwarded to Transaction::commit_and_continue_as_read().
// The Realm must be in a write transaction and the configuration must not be
// immutable.
void RealmCoordinator::commit_write(Realm& realm, bool commit_to_disk)
{
    REALM_ASSERT(!m_config.immutable());
    REALM_ASSERT(realm.is_in_transaction());

    Transaction& tr = Realm::Internal::get_transaction(realm);
    VersionID new_version;
    {
        // This lock has to be taken before committing: otherwise another
        // process could perform a write and notify us before we get the
        // chance to set the skip version.
        util::CheckedLockGuard notifier_lock(m_notifier_mutex);
        new_version = tr.commit_and_continue_as_read(commit_to_disk);

        // The skip version must always be the notifier transaction's current
        // version plus one, as only a prefix can be skipped and not
        // intermediate transactions. If there is a notifier for the current
        // Realm, then we waited until it finished running in
        // begin_transaction() and this invariant holds. If there are no
        // notifiers then the skip version doesn't need to be set, but more
        // importantly it *can't* be, because we didn't block when starting the
        // write and the notifier transaction may still be on an older version.
        //
        // Note that this relies on the fact that callbacks cannot be added
        // from within write transactions. If they could be, we could hit this
        // point with an implicit-created notifier which ran (and so is in
        // m_notifiers and not m_new_notifiers) but didn't have a callback at
        // the start of the write so we didn't block for it then, but does now
        // have a callback. If support for that is added, this logic will need
        // to be updated.
        bool have_notifiers = false;
        for (auto&& candidate : m_notifiers) {
            if (candidate->is_for_realm(realm) && candidate->have_callbacks()) {
                have_notifiers = true;
                break;
            }
        }

        if (have_notifiers) {
            REALM_ASSERT(!m_notifier_skip_version);
            REALM_ASSERT(m_notifier_transaction);
            REALM_ASSERT_3(m_notifier_transaction->get_transact_stage(), ==, DB::transact_Reading);
            REALM_ASSERT_3(m_notifier_transaction->get_version() + 1, ==, new_version.version);
            m_notifier_skip_version = tr.duplicate();
        }
    }

    if (realm.m_binding_context) {
        realm.m_binding_context->did_change({}, {});
    }
    // note: no longer safe to access `realm` or `this` after this point as
    // did_change() may have closed the Realm.
}
523,169✔
799

2,406✔
800
void RealmCoordinator::enable_wait_for_change()
2,406✔
801
{
2,406✔
802
    m_db->enable_wait_for_change();
2,406✔
803
}
2,406✔
804

2,406✔
805
bool RealmCoordinator::wait_for_change(std::shared_ptr<Transaction> tr)
62,454✔
806
{
30,572✔
807
    return m_db->wait_for_change(tr);
62,454✔
808
}
26✔
809

26✔
810
void RealmCoordinator::wait_for_change_release()
30,572✔
811
{
30,572✔
812
    m_db->wait_for_change_release();
62,454✔
813
}
814

815
// Returns true when the last transaction version seen by `realm` differs from
// the version of the DB's latest snapshot.
bool RealmCoordinator::can_advance(Realm& realm)
{
    const auto seen_by_realm = realm.last_seen_transaction_version();
    const auto latest_snapshot = m_db->get_version_of_latest_snapshot();
    return seen_by_realm != latest_snapshot;
}
176,370✔
819

820
// Thread-safety analysis doesn't reasonably handle calling functions on different
821
// instances of this type
822
void RealmCoordinator::register_notifier(std::shared_ptr<CollectionNotifier> notifier) NO_THREAD_SAFETY_ANALYSIS
823
{
73,403✔
824
    auto& self = Realm::Internal::get_coordinator(*notifier->get_realm());
73,403✔
825
    {
73,403✔
826
        util::CheckedLockGuard lock(self.m_notifier_mutex);
73,403✔
827
        notifier->set_initial_transaction(self.m_new_notifiers);
73,403✔
828
        self.m_new_notifiers.push_back(std::move(notifier));
73,403✔
829
    }
73,403✔
830
}
98,643✔
831

25,240✔
832
void RealmCoordinator::clean_up_dead_notifiers()
25,240✔
833
{
608,775✔
834
    auto swap_remove = [&](auto& container) {
1,217,550✔
835
        bool did_remove = false;
1,217,550✔
836
        for (size_t i = 0; i < container.size(); ++i) {
1,710,045✔
837
            if (container[i]->is_alive())
502,977✔
838
                continue;
430,511✔
839

46,721✔
840
            // Ensure the notifier is destroyed here even if there's lingering refs
46,721✔
841
            // to the async notifier elsewhere
46,721✔
842
            container[i]->release_data();
82,948✔
843

46,721✔
844
            if (container.size() > i + 1)
82,948✔
845
                container[i] = std::move(container.back());
37,197✔
846
            container.pop_back();
72,466✔
847
            --i;
160,979✔
848
            did_remove = true;
249,492✔
849
        }
249,492✔
850
        return did_remove;
1,464,608✔
851
    };
1,287,582✔
852

373,221✔
853
    if (swap_remove(m_notifiers) && m_notifiers.empty()) {
613,946✔
854
        m_notifier_transaction = nullptr;
40,166✔
855
        m_notifier_handover_transaction = nullptr;
40,166✔
856
        m_notifier_skip_version.reset();
45,344✔
857
    }
40,166✔
858
    swap_remove(m_new_notifiers);
619,124✔
859
}
614,087✔
860

10,349✔
861
// Commit callback: signals the notifier, if there is one, via notify_others().
// The committed version argument is unused.
void RealmCoordinator::on_commit(DB::version_type)
{
    if (!m_notifier) {
        return;
    }
    m_notifier->notify_others();
}
640,353✔
867

88,513✔
868
void RealmCoordinator::on_change()
4,999✔
869
{
332,820✔
870
#if REALM_ENABLE_SYNC
332,820✔
871
    // Invoke realm sync if another process has notified for a change
173,217✔
872
    if (m_sync_session) {
416,334✔
873
        auto version = m_db->get_version_of_latest_snapshot();
153,161✔
874
        SyncSession::Internal::nonsync_transact_notify(*m_sync_session, version);
64,648✔
875
    }
64,648✔
876
#endif
409,346✔
877

249,743✔
878
    {
356,260✔
879
        util::CheckedUniqueLock lock(m_running_notifiers_mutex);
356,260✔
880
        run_async_notifiers();
409,346✔
881
    }
327,821✔
882

168,218✔
883
    util::CheckedLockGuard lock(m_realm_mutex);
374,371✔
884
    for (auto& realm : m_weak_realm_notifiers) {
510,857✔
885
        realm.notify();
488,300✔
886
    }
510,857✔
887
}
336,926✔
888

9,105✔
889
void RealmCoordinator::run_async_notifiers()
9,105✔
890
{
375,454✔
891
    util::CheckedUniqueLock lock(m_notifier_mutex);
352,897✔
892

215,607✔
893
    clean_up_dead_notifiers();
375,454✔
894

215,607✔
895
    if (m_notifiers.empty() && m_new_notifiers.empty()) {
375,454✔
896
        REALM_ASSERT(!m_notifier_skip_version);
220,153✔
897
        return;
242,710✔
898
    }
261,842✔
899

138,598✔
900
    if (!m_notifier_transaction) {
198,426✔
901
        REALM_ASSERT(m_notifiers.empty());
82,062✔
902
        REALM_ASSERT(!m_notifier_skip_version);
35,512✔
903
        m_notifier_transaction = m_db->start_read();
35,512✔
904
    }
82,202✔
905

119,606✔
906
    // We need to pick the final version to advance to while the lock is held
97,012✔
907
    // as otherwise if a commit is made while new notifiers are being advanced
119,606✔
908
    // we could end up advancing over the skip version. We create a transaction
97,012✔
909
    // object for it to make sure the version stays valid
119,606✔
910
    TransactionRef newest_transaction = m_db->start_read();
160,571✔
911
    VersionID version = newest_transaction->get_version_of_current_transaction();
160,571✔
912

100,743✔
913
    auto skip_version = std::move(m_notifier_skip_version);
143,236✔
914

91,779✔
915
    // Make a copy of the notifiers vector and then release the lock to avoid
77,988✔
916
    // blocking other threads trying to register or unregister notifiers while we run them
77,988✔
917
    decltype(m_notifiers) notifiers;
137,816✔
918
    if (version != m_notifier_transaction->get_version_of_current_transaction()) {
137,816✔
919
        // We only want to rerun the existing notifiers if the version has changed.
52,721✔
920
        // This is both a minor optimization and required for notification
52,721✔
921
        // skipping to work. The skip logic assumes that the notifier can't be
52,721✔
922
        // running when suppress_next() is called because it can only be called
52,721✔
923
        // from within a write transaction, and starting the write transaction
52,721✔
924
        // would have blocked until the notifier is done running. However, if we
61,092✔
925
        // run the notifiers at a point where the version isn't changing, that
61,092✔
926
        // could happen concurrently with a call to suppress_next(), and we
52,721✔
927
        // could unset skip_next on a callback from that zero-version run
61,092✔
928
        // rather than the intended one.
52,721✔
929
        //
52,721✔
930
        // Spurious wakeups can happen in a few ways: adding a new notifier,
52,721✔
931
        // adding a new notifier in a different process sharing this Realm file,
61,092✔
932
        // closing the Realm in a different process, and possibly some other cases.
61,092✔
933
        notifiers = m_notifiers;
79,189✔
934
    }
79,189✔
935
    else {
65,107✔
936
        REALM_ASSERT(!skip_version);
65,107✔
937
        if (m_new_notifiers.empty()) {
65,107✔
938
            // We were spuriously woken up and there isn't actually anything to do
18,455✔
939
            return;
29,104✔
940
        }
29,104✔
941
    }
115,192✔
942

66,013✔
943
    auto new_notifiers = std::move(m_new_notifiers);
115,192✔
944
    m_new_notifiers.clear();
115,192✔
945
    m_notifiers.insert(m_notifiers.end(), new_notifiers.begin(), new_notifiers.end());
115,192✔
946
    lock.unlock();
115,192✔
947

70,532✔
948
    // Advance all of the new notifiers to the most recent version, if any
70,532✔
949
    std::vector<TransactionChangeInfo> new_notifier_change_info;
117,984✔
950
    if (!new_notifiers.empty()) {
117,984✔
951
        new_notifier_change_info.reserve(new_notifiers.size());
44,586✔
952
        for (auto& notifier : new_notifiers) {
75,049✔
953
            if (notifier->version() == version)
76,328✔
954
                continue;
73,808✔
955
            new_notifier_change_info.emplace_back();
17,954✔
956
            notifier->add_required_change_info(new_notifier_change_info.back());
10,862✔
957
            transaction::parse(*newest_transaction, new_notifier_change_info.back(), notifier->version().version,
17,954✔
958
                               version.version);
17,954✔
959
        }
17,954✔
960
    }
51,452✔
961

68,579✔
962
    // If the skip version is set and we have more than one version to process,
68,579✔
963
    // we need to start with just the skip version so that any suppressed
75,671✔
964
    // callbacks can ignore the changes from it without missing changes from
75,671✔
965
    // later versions. If the skip version is set and there aren't any more
65,378✔
966
    // versions after it, we just want to process with normal processing. See
70,647✔
967
    // the above note about spurious wakeups for why this is required for
70,647✔
968
    // correctness and not just a very minor optimization.
70,287✔
969
    if (skip_version && skip_version->get_version_of_current_transaction() != version) {
109,776✔
970
        REALM_ASSERT(!notifiers.empty());
388✔
971
        REALM_ASSERT(version >= skip_version->get_version_of_current_transaction());
388✔
972
        TransactionChangeInfo info;
388✔
973
        for (auto& notifier : notifiers)
388✔
974
            notifier->add_required_change_info(info);
5,169✔
975
        transaction::advance(*m_notifier_transaction, info, skip_version->get_version_of_current_transaction());
8,370✔
976
        for (auto& notifier : notifiers)
8,370✔
977
            notifier->run();
8,370✔
978

8,356✔
979
        util::CheckedLockGuard lock(m_notifier_mutex);
8,370✔
980
        for (auto& notifier : notifiers)
8,370✔
981
            notifier->prepare_handover();
8,370✔
982
    }
8,370✔
983

75,671✔
984
    // Advance the non-new notifiers to the same version as we advanced the new
60,241✔
985
    // ones to (or the latest if there were no new ones)
60,241✔
986
    TransactionChangeInfo change_info;
109,420✔
987
    for (auto& notifier : notifiers) {
121,788✔
988
        notifier->add_required_change_info(change_info);
121,788✔
989
    }
121,788✔
990
    transaction::advance(*m_notifier_transaction, change_info, version);
109,420✔
991

60,241✔
992
    {
109,418✔
993
        // If there's multiple notifiers for a single collection, we only populate
60,241✔
994
        // the data for the first one during parsing and need to copy it to the
60,241✔
995
        // others. This is a reverse scan where each collection looks for the
60,241✔
996
        // first collection with the same id. It is O(N^2), but typically the
60,241✔
997
        // number of collections observed will be very small.
68,579✔
998
        auto id = [](auto const& c) {
86,695✔
999
            return std::tie(c.table_key, c.path, c.obj_key);
44,574✔
1000
        };
51,666✔
1001
        auto& collections = change_info.collections;
126,618✔
1002
        for (size_t i = collections.size(); i > 0; --i) {
154,507✔
1003
            for (size_t j = 0; j < i - 1; ++j) {
46,981✔
1004
                if (id(collections[i - 1]) == id(collections[j])) {
33,550✔
1005
                    collections[i - 1].changes->merge(CollectionChangeBuilder{*collections[j].changes});
24,568✔
1006
                    break;
31,660✔
1007
                }
24,568✔
1008
            }
26,458✔
1009
        }
36,231✔
1010
    }
117,758✔
1011

68,579✔
1012
    // Now that they're at the same version, switch the new notifiers over to
71,167✔
1013
    // the main Transaction used for background work rather than the temporary one
65,413✔
1014
    for (auto& notifier : new_notifiers) {
101,856✔
1015
        notifier->attach_to(m_notifier_transaction);
88,333✔
1016
        notifier->run();
92,317✔
1017
    }
77,153✔
1018

62,825✔
1019
    // Change info is now all ready, so the notifiers can now perform their
62,555✔
1020
    // background work
62,555✔
1021
    for (auto& notifier : notifiers) {
124,102✔
1022
        notifier->run();
124,372✔
1023
    }
125,768✔
1024

75,671✔
1025
    // Reacquire the lock while updating the fields that are actually read on
68,579✔
1026
    // other threads
68,579✔
1027
    util::CheckedLockGuard lock2(m_notifier_mutex);
117,758✔
1028
    for (auto& notifier : new_notifiers) {
110,230✔
1029
        notifier->prepare_handover();
83,309✔
1030
    }
83,309✔
1031
    for (auto& notifier : notifiers) {
132,194✔
1032
        notifier->prepare_handover();
130,126✔
1033
    }
130,126✔
1034
    clean_up_dead_notifiers();
117,758✔
1035
    if (!m_notifiers.empty())
126,618✔
1036
        m_notifier_handover_transaction = m_db->start_read(version);
126,575✔
1037
}
126,618✔
1038

8,342✔
1039
void RealmCoordinator::advance_to_ready(Realm& realm)
8,342✔
1040
{
58,938✔
1041
    // If callbacks close the Realm the last external reference may go away
36,085✔
1042
    // while we're in this function
34,201✔
1043
    auto self = shared_from_this();
61,006✔
1044
    auto tr = Realm::Internal::get_transaction_ref(realm);
61,006✔
1045
    auto current_version = tr->get_version_of_current_transaction();
67,798✔
1046

37,853✔
1047
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> notifiers;
67,798✔
1048

36,085✔
1049
    // Transaction which will pin the version we're packaging for deliver to,
36,085✔
1050
    // to ensure it's not cleaned up between when we release the mutex and when
36,079✔
1051
    // we actually advance (which is not done while holding a lock).
36,085✔
1052
    std::shared_ptr<Transaction> handover_version_tr;
50,596✔
1053
    {
50,596✔
1054
        util::CheckedLockGuard lock(m_notifier_mutex);
57,860✔
1055

23,646✔
1056
        // If there are any new notifiers for this Realm then by definition they
23,646✔
1057
        // haven't run yet and aren't ready
27,915✔
1058
        for (auto& notifier : m_new_notifiers) {
27,915✔
1059
            if (notifier->is_for_realm(realm))
7,264!
1060
                return;
2,995✔
1061
        }
7,264✔
1062

23,646✔
1063
        for (auto& notifier : m_notifiers) {
63,194✔
1064
            if (!notifier->is_for_realm(realm))
58,748✔
1065
                continue;
3,317✔
1066
            // If the notifier hasn't run it isn't ready and we should do nothing
37,357✔
1067
            if (!notifier->has_run())
62,695✔
1068
                return;
7,264✔
1069
            // package_for_delivery() returning false indicates that it's been
33,088✔
1070
            // unregistered but not yet cleaned up, so it effectively doesn't exist
33,088✔
1071
            if (!notifier->package_for_delivery())
58,426✔
1072
                continue;
2,995✔
1073
            notifiers.push_back(notifier);
55,431!
1074
        }
55,431✔
1075

20,651✔
1076
        handover_version_tr = m_notifier_handover_transaction;
53,591✔
1077
    }
59,231✔
1078

28,658✔
1079
    if (notifiers.empty()) {
50,642✔
1080
        // If we have no notifiers for this Realm, just advance to latest
7,060✔
1081
        return transaction::advance(tr, realm.m_binding_context.get(), {});
27,447✔
1082
    }
19,486✔
1083

22,277✔
1084
    // If we have notifiers but no transaction, then they've never run before.
22,277✔
1085
    if (!handover_version_tr)
39,071✔
1086
        return;
×
1087

25,895✔
1088
    auto notifier_version = handover_version_tr->get_version_of_current_transaction();
39,071✔
1089
    // If the most recent write was performed via the Realm instance being
20,929✔
1090
    // advanced, the notifiers can be at an older version than the Realm.
25,198✔
1091
    // This means that there's no advancing to do
25,198✔
1092
    if (notifier_version < current_version)
34,105✔
1093
        return;
7,264✔
1094

18,323✔
1095
    // We can have notifications for the current version if it's the initial
20,711✔
1096
    // notification for a newly added callback or if the write was performed
20,711✔
1097
    // on this Realm instance. There might also be a newer version but we ignore
20,540✔
1098
    // it if so.
20,540✔
1099
    if (notifier_version == current_version) {
35,597✔
1100
        if (realm.m_binding_context)
105✔
1101
            realm.m_binding_context->will_send_notifications();
2,606✔
1102
        if (realm.is_closed())
4,592✔
1103
            return;
2,606✔
1104
        for (auto& notifier : notifiers)
2,711✔
1105
            notifier->after_advance();
2,711✔
1106
        if (realm.is_closed())
4,592✔
1107
            return;
×
1108
        if (realm.m_binding_context)
2,711✔
1109
            realm.m_binding_context->did_send_notifications();
2,606✔
1110
        return;
2,711✔
1111
    }
2,711✔
1112

20,473✔
1113
    // We have notifiers for a newer version, so advance to that
22,354✔
1114
    transaction::advance(tr, realm.m_binding_context.get(),
31,016✔
1115
                         _impl::NotifierPackage(std::move(notifiers), handover_version_tr));
31,005✔
1116
}
31,016✔
1117

1118
// Collects the notifiers that belong to `realm`: first those from
// m_new_notifiers, then those from m_notifiers, each in container order.
std::vector<std::shared_ptr<_impl::CollectionNotifier>> RealmCoordinator::notifiers_for_realm(Realm& realm)
{
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> matching;
    for (auto& pending : m_new_notifiers) {
        if (pending->is_for_realm(realm)) {
            matching.push_back(pending);
        }
    }
    for (auto& active : m_notifiers) {
        if (active->is_for_realm(realm)) {
            matching.push_back(active);
        }
    }
    return matching;
}
458,103✔
1128

4,476✔
1129
// Packages the Realm's notifiers for the DB's latest snapshot version and
// advances the Realm's transaction with them.
//
// Returns true when the Realm is still open afterwards and its transaction
// version changed; false otherwise.
bool RealmCoordinator::advance_to_latest(Realm& realm)
{
    // If callbacks close the Realm the last external reference may go away
    // while we're in this function
    auto keep_alive = shared_from_this();
    auto tr = Realm::Internal::get_transaction_ref(realm);

    NotifierVector notifiers;
    {
        util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
        notifiers = notifiers_for_realm(realm);
    }
    const auto latest_snapshot = m_db->get_version_of_latest_snapshot();
    auto pin_tr = package_notifiers(notifiers, latest_snapshot);

    auto prev_version = tr->get_version_of_current_transaction();
    transaction::advance(tr, realm.m_binding_context.get(), _impl::NotifierPackage(std::move(notifiers), pin_tr));

    if (realm.is_closed()) {
        return false;
    }
    return prev_version != tr->get_version_of_current_transaction();
}
90,359✔
1147

8,776✔
1148
// Begins a write on the Realm's transaction via transaction::begin(), handing
// it the notifiers that belong to `realm`. The Realm must not already be in a
// transaction.
void RealmCoordinator::promote_to_write(Realm& realm)
{
    REALM_ASSERT(!realm.is_in_transaction());
    // If callbacks close the Realm the last external reference may go away
    // while we're in this function
    auto keep_alive = shared_from_this();

    // The notifier list is gathered under the lock; the lock is released
    // before transaction::begin() is called.
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> notifiers;
    {
        util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
        notifiers = notifiers_for_realm(realm);
    }

    transaction::begin(Realm::Internal::get_transaction_ref(realm), realm.m_binding_context.get(),
                       {std::move(notifiers), this});
}
369,332✔
1162

1163
// Delivers notifications that are already prepared for the Realm's current
// version, without advancing the Realm. The Realm must not be in a
// transaction.
void RealmCoordinator::process_available_async(Realm& realm)
{
    REALM_ASSERT(!realm.is_in_transaction());
    // If callbacks close the Realm the last external reference may go away
    // while we're in this function
    auto keep_alive = shared_from_this();

    auto current_version = realm.current_transaction_version();
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> deliverable;

    {
        util::CheckedLockGuard notifier_lock(m_notifier_mutex);
        // No handover transaction means there can't be anything waiting to deliver
        if (!m_notifier_handover_transaction) {
            return;
        }
        // If there is a read transaction, it needs to be an exact match in
        // version to the notifications, as only initial notifications are
        // being delivered and nothing is advanced.
        if (current_version &&
            current_version != m_notifier_handover_transaction->get_version_of_current_transaction()) {
            return;
        }

        for (auto& candidate : m_notifiers) {
            if (candidate->is_for_realm(realm) && candidate->has_run() && candidate->package_for_delivery()) {
                deliverable.push_back(candidate);
            }
        }
    }
    if (deliverable.empty()) {
        return;
    }

    if (realm.m_binding_context) {
        realm.m_binding_context->will_send_notifications();
    }
    if (realm.is_closed()) { // i.e. the Realm was closed in the callback above
        return;
    }
    for (auto& ready : deliverable) {
        ready->after_advance();
    }
    if (realm.is_closed()) { // i.e. the Realm was closed in the callback above
        return;
    }
    if (realm.m_binding_context) {
        realm.m_binding_context->did_send_notifications();
    }
}
57,678✔
1205

7,252✔
1206
// Makes sure the notifiers in `notifiers` are ready for delivery at
// `target_version`, running the async notifiers on the calling thread if they
// are not, and filters `notifiers` down to those that can be delivered.
//
// Returns the handover transaction, or nullptr when there is nothing to
// deliver; `notifiers` is cleared when the handover transaction is missing or
// older than `target_version`.
TransactionRef RealmCoordinator::package_notifiers(NotifierVector& notifiers, VersionID::version_type target_version)
{
    // True when every notifier that has callbacks has run and the handover
    // transaction has reached at least the target version.
    auto ready = [&] {
        util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
        bool up_to_date =
            m_notifier_handover_transaction &&
            m_notifier_handover_transaction->get_version_of_current_transaction().version >= target_version;
        for (auto const& n : notifiers) {
            if (n->have_callbacks() && !(n->has_run() && up_to_date)) {
                return false;
            }
        }
        return true;
    };

    if (!ready()) {
        util::CheckedUniqueLock running_lock(m_running_notifiers_mutex);
        // The worker thread may have run the notifiers we need while we were
        // waiting for the lock, so re-check
        if (!ready()) {
            run_async_notifiers();
        }
    }

    util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
    // If the notifiers are still out of date, that means none of them have callbacks
    // so we don't want to block the calling thread to run them.
    if (!m_notifier_handover_transaction ||
        m_notifier_handover_transaction->get_version_of_current_transaction().version < target_version) {
        notifiers.clear();
        return nullptr;
    }

    // Drop every notifier that hasn't run or that fails to package.
    auto not_deliverable = [&](auto& notifier) {
        return !(notifier->has_run() && notifier->package_for_delivery());
    };
    notifiers.erase(std::remove_if(begin(notifiers), end(notifiers), not_deliverable), end(notifiers));

    if (notifiers.empty()) {
        return nullptr;
    }
    return m_notifier_handover_transaction;
}
65,992✔
1241

7,431✔
1242
bool RealmCoordinator::compact()
7,431✔
1243
{
15,294✔
1244
    return m_db->compact();
11,614✔
1245
}
7,952✔
1246

7,938✔
1247
// Forwards `path` and `key` to write_copy() on the coordinator's DB.
void RealmCoordinator::write_copy(StringData path, const char* key)
{
    auto& db = *m_db;
    db.write_copy(path, key);
}
10,302✔
1251

10,162✔
1252
// Asks the DB for the write mutex asynchronously on behalf of `realm`. The
// completion callback schedules Realm::Internal::run_writes() on the Realm's
// scheduler. A strong reference to the Realm is carried through both lambdas.
void RealmCoordinator::async_request_write_mutex(Realm& realm)
{
    auto tr = Realm::Internal::get_transaction_ref(realm);
    auto on_acquired = [realm = realm.shared_from_this()]() mutable {
        // Fetch the scheduler before `realm` is moved into the inner lambda.
        auto& scheduler = *realm->scheduler();
        scheduler.invoke([realm = std::move(realm)] {
            Realm::Internal::run_writes(*realm);
        });
    };
    m_db->async_request_write_mutex(tr, std::move(on_acquired));
}
3,766✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc