• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2213

10 Apr 2024 11:21PM UTC coverage: 91.792% (-0.8%) from 92.623%
2213

push

Evergreen

web-flow
Add missing availability checks for SecCopyErrorMessageString (#7577)

This requires iOS 11.3 and we currently target iOS 11.

94842 of 175770 branches covered (53.96%)

7 of 22 new or added lines in 2 files covered. (31.82%)

1861 existing lines in 82 files now uncovered.

242866 of 264583 relevant lines covered (91.79%)

5593111.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.29
/src/realm/object-store/impl/realm_coordinator.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2015 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/object-store/impl/realm_coordinator.hpp>
20

21
#include <realm/object-store/impl/collection_notifier.hpp>
22
#include <realm/object-store/impl/external_commit_helper.hpp>
23
#include <realm/object-store/impl/transact_log_handler.hpp>
24
#include <realm/object-store/impl/weak_realm_notifier.hpp>
25
#include <realm/object-store/audit.hpp>
26
#include <realm/object-store/binding_context.hpp>
27
#include <realm/object-store/object_schema.hpp>
28
#include <realm/object-store/object_store.hpp>
29
#include <realm/object-store/property.hpp>
30
#include <realm/object-store/schema.hpp>
31
#include <realm/object-store/thread_safe_reference.hpp>
32
#include <realm/object-store/util/scheduler.hpp>
33

34
#if REALM_ENABLE_SYNC
35
#include <realm/object-store/sync/async_open_task.hpp>
36
#include <realm/object-store/sync/sync_manager.hpp>
37
#include <realm/object-store/sync/sync_session.hpp>
38
#include <realm/object-store/sync/sync_user.hpp>
39
#include <realm/sync/history.hpp>
40
#include <realm/sync/noinst/client_history_impl.hpp>
41
#endif
42

43
#include <realm/db.hpp>
44
#include <realm/history.hpp>
45
#include <realm/string_data.hpp>
46
#include <realm/util/fifo_helper.hpp>
47
#include <realm/sync/config.hpp>
48

49
#include <algorithm>
50
#include <unordered_map>
51

52
using namespace realm;
53
using namespace realm::_impl;
54

55
// File-level registry of coordinators keyed by Realm file path. The map holds
// weak_ptrs, so it does not keep a coordinator alive by itself.
// s_coordinator_mutex is locked around every access to the map in this file.
// Both objects are created with `new` and bound to references; nothing in the
// visible code deletes them (presumably to sidestep static destruction order
// problems at process exit - TODO confirm).
static auto& s_coordinator_mutex = *new std::mutex;
56
static auto& s_coordinators_per_path = *new std::unordered_map<std::string, std::weak_ptr<RealmCoordinator>>;
57

58
// Returns the coordinator registered for `path`, creating and registering a
// new one when no live coordinator exists for that path.
// s_coordinator_mutex is held for the whole lookup/creation sequence.
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_coordinator(StringData path)
59
{
52,697✔
60
    std::lock_guard<std::mutex> lock(s_coordinator_mutex);
52,697✔
61

25,280✔
62
    // operator[] default-inserts an empty weak_ptr when `path` is not yet a key
    auto& weak_coordinator = s_coordinators_per_path[path];
52,697✔
63
    if (auto coordinator = weak_coordinator.lock()) {
52,697✔
64
        return coordinator;
25,876✔
65
    }
25,876✔
66

13,235✔
67
    // No live coordinator: create one and store it in the (possibly new) slot
    auto coordinator = std::make_shared<RealmCoordinator>(Private());
26,821✔
68
    weak_coordinator = coordinator;
26,821✔
69
    return coordinator;
26,821✔
70
}
26,821✔
71

72
// Returns the coordinator for `config.path` after applying `config` to it and
// opening its DB. set_config() and open_db() are both called while the
// coordinator's m_realm_mutex is held; exceptions thrown by them propagate.
// NO_THREAD_SAFETY_ANALYSIS presumably disables static lock analysis for this
// function - TODO confirm against the macro definition.
std::shared_ptr<RealmCoordinator>
73
RealmCoordinator::get_coordinator(const Realm::Config& config) NO_THREAD_SAFETY_ANALYSIS
74
{
110✔
75
    auto coordinator = get_coordinator(config.path);
110✔
76
    util::CheckedLockGuard lock(coordinator->m_realm_mutex);
110✔
77
    coordinator->set_config(config);
110✔
78
    coordinator->open_db();
110✔
79
    return coordinator;
110✔
80
}
110✔
81

82
// Returns the live coordinator registered for `path`, or a null shared_ptr if
// there is none. Never creates a coordinator.
// NOTE(review): operator[] inserts an empty weak_ptr entry when `path` is not
// already a key, so a lookup of an unknown path still grows the map.
std::shared_ptr<RealmCoordinator> RealmCoordinator::get_existing_coordinator(StringData path)
83
{
14✔
84
    std::lock_guard<std::mutex> lock(s_coordinator_mutex);
14✔
85
    return s_coordinators_per_path[path].lock();
14✔
86
}
14✔
87

88
// Validates `config` and either adopts it or checks it against the stored one.
//
// Step 1: reject configs that are invalid on their own (bad encryption key
//         size, illegal option combinations) by throwing InvalidEncryptionKey
//         or InvalidArgument(ErrorCodes::IllegalCombination, ...).
// Step 2: if every entry in m_weak_realm_notifiers is expired, store `config`
//         in m_config with its scheduler cleared.
// Step 3: otherwise require `config` to agree with m_config on the settings
//         compared below, throwing LogicError(ErrorCodes::MismatchedConfig,
//         ...) on the first difference; on success only m_config.cache is
//         updated.
void RealmCoordinator::set_config(const Realm::Config& config)
89
{
33,213✔
90
    // A key is optional, but when one is supplied it must be exactly 64 bytes
    if (config.encryption_key.data() && config.encryption_key.size() != 64)
33,213✔
91
        throw InvalidEncryptionKey();
2✔
92
    if (config.schema_mode == SchemaMode::Immutable && config.sync_config)
33,211✔
UNCOV
93
        throw InvalidArgument(ErrorCodes::IllegalCombination,
×
94
                              "Synchronized Realms cannot be opened in immutable mode");
×
95
    if ((config.schema_mode == SchemaMode::AdditiveDiscovered ||
33,211✔
96
         config.schema_mode == SchemaMode::AdditiveExplicit) &&
33,166✔
97
        config.migration_function)
20,895✔
98
        throw InvalidArgument(ErrorCodes::IllegalCombination,
4✔
99
                              "Realms opened in Additive-only schema mode do not use a migration function");
4✔
100
    if (config.schema_mode == SchemaMode::Immutable && config.migration_function)
33,207✔
101
        throw InvalidArgument(ErrorCodes::IllegalCombination,
2✔
102
                              "Realms opened in immutable mode do not use a migration function");
2✔
103
    if (config.schema_mode == SchemaMode::ReadOnly && config.migration_function)
33,205✔
104
        throw InvalidArgument(ErrorCodes::IllegalCombination,
2✔
105
                              "Realms opened in read-only mode do not use a migration function");
2✔
106
    if (config.schema_mode == SchemaMode::Immutable && config.initialization_function)
33,203✔
107
        throw InvalidArgument(ErrorCodes::IllegalCombination,
2✔
108
                              "Realms opened in immutable mode do not use an initialization function");
2✔
109
    if (config.schema_mode == SchemaMode::ReadOnly && config.initialization_function)
33,201✔
110
        throw InvalidArgument(ErrorCodes::IllegalCombination,
2✔
111
                              "Realms opened in read-only mode do not use an initialization function");
2✔
112
    if (config.schema && config.schema_version == ObjectStore::NotVersioned)
33,199✔
113
        throw InvalidArgument(ErrorCodes::IllegalCombination,
2✔
114
                              "A schema version must be specified when the schema is specified");
2✔
115
    // A memory buffer (realm_data) is only accepted when the config is both
    // immutable and in-memory, has no path and has no encryption key
    if (!config.realm_data.is_null() && (!config.immutable() || !config.in_memory))
33,197✔
116
        throw InvalidArgument(
2✔
117
            ErrorCodes::IllegalCombination,
2✔
118
            "In-memory realms initialized from memory buffers can only be opened in read-only mode");
2✔
119
    if (!config.realm_data.is_null() && !config.path.empty())
33,195✔
120
        throw InvalidArgument(ErrorCodes::IllegalCombination, "Specifying both memory buffer and path is invalid");
2✔
121
    if (!config.realm_data.is_null() && !config.encryption_key.empty())
33,193✔
122
        throw InvalidArgument(ErrorCodes::IllegalCombination, "Memory buffers do not support encryption");
2✔
123
    if (config.in_memory && !config.encryption_key.empty()) {
33,191✔
124
        throw InvalidArgument(ErrorCodes::IllegalCombination, "Encryption is not supported for in-memory realms");
2✔
125
    }
2✔
126
    // ResetFile also won't use the migration function, but specifying one is
16,045✔
127
    // allowed to simplify temporarily switching modes during development
16,045✔
128

16,045✔
129
#if REALM_ENABLE_SYNC
33,189✔
130
    if (config.sync_config) {
33,189✔
131
        if (config.sync_config->flx_sync_requested && !config.sync_config->partition_value.empty()) {
1,773✔
132
            throw InvalidArgument(ErrorCodes::IllegalCombination,
2✔
133
                                  "Cannot specify a partition value when flexible sync is enabled");
2✔
134
        }
2✔
135
        if (!config.sync_config->user) {
1,771✔
UNCOV
136
            throw InvalidArgument(ErrorCodes::IllegalCombination,
×
UNCOV
137
                                  "A user must be provided to open a synchronized Realm.");
×
UNCOV
138
        }
×
139
    }
33,187✔
140
#endif
33,187✔
141

16,044✔
142
    // True when no Realm instance tracked by this coordinator is still alive
    // (also true when the notifier list is empty)
    bool no_existing_realm =
33,187✔
143
        std::all_of(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers), [](auto& notifier) {
19,592✔
144
            return notifier.expired();
6,350✔
145
        });
6,350✔
146
    if (no_existing_realm) {
33,187✔
147
        // Nothing to stay compatible with: take the new config wholesale, but
        // do not retain the caller's scheduler in the stored copy
        m_config = config;
26,838✔
148
        m_config.scheduler = nullptr;
26,838✔
149
    }
26,838✔
150
    else {
6,349✔
151
        if (m_config.immutable() != config.immutable()) {
6,349✔
152
            throw LogicError(
×
UNCOV
153
                ErrorCodes::MismatchedConfig,
×
UNCOV
154
                util::format("Realm at path '%1' already opened with different read permissions.", config.path));
×
UNCOV
155
        }
×
156
        if (m_config.in_memory != config.in_memory) {
6,349✔
157
            throw LogicError(
2✔
158
                ErrorCodes::MismatchedConfig,
2✔
159
                util::format("Realm at path '%1' already opened with different inMemory settings.", config.path));
2✔
160
        }
2✔
161
        if (m_config.encryption_key != config.encryption_key) {
6,347✔
162
            throw LogicError(
×
UNCOV
163
                ErrorCodes::MismatchedConfig,
×
UNCOV
164
                util::format("Realm at path '%1' already opened with a different encryption key.", config.path));
×
UNCOV
165
        }
×
166
        if (m_config.schema_mode != config.schema_mode) {
6,347✔
167
            throw LogicError(
2✔
168
                ErrorCodes::MismatchedConfig,
2✔
169
                util::format("Realm at path '%1' already opened with a different schema mode.", config.path));
2✔
170
        }
2✔
171
        // m_schema_version is read below, so take the schema cache lock first;
        // the lock stays held until the end of this else-branch
        util::CheckedLockGuard lock(m_schema_cache_mutex);
6,345✔
172
        if (config.schema && m_schema_version != ObjectStore::NotVersioned &&
6,345✔
173
            m_schema_version != config.schema_version) {
5,992✔
174
            throw LogicError(
2✔
175
                ErrorCodes::MismatchedConfig,
2✔
176
                util::format("Realm at path '%1' already opened with different schema version.", config.path));
2✔
177
        }
2✔
178

2,799✔
179
#if REALM_ENABLE_SYNC
6,343✔
180
        if (bool(m_config.sync_config) != bool(config.sync_config)) {
6,343✔
181
            throw LogicError(
×
UNCOV
182
                ErrorCodes::MismatchedConfig,
×
UNCOV
183
                util::format("Realm at path '%1' already opened with different sync configurations.", config.path));
×
UNCOV
184
        }
×
185

2,799✔
186
        if (config.sync_config) {
6,343✔
187
            // m_config.sync_config is non-null here: the check above ensured
            // both configs agree on whether a sync config is present
            auto old_user = m_config.sync_config->user;
306✔
188
            auto new_user = config.sync_config->user;
306✔
189
            if (old_user != new_user) {
306✔
190
                throw LogicError(
×
UNCOV
191
                    ErrorCodes::MismatchedConfig,
×
UNCOV
192
                    util::format("Realm at path '%1' already opened with different sync user.", config.path));
×
193
            }
×
194
            if (m_config.sync_config->partition_value != config.sync_config->partition_value) {
306✔
195
                throw LogicError(
×
196
                    ErrorCodes::MismatchedConfig,
×
UNCOV
197
                    util::format("Realm at path '%1' already opened with different partition value.", config.path));
×
198
            }
×
199
            if (m_config.sync_config->flx_sync_requested != config.sync_config->flx_sync_requested) {
306✔
200
                throw LogicError(ErrorCodes::MismatchedConfig,
×
201
                                 util::format("Realm at path '%1' already opened in a different synchronization mode",
×
UNCOV
202
                                              config.path));
×
UNCOV
203
            }
×
204
        }
6,343✔
205
#endif
6,343✔
206
        // Mixing cached and uncached Realms is allowed
2,799✔
207
        m_config.cache = config.cache;
6,343✔
208

2,799✔
209
        // Realm::update_schema() handles complaining about schema mismatches
2,799✔
210
    }
6,343✔
211
}
33,187✔
212

213
// Locking wrapper around do_get_cached_realm().
// Returns nullptr without taking m_realm_mutex when `config.cache` is false;
// otherwise acquires m_realm_mutex and forwards both arguments.
std::shared_ptr<Realm> RealmCoordinator::get_cached_realm(Realm::Config const& config,
214
                                                          std::shared_ptr<util::Scheduler> scheduler)
215
{
192✔
216
    if (!config.cache)
192✔
217
        return nullptr;
184✔
218
    util::CheckedUniqueLock lock(m_realm_mutex);
8✔
219
    return do_get_cached_realm(config, scheduler);
8✔
220
}
8✔
221

222
// Looks through m_weak_realm_notifiers for a live Realm cached for the given
// scheduler. This function takes no lock itself; the callers visible in this
// file acquire m_realm_mutex before calling it.
//
// `scheduler` falls back to `config.scheduler` when null. Returns nullptr when
// caching is disabled, when no scheduler is available, when no matching live
// Realm is found, or when the first matching live Realm has schema version
// ObjectStore::NotVersioned.
// Throws LogicError(ErrorCodes::MismatchedConfig) when `config.schema` is set
// and differs from the matching Realm's schema.
std::shared_ptr<Realm> RealmCoordinator::do_get_cached_realm(Realm::Config const& config,
223
                                                             std::shared_ptr<util::Scheduler> scheduler)
224
{
34,543✔
225
    if (!config.cache)
34,543✔
226
        return nullptr;
33,729✔
227

30✔
228
    if (!scheduler) {
814✔
229
        scheduler = config.scheduler;
798✔
230
    }
798✔
231

30✔
232
    if (!scheduler)
814✔
UNCOV
233
        return nullptr;
×
234

30✔
235
    for (auto& cached_realm : m_weak_realm_notifiers) {
4,236✔
236
        if (!cached_realm.is_cached_for_scheduler(scheduler))
4,234✔
237
            continue;
3,610✔
238
        // can be null if we jumped in between ref count hitting zero and
10✔
239
        // unregister_realm() getting the lock
10✔
240
        if (auto realm = cached_realm.realm()) {
624✔
241
            // If the file is uninitialized and was opened without a schema,
10✔
242
            // do the normal schema init
10✔
243
            // (break leaves the loop, so the function returns nullptr below)
            if (realm->schema_version() == ObjectStore::NotVersioned)
624✔
244
                break;
4✔
245

8✔
246
            // Otherwise if we have a realm schema it needs to be an exact
8✔
247
            // match (even having the same properties but in different
8✔
248
            // orders isn't good enough)
8✔
249
            if (config.schema && realm->schema() != *config.schema)
620✔
250
                throw LogicError(
×
251
                    ErrorCodes::MismatchedConfig,
×
UNCOV
252
                    util::format("Realm at path '%1' already opened on current thread with different schema.",
×
UNCOV
253
                                 config.path));
×
254

8✔
255
            return realm;
620✔
256
        }
620✔
257
    }
624✔
258
    return nullptr;
202✔
259
}
814✔
260

261
// Returns a Realm for `config`, reusing a cached instance when one matches.
//
// `version`, when set, must not be a default-constructed VersionID (asserted).
// If `config.scheduler` is null, a frozen scheduler for `*version` is used
// when a version is given, otherwise the default scheduler.
// set_config() is called under m_realm_mutex, so its exceptions propagate.
std::shared_ptr<Realm> RealmCoordinator::get_realm(Realm::Config config, util::Optional<VersionID> version)
262
{
32,947✔
263
    REALM_ASSERT(!version || *version != VersionID());
32,947✔
264
    if (!config.scheduler)
32,947✔
265
        config.scheduler = version ? util::Scheduler::make_frozen(*version) : util::Scheduler::make_default();
23,610✔
266
    // realm must be declared before lock so that the mutex is released before
15,924✔
267
    // we release the strong reference to realm, as Realm's destructor may want
15,924✔
268
    // to acquire the same lock
15,924✔
269
    std::shared_ptr<Realm> realm;
32,947✔
270
    util::CheckedUniqueLock lock(m_realm_mutex);
32,947✔
271
    set_config(config);
32,947✔
272
    if ((realm = do_get_cached_realm(config))) {
32,947✔
273
        REALM_ASSERT(!version || realm->read_transaction_version() == *version);
616✔
274
        return realm;
616✔
275
    }
616✔
276
    // No cached instance: create one. do_get_realm() receives the lock and
    // releases it itself (via unlock_unchecked()) before running update_schema
    do_get_realm(std::move(config), realm, version, lock);
32,331✔
277
    if (version) {
32,331✔
278
        realm->read_group();
352✔
279
    }
352✔
280
    return realm;
32,331✔
281
}
32,331✔
282

283
// Returns a Realm built from a copy of the coordinator's stored m_config,
// bound to `scheduler` (or the default scheduler when `scheduler` is null).
// A matching cached Realm is returned if one exists; otherwise a new one is
// created through do_get_realm(), to which `first_time_open` is forwarded.
std::shared_ptr<Realm> RealmCoordinator::get_realm(std::shared_ptr<util::Scheduler> scheduler, bool first_time_open)
284
{
970✔
285
    // Declared before the lock, so it is destroyed after the lock is released
    std::shared_ptr<Realm> realm;
970✔
286
    util::CheckedUniqueLock lock(m_realm_mutex);
970✔
287
    auto config = m_config;
970✔
288
    config.scheduler = scheduler ? scheduler : util::Scheduler::make_default();
936✔
289
    if ((realm = do_get_cached_realm(config))) {
970✔
UNCOV
290
        return realm;
×
UNCOV
291
    }
×
292
    do_get_realm(std::move(config), realm, none, lock, first_time_open);
970✔
293
    return realm;
970✔
294
}
970✔
295

296
// Returns a Realm pinned to `source_realm`'s current read transaction version,
// using a scheduler created by util::Scheduler::make_frozen() for that version.
// A matching cached Realm for that scheduler is reused; otherwise a new Realm
// is created from a copy of the source config, given the source's schema via
// Realm::Internal::copy_schema(), and registered in m_weak_realm_notifiers.
std::shared_ptr<Realm> RealmCoordinator::freeze_realm(const Realm& source_realm)
297
{
650✔
298
    // Declared before the lock, so it is destroyed after the lock is released
    std::shared_ptr<Realm> realm;
650✔
299
    util::CheckedUniqueLock lock(m_realm_mutex);
650✔
300

325✔
301
    auto version = source_realm.read_transaction_version();
650✔
302
    auto scheduler = util::Scheduler::make_frozen(version);
650✔
303
    if ((realm = do_get_cached_realm(source_realm.config(), scheduler))) {
650✔
304
        return realm;
2✔
305
    }
2✔
306

324✔
307
    auto config = source_realm.config();
648✔
308
    config.scheduler = scheduler;
648✔
309
    realm = Realm::make_shared_realm(std::move(config), version, shared_from_this());
648✔
310
    Realm::Internal::copy_schema(*realm, source_realm);
648✔
311
    // NOTE(review): `config.cache` is read after `config` was passed through
    // std::move() above; this relies on the flag keeping its value in the
    // moved-from object - confirm, or read it before the move.
    m_weak_realm_notifiers.emplace_back(realm, config.cache);
648✔
312
    return realm;
648✔
313
}
648✔
314

315
// Creates a new Realm from a copy of the stored m_config (no cache lookup is
// attempted) and returns it wrapped in a ThreadSafeReference.
ThreadSafeReference RealmCoordinator::get_unbound_realm()
316
{
236✔
317
    // Declared before the lock, so it is destroyed after the lock is released
    std::shared_ptr<Realm> realm;
236✔
318
    util::CheckedUniqueLock lock(m_realm_mutex);
236✔
319
    do_get_realm(RealmConfig(m_config), realm, none, lock);
236✔
320
    return ThreadSafeReference(realm);
236✔
321
}
236✔
322

323
// Creates a new Realm for `config`, registers it in m_weak_realm_notifiers and
// stores it in the out-parameter `realm`.
//
// `realm_lock` is expected to be the caller's lock on m_realm_mutex (that is
// what every caller in this file passes); it is released inside this function
// via unlock_unchecked() before the general update_schema() call.
// `version` is forwarded to Realm::make_shared_realm().
// `first_time_open` is only consulted when deciding whether to run the sync
// subscription initializer.
//
// NOTE(review): this function tests `#ifdef REALM_ENABLE_SYNC`, whereas other
// places in this file test `#if REALM_ENABLE_SYNC`; the two differ if the
// macro is defined as 0 - confirm which is intended.
void RealmCoordinator::do_get_realm(RealmConfig&& config, std::shared_ptr<Realm>& realm,
324
                                    util::Optional<VersionID> version, util::CheckedUniqueLock& realm_lock,
325
                                    bool first_time_open)
326
{
33,505✔
327
    const auto db_created = open_db();
33,505✔
328
#ifdef REALM_ENABLE_SYNC
33,505✔
329
    SyncConfig::SubscriptionInitializerCallback subscription_function = nullptr;
33,505✔
330
    bool rerun_on_open = false;
33,505✔
331
    if (config.sync_config && config.sync_config->flx_sync_requested &&
33,505✔
332
        config.sync_config->subscription_initializer) {
16,834✔
333
        subscription_function = config.sync_config->subscription_initializer;
144✔
334
        rerun_on_open = config.sync_config->rerun_init_subscription_on_open;
144✔
335
    }
144✔
336
#else
337
    static_cast<void>(first_time_open);
338
    static_cast<void>(db_created);
339
#endif
340

16,505✔
341
    // Pull the schema and callbacks out of the config before it is handed to
    // the Realm; they are applied separately through update_schema() below
    auto schema = std::move(config.schema);
33,505✔
342
    auto migration_function = std::move(config.migration_function);
33,505✔
343
    auto initialization_function = std::move(config.initialization_function);
33,505✔
344
    config.schema = {};
33,505✔
345

16,505✔
346
    realm = Realm::make_shared_realm(std::move(config), version, shared_from_this());
33,505✔
347
    // NOTE(review): `config.cache` here, and `config.cache` /
    // `config.schema_version` further down, are read after `config` was passed
    // through std::move() above; this relies on those members keeping their
    // values in the moved-from object - confirm.
    m_weak_realm_notifiers.emplace_back(realm, config.cache);
33,505✔
348

16,505✔
349
#ifdef REALM_ENABLE_SYNC
33,505✔
350
    if (m_sync_session && m_sync_session->user()->is_logged_in())
33,505✔
351
        m_sync_session->revive_if_needed();
1,784✔
352

16,505✔
353
    if (realm->config().audit_config) {
33,505✔
354
        if (m_audit_context)
78✔
355
            m_audit_context->update_metadata(realm->config().audit_config->metadata);
4✔
356
        else
74✔
357
            m_audit_context = make_audit_context(m_db, realm->config());
74✔
358
    }
78✔
359
#else
360
    if (realm->config().audit_config)
361
        REALM_TERMINATE("Cannot use Audit interface if Realm Core is built without Sync");
362
#endif
363

16,505✔
364
    // Cached frozen Realms need to initialize their schema before releasing
16,505✔
365
    // the lock as otherwise they could be read from the cache on another thread
16,505✔
366
    // before the schema initialization happens. They'll never perform a write
16,505✔
367
    // transaction, so unlike with live Realms this is safe to do.
16,505✔
368
    if (config.cache && version && schema) {
33,505!
369
        realm->update_schema(std::move(*schema));
×
UNCOV
370
        schema.reset();
×
UNCOV
371
    }
×
372

16,505✔
373
    // From here on the caller's lock is no longer held
    realm_lock.unlock_unchecked();
33,505✔
374
    if (schema) {
33,505✔
375
        realm->update_schema(std::move(*schema), config.schema_version, std::move(migration_function),
27,191✔
376
                             std::move(initialization_function));
27,191✔
377
    }
27,191✔
378

16,505✔
379
#ifdef REALM_ENABLE_SYNC
33,505✔
380
    // run subscription initializer if the SDK has instructed core to do so. The subscription callback will be run if:
16,505✔
381
    // 1. this is the first time we are creating the realm file
16,505✔
382
    // 2. the database was already created, but this is the first time we are opening the db and the flag
16,505✔
383
    // rerun_on_open was set
16,505✔
384
    if (subscription_function) {
33,505✔
385
        const auto current_subscription = realm->get_latest_subscription_set();
140✔
386
        const auto subscription_version = current_subscription.version();
140✔
387
        // in case we are hitting this check during a normal open, we need to take into
70✔
388
        // consideration if the db was created during this call. Since this may be the first time
70✔
389
        // we are actually creating a realm. For async open this does not apply, in fact db_created
70✔
390
        // will always be false.
70✔
391
        if (!first_time_open)
140✔
392
            first_time_open = db_created;
101✔
393
        if (subscription_version == 0 || (first_time_open && rerun_on_open)) {
140✔
394
            // Remember whether a read transaction was already active so that
            // invalidate() is only called when the callback's work opened one
            bool was_in_read = realm->is_in_read_transaction();
74✔
395
            subscription_function(realm);
74✔
396
            if (!was_in_read)
74✔
397
                realm->invalidate();
74✔
398
        }
74✔
399
    }
140✔
400
#endif
33,505✔
401
}
33,505✔
402

403
// Finds the notifier entry belonging to `realm` and calls bind_to_scheduler()
// on it, all under m_realm_mutex.
// Terminates the process via REALM_TERMINATE when `realm` has no entry in
// m_weak_realm_notifiers.
void RealmCoordinator::bind_to_context(Realm& realm)
404
{
190✔
405
    util::CheckedLockGuard lock(m_realm_mutex);
190✔
406
    for (auto& cached_realm : m_weak_realm_notifiers) {
260✔
407
        if (!cached_realm.is_for_realm(&realm))
260✔
408
            continue;
70✔
409
        cached_realm.bind_to_scheduler();
190✔
410
        return;
190✔
411
    }
190✔
412
    REALM_TERMINATE("Invalid Realm passed to bind_to_context()");
UNCOV
413
}
×
414

415
#if REALM_ENABLE_SYNC
416
// Creates an AsyncOpenTask for a synchronized Realm described by `config`.
// Throws LogicError(ErrorCodes::IllegalOperation) when `config` has no
// sync_config. Otherwise applies the config and opens the DB under
// m_realm_mutex, then builds the task from this coordinator, m_sync_session
// and the value returned by open_db().
std::shared_ptr<AsyncOpenTask> RealmCoordinator::get_synchronized_realm(Realm::Config config)
417
{
156✔
418
    if (!config.sync_config)
156✔
UNCOV
419
        throw LogicError(ErrorCodes::IllegalOperation,
×
UNCOV
420
                         "This method is only available for fully synchronized Realms.");
×
421

78✔
422
    util::CheckedLockGuard lock(m_realm_mutex);
156✔
423
    set_config(config);
156✔
424
    // true only when this call created m_db (see open_db())
    const auto db_open_first_time = open_db();
156✔
425
    return std::make_shared<AsyncOpenTask>(AsyncOpenTask::Private(), shared_from_this(), m_sync_session,
156✔
426
                                           db_open_first_time);
156✔
427
}
156✔
428

429
#endif
430

431
// Ensures m_db is set, creating the DB from m_config when necessary.
//
// Returns false when nothing new was created: m_db was already set, or the DB
// of an existing sync session for m_config.path was reused.
// Returns true when this call created m_db.
//
// If DB creation throws FileFormatUpgradeRequired or
// UnsupportedFileFormatVersion and the schema mode is SoftResetFile or
// HardResetFile, the file at m_config.path is removed and open_db() is called
// again; in any other schema mode the exception is rethrown.
bool RealmCoordinator::open_db()
432
{
33,791✔
433
    if (m_db)
33,791✔
434
        return false;
6,987✔
435

13,226✔
436
#if REALM_ENABLE_SYNC
26,804✔
437
    if (m_config.sync_config) {
26,804✔
438
        REALM_ASSERT(m_config.sync_config->user);
1,411✔
439
        // If we previously opened this Realm, we may have a lingering sync
627✔
440
        // session which outlived its RealmCoordinator. If that happens we
627✔
441
        // want to reuse it instead of creating a new DB.
627✔
442
        if (auto sync_manager = m_config.sync_config->user->sync_manager()) {
1,411✔
443
            m_sync_session = sync_manager->get_existing_session(m_config.path);
1,406✔
444
        }
1,406✔
445
        if (m_sync_session) {
1,411✔
446
            m_db = SyncSession::Internal::get_db(*m_sync_session);
26✔
447
            init_external_helpers();
26✔
448
            return false;
26✔
449
        }
26✔
450
    }
26,778✔
451
#endif
26,778✔
452

13,213✔
453
    // Either flag selects the sync client history (ClientReplication) below
    bool server_synchronization_mode = m_config.sync_config || m_config.force_sync_history;
26,778✔
454
    bool schema_mode_reset_file =
26,778✔
455
        m_config.schema_mode == SchemaMode::SoftResetFile || m_config.schema_mode == SchemaMode::HardResetFile;
26,778✔
456
    try {
26,778✔
457
        // Immutable Realm backed by a caller-supplied memory buffer.
        // Note that this path returns without calling init_external_helpers()
        if (m_config.immutable() && m_config.realm_data) {
26,778✔
458
            m_db = DB::create(m_config.realm_data, false);
2✔
459
            return true;
2✔
460
        }
2✔
461
        // `history` stays null only for immutable, non-sync configurations
        std::unique_ptr<Replication> history;
26,776✔
462
        if (server_synchronization_mode) {
26,776✔
463
#if REALM_ENABLE_SYNC
8,499✔
464
            bool apply_server_changes = !m_config.sync_config || m_config.sync_config->apply_server_changes;
8,499✔
465
            history = std::make_unique<sync::ClientReplication>(apply_server_changes);
8,499✔
466
#else
467
            REALM_TERMINATE("Realm was not built with sync enabled");
468
#endif
469
        }
8,499✔
470
        else if (!m_config.immutable()) {
18,277✔
471
            history = make_in_realm_history();
18,217✔
472
        }
18,217✔
473

13,212✔
474
        DBOptions options;
26,776✔
475
#ifndef __EMSCRIPTEN__
26,776✔
476
        options.enable_async_writes = true;
26,776✔
477
#endif
26,776✔
478
        options.durability = m_config.in_memory ? DBOptions::Durability::MemOnly : DBOptions::Durability::Full;
21,120✔
479
        options.is_immutable = m_config.immutable();
26,776✔
480
        options.logger = util::Logger::get_default_logger();
26,776✔
481

13,212✔
482
        if (!m_config.fifo_files_fallback_path.empty()) {
26,776✔
483
            options.temp_dir = util::normalize_dir(m_config.fifo_files_fallback_path);
4✔
484
        }
4✔
485
        options.encryption_key = m_config.encryption_key.data();
26,776✔
486
        // In the reset-file schema modes the upgrade is disallowed here; the
        // catch blocks below then delete the file and retry instead
        options.allow_file_format_upgrade = !m_config.disable_format_upgrade && !schema_mode_reset_file;
26,776✔
487
        options.clear_on_invalid_file = m_config.clear_on_invalid_file;
26,776✔
488
        if (history) {
26,776✔
489
            options.backup_at_file_format_change = m_config.backup_at_file_format_change;
26,716✔
490
#ifdef __EMSCRIPTEN__
491
            // Force the DB to be created in memory-only mode, ignoring the filesystem path supplied in the config.
492
            // This is so we can run an SDK on top without having to solve the browser persistence problem yet,
493
            // or teach RealmConfig and SDKs about pure in-memory realms.
494
            m_db = DB::create_in_memory(std::move(history), m_config.path, options);
495
#else
496
            if (m_config.path.size()) {
26,716✔
497
                m_db = DB::create(std::move(history), m_config.path, options);
18,246✔
498
            }
18,246✔
499
            else {
8,470✔
500
                m_db = DB::create(std::move(history), options);
8,470✔
501
            }
8,470✔
502
#endif
26,716✔
503
        }
26,716✔
504
        else {
60✔
505
            // No history (immutable, non-sync): open the file at m_config.path
            // with no_create set
            options.no_create = true;
60✔
506
            m_db = DB::create(m_config.path, options);
60✔
507
        }
60✔
508
    }
26,776✔
509
    catch (realm::FileFormatUpgradeRequired const&) {
13,213✔
510
        if (!schema_mode_reset_file) {
×
511
            throw;
×
512
        }
×
513
        util::File::remove(m_config.path);
×
514
        return open_db();
×
515
    }
×
516
    catch (UnsupportedFileFormatVersion const&) {
×
517
        if (!schema_mode_reset_file) {
×
UNCOV
518
            throw;
×
UNCOV
519
        }
×
UNCOV
520
        util::File::remove(m_config.path);
×
UNCOV
521
        return open_db();
×
UNCOV
522
    }
×
523

13,206✔
524
    if (m_config.should_compact_on_launch_function) {
26,764✔
525
        size_t free_space = 0;
16✔
526
        size_t used_space = 0;
16✔
527
        // Stats are only read if the start_write(true) call yields a
        // transaction; otherwise both sizes stay 0 and no compaction happens
        if (auto tr = m_db->start_write(true)) {
16✔
528
            tr->commit();
16✔
529
            m_db->get_stats(free_space, used_space);
16✔
530
        }
16✔
531
        // The callback receives (total size, used size)
        if (free_space > 0 && m_config.should_compact_on_launch_function(free_space + used_space, used_space))
16✔
532
            m_db->compact();
8✔
533
    }
16✔
534

13,206✔
535
    init_external_helpers();
26,764✔
536
    return true;
26,764✔
537
}
26,764✔
538

539
// Wires up the helpers that depend on m_db: obtains a sync session when the
// config requests sync and none is set yet, creates the ExternalCommitHelper
// when change notifications are enabled for a non-immutable Realm, and finally
// registers this coordinator as a commit listener on m_db.
//
// Throws app::AppError(ErrorCodes::ClientUserNotFound) when the sync user is
// missing or in state Removed, and FileAccessError when constructing the
// ExternalCommitHelper raises std::system_error.
void RealmCoordinator::init_external_helpers()
540
{
26,790✔
541
    // There's a circular dependency between SyncSession and ExternalCommitHelper
13,219✔
542
    // where sync commits notify ECH and other commits notify sync via ECH. This
13,219✔
543
    // happens on background threads, so to avoid needing locking on every access
13,219✔
544
    // we have to wire things up in a specific order.
13,219✔
545
#if REALM_ENABLE_SYNC
26,790✔
546
    // We may have reused an existing sync session that outlived its original
13,219✔
547
    // RealmCoordinator. If not, we need to create a new one now.
13,219✔
548
    if (m_config.sync_config && !m_sync_session) {
26,790✔
549
        // NOTE(review): when `user` is null this branch is taken, but the
        // message below still evaluates `user->user_id()`, which dereferences
        // the null pointer - confirm whether a null user can reach this point
        // (open_db() asserts it is non-null before its own sync handling).
        if (!m_config.sync_config->user || m_config.sync_config->user->state() == SyncUser::State::Removed) {
1,385✔
550
            throw app::AppError(
5✔
551
                ErrorCodes::ClientUserNotFound,
5✔
552
                util::format("Cannot start a sync session for user '%1' because this user has been removed.",
5✔
553
                             m_config.sync_config->user->user_id()));
5✔
554
        }
5✔
555
        if (auto sync_manager = m_config.sync_config->user->sync_manager()) {
1,380✔
556
            m_sync_session = sync_manager->get_session(m_db, m_config);
1,380✔
557
        }
1,380✔
558
    }
1,380✔
559
#endif
26,790✔
560

13,219✔
561
    if (!m_notifier && !m_config.immutable() && m_config.automatic_change_notifications) {
26,787✔
562
        try {
6,088✔
563
            m_notifier = std::make_unique<ExternalCommitHelper>(*this, m_config);
6,088✔
564
        }
6,088✔
565
        // Re-throw as FileAccessError carrying the path and the system error code
        catch (std::system_error const& ex) {
3,035✔
UNCOV
566
            throw FileAccessError(ErrorCodes::FileOperationFailed,
×
UNCOV
567
                                  util::format("Failed to create ExternalCommitHelper: %1", ex.what()), get_path(),
×
UNCOV
568
                                  ex.code().value());
×
UNCOV
569
        }
×
570
    }
26,783✔
571
    m_db->add_commit_listener(this);
26,783✔
572
}
26,783✔
573

574
// Closes the DB and drops the coordinator's reference to it.
// NOTE(review): m_db is dereferenced without a null check, and no lock is
// taken here; the only caller visible in this file (delete_and_reopen) holds
// m_realm_mutex - confirm that callers guarantee m_db is set.
void RealmCoordinator::close()
575
{
20✔
576
    m_db->close();
20✔
577
    m_db = nullptr;
20✔
578
}
20✔
579

580
// Under m_realm_mutex: closes the DB, removes the file at m_config.path and
// opens the DB again via open_db(). The result of open_db() is ignored.
void RealmCoordinator::delete_and_reopen()
581
{
20✔
582
    util::CheckedLockGuard lock(m_realm_mutex);
20✔
583
    close();
20✔
584
    util::File::remove(m_config.path);
20✔
585
    open_db();
20✔
586
}
20✔
587

588
// Starts a transaction on m_db at `version`: m_db->start_frozen(version) when
// `frozen_transaction` is true, otherwise m_db->start_read(version).
// Asserts that the DB has already been opened.
TransactionRef RealmCoordinator::begin_read(VersionID version, bool frozen_transaction)
589
{
91,345✔
590
    REALM_ASSERT(m_db);
91,345✔
591
    return frozen_transaction ? m_db->start_frozen(version) : m_db->start_read(version);
90,966✔
592
}
91,345✔
593

594
// Returns m_schema_version, read while holding m_schema_cache_mutex.
uint64_t RealmCoordinator::get_schema_version() const noexcept
UNCOV
595
{
×
UNCOV
596
    util::CheckedLockGuard lock(m_schema_cache_mutex);
×
UNCOV
597
    return m_schema_version;
×
UNCOV
598
}
×
599

600
// Copies the cached schema state into the three out-parameters.
// Returns false, leaving the out-parameters untouched, when no schema is
// cached. Otherwise returns true after writing the cached schema, its version
// and m_schema_transaction_version_max (into `transaction`).
// m_schema_cache_mutex is held for the duration of the call.
bool RealmCoordinator::get_cached_schema(Schema& schema, uint64_t& schema_version,
601
                                         uint64_t& transaction) const noexcept
602
{
52,693✔
603
    util::CheckedLockGuard lock(m_schema_cache_mutex);
52,693✔
604
    if (!m_cached_schema)
52,693✔
605
        return false;
45,256✔
606
    schema = *m_cached_schema;
7,437✔
607
    schema_version = m_schema_version;
7,437✔
608
    transaction = m_schema_transaction_version_max;
7,437✔
609
    return true;
7,437✔
610
}
7,437✔
611

612
// Replaces the cached schema with `new_schema` / `new_schema_version` and sets
// both ends of the cached transaction-version range to `transaction_version`.
// The call is a no-op when `transaction_version` is older than the newest
// version already covered by the cache, when `new_schema` is empty, or when
// `new_schema_version` is ObjectStore::NotVersioned.
void RealmCoordinator::cache_schema(Schema const& new_schema, uint64_t new_schema_version,
613
                                    uint64_t transaction_version)
614
{
50,715✔
615
    util::CheckedLockGuard lock(m_schema_cache_mutex);
50,715✔
616
    if (transaction_version < m_schema_transaction_version_max)
50,715✔
617
        return;
292✔
618
    if (new_schema.empty() || new_schema_version == ObjectStore::NotVersioned)
50,423✔
619
        return;
18,782✔
620

15,643✔
621
    m_cached_schema = new_schema;
31,641✔
622
    m_schema_version = new_schema_version;
31,641✔
623
    m_schema_transaction_version_min = transaction_version;
31,641✔
624
    m_schema_transaction_version_max = transaction_version;
31,641✔
625
}
31,641✔
626

627
void RealmCoordinator::clear_schema_cache_and_set_schema_version(uint64_t new_schema_version)
628
{
18,933✔
629
    util::CheckedLockGuard lock(m_schema_cache_mutex);
18,933✔
630
    m_cached_schema = util::none;
18,933✔
631
    m_schema_version = new_schema_version;
18,933✔
632
}
18,933✔
633

634
void RealmCoordinator::advance_schema_cache(uint64_t previous, uint64_t next)
635
{
122,282✔
636
    util::CheckedLockGuard lock(m_schema_cache_mutex);
122,282✔
637
    if (!m_cached_schema)
122,282✔
638
        return;
18,599✔
639
    REALM_ASSERT(previous <= m_schema_transaction_version_max);
103,683✔
640
    if (next < m_schema_transaction_version_min)
103,683✔
641
        return;
2✔
642
    m_schema_transaction_version_min = std::min(previous, m_schema_transaction_version_min);
103,681✔
643
    m_schema_transaction_version_max = std::max(next, m_schema_transaction_version_max);
103,681✔
644
}
103,681✔
645

646
// Constructs an empty coordinator; the body performs no work of its own.
// NOTE(review): the unnamed `Private` tag argument presumably restricts who
// can construct a coordinator - confirm against the header.
RealmCoordinator::RealmCoordinator(Private)
{
}
26,822✔
647

648
// Tears the coordinator down:
//   1. prunes every expired entry from the global path -> coordinator map,
//   2. stops listening for commits on the DB (if one is open),
//   3. destroys the external commit notifier,
//   4. asks all remaining collection notifiers to release their data.
RealmCoordinator::~RealmCoordinator()
{
    {
        std::lock_guard<std::mutex> coordinator_lock(s_coordinator_mutex);
        auto it = s_coordinators_per_path.begin();
        while (it != s_coordinators_per_path.end()) {
            if (!it->second.expired()) {
                ++it;
                continue;
            }
            it = s_coordinators_per_path.erase(it);
        }
    }

    if (m_db) {
        m_db->remove_commit_listener(this);
    }

    // Waits for the worker thread to join
    m_notifier.reset();

    // Active NotificationTokens keep their notifiers alive past this point, so
    // the notifiers are told to drop their Transactions here to let the DB be
    // closed immediately. The worker thread is gone by now, which is why no
    // lock is taken.
    for (auto& pending : m_new_notifiers) {
        pending->release_data();
    }
    for (auto& active : m_notifiers) {
        active->release_data();
    }
}
26,822✔
678

679
// Removes `realm` from the set of Realms this coordinator notifies.
//
// Expired weak realm notifiers are pruned at the same time. m_realm_mutex is
// held for the whole call.
void RealmCoordinator::unregister_realm(Realm* realm)
{
    util::CheckedLockGuard realm_lock(m_realm_mutex);

    // Results notifiers are normally cleaned up by the background worker
    // thread; when automatic change notifications are disabled that has to be
    // done here instead so notifiers from this Realm do not linger.
    if (!m_config.automatic_change_notifications) {
        util::CheckedLockGuard notifier_lock(m_notifier_mutex);
        clean_up_dead_notifiers();
    }

    auto is_stale = [realm](auto& notifier) {
        return notifier.expired() || notifier.is_for_realm(realm);
    };
    auto first_stale = remove_if(begin(m_weak_realm_notifiers), end(m_weak_realm_notifiers), is_stale);
    m_weak_realm_notifiers.erase(first_stale, end(m_weak_realm_notifiers));
}
34,344✔
696

697
// Thread-safety analysis doesn't reasonably handle calling functions on different
698
// instances of this type
699
void RealmCoordinator::clear_cache() NO_THREAD_SAFETY_ANALYSIS
700
{
48✔
701
    std::vector<std::shared_ptr<RealmCoordinator>> coordinators;
48✔
702
    {
48✔
703
        std::lock_guard<std::mutex> lock(s_coordinator_mutex);
48✔
704
        for (auto& weak_coordinator : s_coordinators_per_path) {
48✔
705
            if (auto coordinator = weak_coordinator.second.lock()) {
48✔
706
                coordinators.push_back(coordinator);
48✔
707
            }
48✔
708
        }
48✔
709
        s_coordinators_per_path.clear();
48✔
710
    }
48✔
711

24✔
712
    for (auto& coordinator : coordinators) {
48✔
713
        coordinator->m_notifier = nullptr;
48✔
714

24✔
715
        std::vector<std::shared_ptr<Realm>> realms_to_close;
48✔
716
        {
48✔
717
            // Gather a list of all of the realms which will be removed
24✔
718
            util::CheckedLockGuard lock(coordinator->m_realm_mutex);
48✔
719
            for (auto& weak_realm_notifier : coordinator->m_weak_realm_notifiers) {
48✔
720
                if (auto realm = weak_realm_notifier.realm()) {
48✔
721
                    realms_to_close.push_back(realm);
48✔
722
                }
48✔
723
            }
48✔
724
        }
48✔
725

24✔
726
        // Close all of the previously cached Realms. This can't be done while
24✔
727
        // locks are held as it may try to re-lock them.
24✔
728
        for (auto& realm : realms_to_close)
48✔
729
            realm->close();
48✔
730
    }
48✔
731
}
48✔
732

733
void RealmCoordinator::clear_all_caches()
734
{
106✔
735
    std::vector<std::weak_ptr<RealmCoordinator>> to_clear;
106✔
736
    {
106✔
737
        std::lock_guard<std::mutex> lock(s_coordinator_mutex);
106✔
738
        for (auto iter : s_coordinators_per_path) {
77✔
739
            to_clear.push_back(iter.second);
48✔
740
        }
48✔
741
    }
106✔
742
    for (auto weak_coordinator : to_clear) {
77✔
743
        if (auto coordinator = weak_coordinator.lock()) {
48✔
744
            coordinator->clear_cache();
48✔
745
        }
48✔
746
    }
48✔
747
}
106✔
748

749
// Debug-only sanity check: asserts that the global path -> coordinator map is
// empty. Compiles to an empty function unless REALM_DEBUG is defined.
void RealmCoordinator::assert_no_open_realms() noexcept
{
#ifdef REALM_DEBUG
    std::lock_guard<std::mutex> coordinator_lock(s_coordinator_mutex);
    REALM_ASSERT(s_coordinators_per_path.empty());
#endif
}
1,015✔
756

757
void RealmCoordinator::wake_up_notifier_worker()
758
{
10,682✔
759
    if (m_notifier) {
10,682✔
760
        // FIXME: this wakes up the notification workers for all processes and
54✔
761
        // not just us. This might be worth optimizing in the future.
54✔
762
        m_notifier->notify_others();
108✔
763
    }
108✔
764
}
10,682✔
765

766
// Commits the write transaction currently open on `realm` and continues it as
// a read transaction.
//
// `commit_to_disk` is forwarded to Transaction::commit_and_continue_as_read().
// The Realm must be in a write transaction and the configuration must not be
// immutable. After the commit the binding context (if any) receives
// did_change() with empty arguments.
void RealmCoordinator::commit_write(Realm& realm, bool commit_to_disk)
{
    REALM_ASSERT(!m_config.immutable());
    REALM_ASSERT(realm.is_in_transaction());

    Transaction& write_tr = Realm::Internal::get_transaction(realm);
    VersionID committed_version;
    {
        // The lock has to be taken *before* committing: otherwise another
        // process could perform a write and notify us before the skip version
        // has been set.
        util::CheckedLockGuard notifier_lock(m_notifier_mutex);
        committed_version = write_tr.commit_and_continue_as_read(commit_to_disk);

        // The skip version must always be the notifier transaction's current
        // version plus one, because only a prefix of versions can be skipped,
        // never intermediate ones. When this Realm has a notifier with
        // callbacks, begin_transaction() waited for it to finish running, so
        // the invariant holds. Without such a notifier the skip version is not
        // needed and, more importantly, must not be set: starting the write
        // did not block, so the notifier transaction may still be on an older
        // version.
        //
        // This relies on callbacks not being addable from within a write
        // transaction. If that were possible, an implicitly-created notifier
        // which already ran (so it sits in m_notifiers rather than
        // m_new_notifiers) could gain a callback after the write started
        // without having been waited on. Supporting that would require
        // updating this logic.
        bool realm_has_active_notifier = false;
        for (auto& notifier : m_notifiers) {
            if (notifier->is_for_realm(realm) && notifier->have_callbacks()) {
                realm_has_active_notifier = true;
                break;
            }
        }

        if (realm_has_active_notifier) {
            REALM_ASSERT(!m_notifier_skip_version);
            REALM_ASSERT(m_notifier_transaction);
            REALM_ASSERT_3(m_notifier_transaction->get_transact_stage(), ==, DB::transact_Reading);
            REALM_ASSERT_3(m_notifier_transaction->get_version() + 1, ==, committed_version.version);
            m_notifier_skip_version = write_tr.duplicate();
        }
    }

    if (realm.m_binding_context) {
        realm.m_binding_context->did_change({}, {});
    }
    // note: no longer safe to access `realm` or `this` after this point as
    // did_change() may have closed the Realm.
}
62,460✔
813

814
void RealmCoordinator::enable_wait_for_change()
UNCOV
815
{
×
UNCOV
816
    m_db->enable_wait_for_change();
×
UNCOV
817
}
×
818

819
bool RealmCoordinator::wait_for_change(std::shared_ptr<Transaction> tr)
UNCOV
820
{
×
UNCOV
821
    return m_db->wait_for_change(tr);
×
UNCOV
822
}
×
823

824
void RealmCoordinator::wait_for_change_release()
UNCOV
825
{
×
UNCOV
826
    m_db->wait_for_change_release();
×
UNCOV
827
}
×
828

829
// Returns true when the last transaction version seen by `realm` differs from
// the version of the database's latest snapshot.
bool RealmCoordinator::can_advance(Realm& realm)
{
    const auto seen_version = realm.last_seen_transaction_version();
    const auto latest_version = m_db->get_version_of_latest_snapshot();
    return seen_version != latest_version;
}
25,227✔
833

834
// Thread-safety analysis doesn't reasonably handle calling functions on different
835
// instances of this type
836
void RealmCoordinator::register_notifier(std::shared_ptr<CollectionNotifier> notifier) NO_THREAD_SAFETY_ANALYSIS
837
{
10,478✔
838
    auto& self = Realm::Internal::get_coordinator(*notifier->get_realm());
10,478✔
839
    {
10,478✔
840
        util::CheckedLockGuard lock(self.m_notifier_mutex);
10,478✔
841
        notifier->set_initial_transaction(self.m_new_notifiers);
10,478✔
842
        self.m_new_notifiers.push_back(std::move(notifier));
10,478✔
843
    }
10,478✔
844
}
10,478✔
845

846
void RealmCoordinator::clean_up_dead_notifiers()
847
{
86,950✔
848
    auto swap_remove = [&](auto& container) {
173,900✔
849
        bool did_remove = false;
173,900✔
850
        for (size_t i = 0; i < container.size(); ++i) {
241,558✔
851
            if (container[i]->is_alive())
67,658✔
852
                continue;
57,316✔
853

5,171✔
854
            // Ensure the notifier is destroyed here even if there's lingering refs
5,171✔
855
            // to the async notifier elsewhere
5,171✔
856
            container[i]->release_data();
10,342✔
857

5,171✔
858
            if (container.size() > i + 1)
10,342✔
859
                container[i] = std::move(container.back());
5,310✔
860
            container.pop_back();
10,342✔
861
            --i;
10,342✔
862
            did_remove = true;
10,342✔
863
        }
10,342✔
864
        return did_remove;
173,900✔
865
    };
173,900✔
866

44,171✔
867
    if (swap_remove(m_notifiers) && m_notifiers.empty()) {
86,950✔
868
        m_notifier_transaction = nullptr;
4,997✔
869
        m_notifier_handover_transaction = nullptr;
4,997✔
870
        m_notifier_skip_version.reset();
4,997✔
871
    }
4,997✔
872
    swap_remove(m_new_notifiers);
86,950✔
873
}
86,950✔
874

875
// Commit-listener hook: signals the external commit notifier, if there is
// one, via notify_others(). The committed version argument is ignored.
void RealmCoordinator::on_commit(DB::version_type)
{
    if (!m_notifier)
        return;
    m_notifier->notify_others();
}
81,500✔
881

882
void RealmCoordinator::on_change()
883
{
44,837✔
884
#if REALM_ENABLE_SYNC
44,837✔
885
    // Invoke realm sync if another process has notified for a change
22,424✔
886
    if (m_sync_session) {
44,837✔
887
        auto version = m_db->get_version_of_latest_snapshot();
9,092✔
888
        SyncSession::Internal::nonsync_transact_notify(*m_sync_session, version);
9,092✔
889
    }
9,092✔
890
#endif
44,837✔
891

22,424✔
892
    {
44,837✔
893
        util::CheckedUniqueLock lock(m_running_notifiers_mutex);
44,837✔
894
        run_async_notifiers();
44,837✔
895
    }
44,837✔
896

22,424✔
897
    util::CheckedLockGuard lock(m_realm_mutex);
44,837✔
898
    for (auto& realm : m_weak_realm_notifiers) {
64,610✔
899
        realm.notify();
64,610✔
900
    }
64,610✔
901
}
44,837✔
902

903
void RealmCoordinator::run_async_notifiers()
904
{
44,918✔
905
    util::CheckedUniqueLock lock(m_notifier_mutex);
44,918✔
906

22,473✔
907
    clean_up_dead_notifiers();
44,918✔
908

22,473✔
909
    if (m_notifiers.empty() && m_new_notifiers.empty()) {
44,918✔
910
        REALM_ASSERT(!m_notifier_skip_version);
27,972✔
911
        return;
27,972✔
912
    }
27,972✔
913

8,792✔
914
    if (!m_notifier_transaction) {
16,946✔
915
        REALM_ASSERT(m_notifiers.empty());
5,073✔
916
        REALM_ASSERT(!m_notifier_skip_version);
5,073✔
917
        m_notifier_transaction = m_db->start_read();
5,073✔
918
    }
5,073✔
919

8,792✔
920
    // We need to pick the final version to advance to while the lock is held
8,792✔
921
    // as otherwise if a commit is made while new notifiers are being advanced
8,792✔
922
    // we could end up advancing over the skip version. We create a transaction
8,792✔
923
    // object for it to make sure the version stays valid
8,792✔
924
    TransactionRef newest_transaction = m_db->start_read();
16,946✔
925
    VersionID version = newest_transaction->get_version_of_current_transaction();
16,946✔
926

8,792✔
927
    auto skip_version = std::move(m_notifier_skip_version);
16,946✔
928

8,792✔
929
    // Make a copy of the notifiers vector and then release the lock to avoid
8,792✔
930
    // blocking other threads trying to register or unregister notifiers while we run them
8,792✔
931
    decltype(m_notifiers) notifiers;
16,946✔
932
    if (version != m_notifier_transaction->get_version_of_current_transaction()) {
16,946✔
933
        // We only want to rerun the existing notifiers if the version has changed.
6,105✔
934
        // This is both a minor optimization and required for notification
6,105✔
935
        // skipping to work. The skip logic assumes that the notifier can't be
6,105✔
936
        // running when suppress_next() is called because it can only be called
6,105✔
937
        // from within a write transaction, and starting the write transaction
6,105✔
938
        // would have blocked until the notifier is done running. However, if we
6,105✔
939
        // run the notifiers at a point where the version isn't changing, that
6,105✔
940
        // could happen concurrently with a call to suppress_next(), and we
6,105✔
941
        // could unset skip_next on a callback from that zero-version run
6,105✔
942
        // rather than the intended one.
6,105✔
943
        //
6,105✔
944
        // Spurious wakeups can happen in a few ways: adding a new notifier,
6,105✔
945
        // adding a new notifier in a different process sharing this Realm file,
6,105✔
946
        // closing the Realm in a different process, and possibly some other cases.
6,105✔
947
        notifiers = m_notifiers;
10,507✔
948
    }
10,507✔
949
    else {
6,439✔
950
        REALM_ASSERT(!skip_version);
6,439✔
951
        if (m_new_notifiers.empty()) {
6,439✔
952
            // We were spuriously woken up and there isn't actually anything to do
119✔
953
            return;
1,303✔
954
        }
1,303✔
955
    }
15,643✔
956

8,673✔
957
    auto new_notifiers = std::move(m_new_notifiers);
15,643✔
958
    m_new_notifiers.clear();
15,643✔
959
    m_notifiers.insert(m_notifiers.end(), new_notifiers.begin(), new_notifiers.end());
15,643✔
960
    lock.unlock();
15,643✔
961

8,673✔
962
    // Advance all of the new notifiers to the most recent version, if any
8,673✔
963
    std::vector<TransactionChangeInfo> new_notifier_change_info;
15,643✔
964
    if (!new_notifiers.empty()) {
15,643✔
965
        new_notifier_change_info.reserve(new_notifiers.size());
5,138✔
966
        for (auto& notifier : new_notifiers) {
10,406✔
967
            if (notifier->version() == version)
10,406✔
968
                continue;
10,046✔
969
            new_notifier_change_info.emplace_back();
360✔
970
            notifier->add_required_change_info(new_notifier_change_info.back());
360✔
971
            transaction::parse(*newest_transaction, new_notifier_change_info.back(), notifier->version().version,
360✔
972
                               version.version);
360✔
973
        }
360✔
974
    }
5,138✔
975

8,673✔
976
    // If the skip version is set and we have more than one version to process,
8,673✔
977
    // we need to start with just the skip version so that any suppressed
8,673✔
978
    // callbacks can ignore the changes from it without missing changes from
8,673✔
979
    // later versions. If the skip version is set and there aren't any more
8,673✔
980
    // versions after it, we just want to process with normal processing. See
8,673✔
981
    // the above note about spurious wakeups for why this is required for
8,673✔
982
    // correctness and not just a very minor optimization.
8,673✔
983
    if (skip_version && skip_version->get_version_of_current_transaction() != version) {
15,643✔
984
        REALM_ASSERT(!notifiers.empty());
4✔
985
        REALM_ASSERT(version >= skip_version->get_version_of_current_transaction());
4✔
986
        TransactionChangeInfo info;
4✔
987
        for (auto& notifier : notifiers)
4✔
988
            notifier->add_required_change_info(info);
4✔
989
        transaction::advance(*m_notifier_transaction, info, skip_version->get_version_of_current_transaction());
4✔
990
        for (auto& notifier : notifiers)
4✔
991
            notifier->run();
4✔
992

2✔
993
        util::CheckedLockGuard lock(m_notifier_mutex);
4✔
994
        for (auto& notifier : notifiers)
4✔
995
            notifier->prepare_handover();
4✔
996
    }
4✔
997

8,673✔
998
    // Advance the non-new notifiers to the same version as we advanced the new
8,673✔
999
    // ones to (or the latest if there were no new ones)
8,673✔
1000
    TransactionChangeInfo change_info;
15,643✔
1001
    for (auto& notifier : notifiers) {
17,097✔
1002
        notifier->add_required_change_info(change_info);
17,097✔
1003
    }
17,097✔
1004
    transaction::advance(*m_notifier_transaction, change_info, version);
15,643✔
1005

8,673✔
1006
    {
15,643✔
1007
        // If there's multiple notifiers for a single collection, we only populate
8,673✔
1008
        // the data for the first one during parsing and need to copy it to the
8,673✔
1009
        // others. This is a reverse scan where each collection looks for the
8,673✔
1010
        // first collection with the same id. It is O(N^2), but typically the
8,673✔
1011
        // number of collections observed will be very small.
8,673✔
1012
        auto id = [](auto const& c) {
11,261✔
1013
            return std::tie(c.table_key, c.path, c.obj_key);
5,176✔
1014
        };
5,176✔
1015
        auto& collections = change_info.collections;
15,643✔
1016
        for (size_t i = collections.size(); i > 0; --i) {
19,629✔
1017
            for (size_t j = 0; j < i - 1; ++j) {
4,256✔
1018
                if (id(collections[i - 1]) == id(collections[j])) {
2,588✔
1019
                    collections[i - 1].changes->merge(CollectionChangeBuilder{*collections[j].changes});
2,318✔
1020
                    break;
2,318✔
1021
                }
2,318✔
1022
            }
2,588✔
1023
        }
3,986✔
1024
    }
15,643✔
1025

8,673✔
1026
    // Now that they're at the same version, switch the new notifiers over to
8,673✔
1027
    // the main Transaction used for background work rather than the temporary one
8,673✔
1028
    for (auto& notifier : new_notifiers) {
13,876✔
1029
        notifier->attach_to(m_notifier_transaction);
10,406✔
1030
        notifier->run();
10,406✔
1031
    }
10,406✔
1032

8,673✔
1033
    // Change info is now all ready, so the notifiers can now perform their
8,673✔
1034
    // background work
8,673✔
1035
    for (auto& notifier : notifiers) {
17,097✔
1036
        notifier->run();
17,097✔
1037
    }
17,097✔
1038

8,673✔
1039
    // Reacquire the lock while updating the fields that are actually read on
8,673✔
1040
    // other threads
8,673✔
1041
    util::CheckedLockGuard lock2(m_notifier_mutex);
15,643✔
1042
    for (auto& notifier : new_notifiers) {
13,876✔
1043
        notifier->prepare_handover();
10,406✔
1044
    }
10,406✔
1045
    for (auto& notifier : notifiers) {
17,097✔
1046
        notifier->prepare_handover();
17,097✔
1047
    }
17,097✔
1048
    clean_up_dead_notifiers();
15,643✔
1049
    if (!m_notifiers.empty())
15,643✔
1050
        m_notifier_handover_transaction = m_db->start_read(version);
15,637✔
1051
}
15,643✔
1052

1053
void RealmCoordinator::advance_to_ready(Realm& realm)
1054
{
7,254✔
1055
    // If callbacks close the Realm the last external reference may go away
2,973✔
1056
    // while we're in this function
2,973✔
1057
    auto self = shared_from_this();
7,254✔
1058
    auto tr = Realm::Internal::get_transaction_ref(realm);
7,254✔
1059
    auto current_version = tr->get_version_of_current_transaction();
7,254✔
1060

2,973✔
1061
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> notifiers;
7,254✔
1062

2,973✔
1063
    // Transaction which will pin the version we're packaging for deliver to,
2,973✔
1064
    // to ensure it's not cleaned up between when we release the mutex and when
2,973✔
1065
    // we actually advance (which is not done while holding a lock).
2,973✔
1066
    std::shared_ptr<Transaction> handover_version_tr;
7,254✔
1067
    {
7,254✔
1068
        util::CheckedLockGuard lock(m_notifier_mutex);
7,254✔
1069

2,973✔
1070
        // If there are any new notifiers for this Realm then by definition they
2,973✔
1071
        // haven't run yet and aren't ready
2,973✔
1072
        for (auto& notifier : m_new_notifiers) {
2,973✔
UNCOV
1073
            if (notifier->is_for_realm(realm))
×
UNCOV
1074
                return;
×
UNCOV
1075
        }
×
1076

2,973✔
1077
        for (auto& notifier : m_notifiers) {
8,626✔
1078
            if (!notifier->is_for_realm(realm))
7,990✔
1079
                continue;
46✔
1080
            // If the notifier hasn't run it isn't ready and we should do nothing
4,322✔
1081
            if (!notifier->has_run())
7,944✔
UNCOV
1082
                return;
×
1083
            // package_for_delivery() returning false indicates that it's been
4,322✔
1084
            // unregistered but not yet cleaned up, so it effectively doesn't exist
4,322✔
1085
            if (!notifier->package_for_delivery())
7,944✔
1086
                continue;
×
1087
            notifiers.push_back(notifier);
7,944✔
1088
        }
7,944✔
1089

2,973✔
1090
        handover_version_tr = m_notifier_handover_transaction;
7,254✔
1091
    }
7,254✔
1092

2,973✔
1093
    if (notifiers.empty()) {
7,254✔
1094
        // If we have no notifiers for this Realm, just advance to latest
388✔
1095
        return transaction::advance(tr, realm.m_binding_context.get(), {});
2,784✔
1096
    }
2,784✔
1097

2,585✔
1098
    // If we have notifiers but no transaction, then they've never run before.
2,585✔
1099
    if (!handover_version_tr)
4,470✔
UNCOV
1100
        return;
×
1101

2,585✔
1102
    auto notifier_version = handover_version_tr->get_version_of_current_transaction();
4,470✔
1103
    // If the most recent write was performed via the Realm instance being
2,585✔
1104
    // advanced, the notifiers can be at an older version than the Realm.
2,585✔
1105
    // This means that there's no advancing to do
2,585✔
1106
    if (notifier_version < current_version)
4,470✔
1107
        return;
×
1108

2,585✔
1109
    // We can have notifications for the current version if it's the initial
2,585✔
1110
    // notification for a newly added callback or if the write was performed
2,585✔
1111
    // on this Realm instance. There might also be a newer version but we ignore
2,585✔
1112
    // it if so.
2,585✔
1113
    if (notifier_version == current_version) {
4,470✔
1114
        if (realm.m_binding_context)
8✔
UNCOV
1115
            realm.m_binding_context->will_send_notifications();
×
1116
        if (realm.is_closed())
8✔
UNCOV
1117
            return;
×
1118
        for (auto& notifier : notifiers)
8✔
1119
            notifier->after_advance();
8✔
1120
        if (realm.is_closed())
8✔
UNCOV
1121
            return;
×
1122
        if (realm.m_binding_context)
8✔
UNCOV
1123
            realm.m_binding_context->did_send_notifications();
×
1124
        return;
8✔
1125
    }
8✔
1126

2,582✔
1127
    // We have notifiers for a newer version, so advance to that
2,582✔
1128
    transaction::advance(tr, realm.m_binding_context.get(),
4,462✔
1129
                         _impl::NotifierPackage(std::move(notifiers), handover_version_tr));
4,462✔
1130
}
4,462✔
1131

1132
// Collects every notifier that belongs to `realm`: first the matching entries
// of m_new_notifiers, then the matching entries of m_notifiers, each in its
// original order.
std::vector<std::shared_ptr<_impl::CollectionNotifier>> RealmCoordinator::notifiers_for_realm(Realm& realm)
{
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> matching;
    auto append_matching = [&](auto& source) {
        for (auto& notifier : source) {
            if (notifier->is_for_realm(realm))
                matching.push_back(notifier);
        }
    };
    append_matching(m_new_notifiers);
    append_matching(m_notifiers);
    return matching;
}
58,820✔
1142

1143
// Advances `realm` towards the latest snapshot of the database, delivering
// notifications for this Realm's notifiers along the way.
//
// Returns true when the Realm is still open afterwards and its transaction
// version actually changed.
bool RealmCoordinator::advance_to_latest(Realm& realm)
{
    // If callbacks close the Realm the last external reference may go away
    // while we're in this function
    auto keep_alive = shared_from_this();
    auto tr = Realm::Internal::get_transaction_ref(realm);

    NotifierVector realm_notifiers;
    {
        util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
        realm_notifiers = notifiers_for_realm(realm);
    }

    const auto latest_version = m_db->get_version_of_latest_snapshot();
    auto pin_tr = package_notifiers(realm_notifiers, latest_version);

    auto version_before = tr->get_version_of_current_transaction();
    transaction::advance(tr, realm.m_binding_context.get(),
                         _impl::NotifierPackage(std::move(realm_notifiers), pin_tr));

    if (realm.is_closed())
        return false;
    return version_before != tr->get_version_of_current_transaction();
}
}
9,249✔
1161

1162
// Begins a write on `realm`'s transaction via transaction::begin(), handing
// over this Realm's notifiers together with the coordinator.
//
// The Realm must not already be in a write transaction.
void RealmCoordinator::promote_to_write(Realm& realm)
{
    REALM_ASSERT(!realm.is_in_transaction());
    // If callbacks close the Realm the last external reference may go away
    // while we're in this function
    auto keep_alive = shared_from_this();

    // Only the notifier lookup happens under the lock; it is released again
    // before transaction::begin() is called.
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> realm_notifiers;
    {
        util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
        realm_notifiers = notifiers_for_realm(realm);
    }

    transaction::begin(Realm::Internal::get_transaction_ref(realm), realm.m_binding_context.get(),
                       {std::move(realm_notifiers), this});
}
49,571✔
1176

1177
// Delivers notifications that are already available for `realm` without
// advancing the Realm's read version.
//
// Nothing is delivered when there is no handover transaction, when the Realm
// has a read version that differs from the handover version, or when no
// notifier of this Realm is ready. The Realm must not be in a write
// transaction.
void RealmCoordinator::process_available_async(Realm& realm)
{
    REALM_ASSERT(!realm.is_in_transaction());
    // If callbacks close the Realm the last external reference may go away
    // while we're in this function
    auto keep_alive = shared_from_this();

    auto current_version = realm.current_transaction_version();
    std::vector<std::shared_ptr<_impl::CollectionNotifier>> ready;

    {
        util::CheckedLockGuard notifier_lock(m_notifier_mutex);
        // No handover transaction means there can't be anything waiting to deliver
        if (!m_notifier_handover_transaction)
            return;
        // If we have a read transaction, it needs to be an exact match in version
        // to the notifications as we're only delivering initial notifications
        // and not advancing.
        if (current_version &&
            current_version != m_notifier_handover_transaction->get_version_of_current_transaction())
            return;

        for (auto& notifier : m_notifiers) {
            // Evaluated left to right, so package_for_delivery() is only
            // called on notifiers of this Realm which have already run.
            if (notifier->is_for_realm(realm) && notifier->has_run() && notifier->package_for_delivery())
                ready.push_back(notifier);
        }
    }
    if (ready.empty())
        return;

    if (realm.m_binding_context)
        realm.m_binding_context->will_send_notifications();
    if (realm.is_closed()) // i.e. the Realm was closed in the callback above
        return;
    for (auto& notifier : ready)
        notifier->after_advance();
    if (realm.is_closed()) // i.e. the Realm was closed in the callback above
        return;
    if (realm.m_binding_context)
        realm.m_binding_context->did_send_notifications();
}
7,206✔
1219

1220
// Prepares `notifiers` for delivery at (or after) `target_version`.
//
// When notifiers with callbacks are not yet up to date, the async notifiers
// are run on the calling thread. On return `notifiers` contains only the
// entries that were successfully packaged for delivery. Returns the handover
// transaction, or nullptr when nothing can be delivered (in which case
// `notifiers` is empty).
TransactionRef RealmCoordinator::package_notifiers(NotifierVector& notifiers, VersionID::version_type target_version)
{
    // True when every notifier with callbacks has run and the handover
    // transaction has reached the target version.
    auto all_ready = [&] {
        util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
        bool up_to_date =
            m_notifier_handover_transaction &&
            m_notifier_handover_transaction->get_version_of_current_transaction().version >= target_version;
        return std::all_of(begin(notifiers), end(notifiers), [&](auto const& n) {
            return !n->have_callbacks() || (n->has_run() && up_to_date);
        });
    };

    if (!all_ready()) {
        util::CheckedUniqueLock running_lock(m_running_notifiers_mutex);
        // The worker thread may have run the notifiers we need while we were
        // waiting for the lock, so re-check
        if (!all_ready())
            run_async_notifiers();
    }

    util::CheckedUniqueLock notifier_lock(m_notifier_mutex);
    // If the notifiers are still out of date, that means none of them have callbacks
    // so we don't want to block the calling thread to run them.
    if (!m_notifier_handover_transaction ||
        m_notifier_handover_transaction->get_version_of_current_transaction().version < target_version) {
        notifiers.clear();
        return nullptr;
    }

    // A notifier is dropped when it has not run yet or when packaging fails.
    auto not_deliverable = [&](auto& notifier) {
        return !notifier->has_run() || !notifier->package_for_delivery();
    };
    auto first_dropped = std::remove_if(begin(notifiers), end(notifiers), not_deliverable);
    notifiers.erase(first_dropped, end(notifiers));

    if (notifiers.empty())
        return nullptr;
    return m_notifier_handover_transaction;
}
7,275✔
1255

1256
bool RealmCoordinator::compact()
1257
{
2✔
1258
    return m_db->compact();
2✔
1259
}
2✔
1260

1261
// Forwards `path` and `key` unchanged to DB::write_copy() on the open
// database.
void RealmCoordinator::write_copy(StringData path, const char* key)
{
    // m_db is reset to nullptr elsewhere in this file, so fail loudly (as
    // begin_read() does) instead of dereferencing a null pointer.
    REALM_ASSERT(m_db);
    m_db->write_copy(path, key);
}
20✔
1265

1266
// Asks the DB for the write mutex asynchronously on behalf of `realm`.
//
// The callback passed to the DB posts a job to the Realm's scheduler which
// calls Realm::Internal::run_writes(). A strong reference to the Realm is
// held by the callback and handed on to the scheduled job.
void RealmCoordinator::async_request_write_mutex(Realm& realm)
{
    auto tr = Realm::Internal::get_transaction_ref(realm);
    auto on_acquired = [realm = realm.shared_from_this()]() mutable {
        // Take the scheduler reference before `realm` is moved into the job.
        auto& scheduler = *realm->scheduler();
        scheduler.invoke([realm = std::move(realm)] {
            Realm::Internal::run_writes(*realm);
        });
    };
    m_db->async_request_write_mutex(tr, std::move(on_acquired));
}
538✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc