• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1874

27 Nov 2023 09:40PM UTC coverage: 91.676% (-0.02%) from 91.695%
1874

push

Evergreen

web-flow
Fix a bunch of throw statements to use Realm exceptions (#7141)

* Fix a bunch of throw statements to use Realm exceptions
* check correct exception in test
* clang-format and replaced a couple of std::exceptions in SyncManager
* Updated changelog

---------

Co-authored-by: Jonathan Reams <jbreams@mongodb.com>
Co-authored-by: Jørgen Edelbo <jorgen.edelbo@mongodb.com>

92402 of 169340 branches covered (0.0%)

2 of 77 new or added lines in 7 files covered. (2.6%)

98 existing lines in 17 files now uncovered.

231851 of 252903 relevant lines covered (91.68%)

6068365.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.86
/src/realm/sync/noinst/migration_store.cpp
1
#include <realm/sync/noinst/migration_store.hpp>
2

3
#include <realm/transaction.hpp>
4
#include <realm/sync/noinst/sync_metadata_schema.hpp>
5

6
namespace realm::sync {
7
namespace {
8
// Schema version written for, and expected from, the flexible sync migration store metadata.
static constexpr int c_schema_version{1};

// Name of the metadata table and of its columns.
static constexpr std::string_view c_flx_migration_table{"flx_migration"};
static constexpr std::string_view c_flx_migration_started_at{"started_at"};
static constexpr std::string_view c_flx_migration_completed_at{"completed_at"};
static constexpr std::string_view c_flx_migration_state{"state"};
static constexpr std::string_view c_flx_migration_query_string{"query_string"};
static constexpr std::string_view c_flx_migration_original_partition{"original_partition"};
static constexpr std::string_view c_flx_migration_sentinel_subscription_set_version{
    "sentinel_subscription_set_version"};

// Prefix prepended to the object class name to build the name of a migrated subscription.
static constexpr std::string_view c_flx_subscription_name_prefix{"flx_migrated_"};
18

19
class MigrationStoreInit : public MigrationStore {
20
public:
21
    explicit MigrationStoreInit(DBRef db)
22
        : MigrationStore(std::move(db))
23
    {
2,778✔
24
    }
2,778✔
25
};
26

27
} // namespace
28

29
// Factory: builds a shared MigrationStore bound to `db`.
// The store is constructed through the file-local MigrationStoreInit helper.
MigrationStoreRef MigrationStore::create(DBRef db)
{
    MigrationStoreRef store = std::make_shared<MigrationStoreInit>(std::move(db));
    return store;
}
2,778✔
33

34
MigrationStore::MigrationStore(DBRef db)
35
    : m_db(std::move(db))
36
    , m_state(MigrationState::NotMigrated)
37
{
2,778✔
38
    load_data(true); // read_only, default to NotMigrated if table is not initialized
2,778✔
39
}
2,778✔
40

41
bool MigrationStore::load_data(bool read_only)
42
{
3,134✔
43
    std::unique_lock lock{m_mutex};
3,134✔
44

1,394✔
45
    if (m_migration_table) {
3,134✔
46
        return true; // already initialized
120✔
47
    }
120✔
48

1,334✔
49
    std::vector<SyncMetadataTable> internal_tables{
3,014✔
50
        {&m_migration_table,
3,014✔
51
         c_flx_migration_table,
3,014✔
52
         {
3,014✔
53
             {&m_migration_started_at, c_flx_migration_started_at, type_Timestamp},
3,014✔
54
             {&m_migration_completed_at, c_flx_migration_completed_at, type_Timestamp, true},
3,014✔
55
             {&m_migration_state, c_flx_migration_state, type_Int},
3,014✔
56
             {&m_migration_query_str, c_flx_migration_query_string, type_String},
3,014✔
57
             {&m_migration_partition, c_flx_migration_original_partition, type_String},
3,014✔
58
             {&m_sentinel_query_version, c_flx_migration_sentinel_subscription_set_version, type_Int, true},
3,014✔
59
         }},
3,014✔
60
    };
3,014✔
61

1,334✔
62
    std::optional<int64_t> schema_version;
3,014✔
63
    auto tr = m_db->start_read();
3,014✔
64
    if (read_only) {
3,014✔
65
        // Writing is disabled
1,216✔
66
        SyncMetadataSchemaVersionsReader schema_versions(tr);
2,778✔
67
        schema_version = schema_versions.get_version_for(tr, internal_schema_groups::c_flx_migration_store);
2,778✔
68
        if (!schema_version) {
2,778✔
69
            return false; // Either table is not initialized or version does not exist
2,754✔
70
        }
2,754✔
71
    }
236✔
72
    else { // writable
236✔
73
        SyncMetadataSchemaVersions schema_versions(tr);
236✔
74
        schema_version = schema_versions.get_version_for(tr, internal_schema_groups::c_flx_migration_store);
236✔
75
        // Create the version and metadata_schema if it doesn't exist
118✔
76
        if (!schema_version) {
236✔
77
            tr->promote_to_write();
236✔
78
            schema_versions.set_version_for(tr, internal_schema_groups::c_flx_migration_store, c_schema_version);
236✔
79
            create_sync_metadata_schema(tr, &internal_tables);
236✔
80
            tr->commit_and_continue_as_read();
236✔
81
        }
236✔
82
    }
236✔
83
    // Load the metadata schema unless it was just created
1,334✔
84
    if (!m_migration_table) {
1,464✔
85
        if (*schema_version != c_schema_version) {
24✔
NEW
86
            throw RuntimeError(ErrorCodes::UnsupportedFileFormatVersion,
×
NEW
87
                               "Invalid schema version for flexible sync migration store metadata");
×
UNCOV
88
        }
×
89
        load_sync_metadata_schema(tr, &internal_tables);
24✔
90
    }
24✔
91

130✔
92
    REALM_ASSERT(m_migration_table);
260✔
93

130✔
94
    // Read the migration object if exists, or default to not migrated
130✔
95
    if (auto migration_table = tr->get_table(m_migration_table); !migration_table->is_empty()) {
260✔
96
        auto migration_store_obj = migration_table->get_object(0);
16✔
97
        m_state = static_cast<MigrationState>(migration_store_obj.get<int64_t>(m_migration_state));
16✔
98
        m_query_string = migration_store_obj.get<String>(m_migration_query_str);
16✔
99
        m_migrated_partition = migration_store_obj.get<String>(m_migration_partition);
16✔
100
        m_sentinel_subscription_set_version =
16✔
101
            migration_store_obj.get<util::Optional<int64_t>>(m_sentinel_query_version);
16✔
102
    }
16✔
103
    else {
244✔
104
        m_state = MigrationState::NotMigrated;
244✔
105
        m_query_string.reset();
244✔
106
        m_migrated_partition.reset();
244✔
107
        m_sentinel_subscription_set_version.reset();
244✔
108
    }
244✔
109
    return true;
260✔
110
}
260✔
111

112
bool MigrationStore::is_migration_in_progress()
113
{
96✔
114
    std::lock_guard lock{m_mutex};
96✔
115
    return m_state == MigrationState::InProgress;
96✔
116
}
96✔
117

118
bool MigrationStore::is_migrated()
119
{
5,576✔
120
    std::lock_guard lock{m_mutex};
5,576✔
121
    return m_state == MigrationState::Migrated;
5,576✔
122
}
5,576✔
123

124
bool MigrationStore::is_rollback_in_progress()
125
{
2,750✔
126
    std::lock_guard lock{m_mutex};
2,750✔
127
    return m_state == MigrationState::RollbackInProgress;
2,750✔
128
}
2,750✔
129

130
void MigrationStore::complete_migration_or_rollback()
131
{
264✔
132
    // Ensure the migration table has been initialized
132✔
133
    bool loaded = load_data();
264✔
134
    REALM_ASSERT(loaded);
264✔
135

132✔
136
    std::unique_lock lock{m_mutex};
264✔
137
    if (m_state != MigrationState::InProgress && m_state != MigrationState::RollbackInProgress) {
264✔
138
        return;
200✔
139
    }
200✔
140

32✔
141
    // Complete rollback.
32✔
142
    if (m_state == MigrationState::RollbackInProgress) {
64✔
143
        clear(std::move(lock)); // releases the lock
20✔
144
        return;
20✔
145
    }
20✔
146

22✔
147
    // Complete migration.
22✔
148
    m_state = MigrationState::Migrated;
44✔
149

22✔
150
    auto tr = m_db->start_write();
44✔
151
    auto migration_table = tr->get_table(m_migration_table);
44✔
152
    REALM_ASSERT(!migration_table->is_empty());
44✔
153
    auto migration_store_obj = migration_table->get_object(0);
44✔
154
    migration_store_obj.set(m_migration_state, int64_t(m_state));
44✔
155
    migration_store_obj.set(m_migration_completed_at, Timestamp{std::chrono::system_clock::now()});
44✔
156
    tr->commit();
44✔
157
}
44✔
158

159
std::optional<std::string> MigrationStore::get_migrated_partition()
160
{
1,494✔
161
    std::lock_guard lock{m_mutex};
1,494✔
162
    // This will be valid if migration in progress or complete
760✔
163
    return m_migrated_partition;
1,494✔
164
}
1,494✔
165

166
std::optional<std::string> MigrationStore::get_query_string()
167
{
140✔
168
    std::lock_guard lock{m_mutex};
140✔
169
    // This will be valid if migration in progress or complete
70✔
170
    return m_query_string;
140✔
171
}
140✔
172

173
// Returns the sync config to use given the cached migration state.
//
// The input config is returned unchanged when it already requests flexible sync, or when
// the state is NotMigrated or RollbackInProgress. Otherwise a flexible sync version of
// the config is returned (see convert_sync_config_to_flx).
//
// Throws LogicError(IllegalOperation) when the realm is Migrated and the config's
// partition value differs from the one recorded for the migration.
std::shared_ptr<realm::SyncConfig> MigrationStore::convert_sync_config(std::shared_ptr<realm::SyncConfig> config)
{
    REALM_ASSERT(config);
    // If load data failed in the constructor, m_state defaults to NotMigrated

    std::unique_lock lock{m_mutex};

    const bool already_flx = config->flx_sync_requested;
    const bool keep_as_is =
        (m_state == MigrationState::NotMigrated) || (m_state == MigrationState::RollbackInProgress);
    if (already_flx || keep_as_is) {
        return config;
    }

    // A migrated realm file is tied to the partition value it was migrated with.
    if (m_state == MigrationState::Migrated) {
        if (m_migrated_partition && m_migrated_partition != config->partition_value) {
            throw LogicError(
                ErrorCodes::IllegalOperation,
                util::format(
                    "Partition value cannot be changed for migrated realms\n - original: %1\n -   config: %2",
                    m_migrated_partition, config->partition_value));
        }
    }

    return convert_sync_config_to_flx(std::move(config));
}
100✔
195

196
std::shared_ptr<realm::SyncConfig>
197
MigrationStore::convert_sync_config_to_flx(std::shared_ptr<realm::SyncConfig> config)
198
{
116✔
199
    if (config->flx_sync_requested) {
116✔
200
        return config;
4✔
201
    }
4✔
202

56✔
203
    auto flx_config = std::make_shared<realm::SyncConfig>(*config); // deep copy
112✔
204
    flx_config->partition_value = "";
112✔
205
    flx_config->flx_sync_requested = true;
112✔
206

56✔
207
    return flx_config;
112✔
208
}
112✔
209

210
void MigrationStore::migrate_to_flx(std::string_view rql_query_string, std::string_view partition_value)
211
{
56✔
212
    REALM_ASSERT(!rql_query_string.empty());
56✔
213

28✔
214
    // Ensure the migration table has been initialized
28✔
215
    bool loaded = load_data();
56✔
216
    REALM_ASSERT(loaded);
56✔
217

28✔
218
    std::unique_lock lock{m_mutex};
56✔
219
    // Can call migrate_to_flx multiple times if migration has not completed.
28✔
220
    REALM_ASSERT(m_state != MigrationState::Migrated);
56✔
221
    m_state = MigrationState::InProgress;
56✔
222
    m_query_string.emplace(rql_query_string);
56✔
223
    m_migrated_partition.emplace(partition_value);
56✔
224

28✔
225
    auto tr = m_db->start_read();
56✔
226
    auto migration_table = tr->get_table(m_migration_table);
56✔
227
    // A migration object may exist if the migration was started in a previous session.
28✔
228
    if (migration_table->is_empty()) {
56✔
229
        tr->promote_to_write();
44✔
230
        auto migration_store_obj = migration_table->create_object();
44✔
231
        migration_store_obj.set(m_migration_query_str, *m_query_string);
44✔
232
        migration_store_obj.set(m_migration_state, int64_t(m_state));
44✔
233
        migration_store_obj.set(m_migration_partition, *m_migrated_partition);
44✔
234
        migration_store_obj.set(m_migration_started_at, Timestamp{std::chrono::system_clock::now()});
44✔
235
        tr->commit();
44✔
236
    }
44✔
237
    else {
12✔
238
        auto migration_store_obj = migration_table->get_object(0);
12✔
239
        auto state = static_cast<MigrationState>(migration_store_obj.get<int64_t>(m_migration_state));
12✔
240
        auto query_string = migration_store_obj.get<String>(m_migration_query_str);
12✔
241
        auto migrated_partition = migration_store_obj.get<String>(m_migration_partition);
12✔
242
        REALM_ASSERT(m_state == state);
12✔
243
        REALM_ASSERT(m_query_string == query_string);
12✔
244
        REALM_ASSERT(m_migrated_partition == migrated_partition);
12✔
245
    }
12✔
246
}
56✔
247

248
void MigrationStore::rollback_to_pbs()
249
{
28✔
250
    // Ensure the migration table has been initialized
14✔
251
    bool loaded = load_data();
28✔
252
    REALM_ASSERT(loaded);
28✔
253

14✔
254
    std::unique_lock lock{m_mutex};
28✔
255
    // Can call rollback_to_pbs multiple times if rollback has not completed.
14✔
256
    REALM_ASSERT(m_state != MigrationState::NotMigrated);
28✔
257
    m_state = MigrationState::RollbackInProgress;
28✔
258

14✔
259
    auto tr = m_db->start_write();
28✔
260
    auto migration_table = tr->get_table(m_migration_table);
28✔
261
    REALM_ASSERT(!migration_table->is_empty());
28✔
262
    auto migration_store_obj = migration_table->get_object(0);
28✔
263
    migration_store_obj.set(m_migration_state, int64_t(m_state));
28✔
264
    tr->commit();
28✔
265
}
28✔
266

267
void MigrationStore::cancel_migration()
268
{
8✔
269
    // Ensure the migration table has been initialized
4✔
270
    bool loaded = load_data();
8✔
271
    REALM_ASSERT(loaded);
8✔
272

4✔
273
    // Clear the migration state
4✔
274
    std::unique_lock lock{m_mutex};
8✔
275
    REALM_ASSERT(m_state == MigrationState::Migrated);
8✔
276
    clear(std::move(lock)); // releases the lock
8✔
277
}
8✔
278

279
void MigrationStore::clear(std::unique_lock<std::mutex>)
280
{
28✔
281
    // Make sure the migration table has been initialized before calling clear()
14✔
282
    REALM_ASSERT(m_migration_table);
28✔
283

14✔
284
    auto tr = m_db->start_read();
28✔
285
    auto migration_table = tr->get_table(m_migration_table);
28✔
286
    if (migration_table->is_empty()) {
28✔
287
        return; // already cleared
×
288
    }
×
289

14✔
290
    m_state = MigrationState::NotMigrated;
28✔
291
    m_query_string.reset();
28✔
292
    m_migrated_partition.reset();
28✔
293
    m_sentinel_subscription_set_version.reset();
28✔
294
    tr->promote_to_write();
28✔
295
    migration_table->clear();
28✔
296
    tr->commit();
28✔
297
}
28✔
298

299
// Builds the subscription used for a migrated object class.
//
// The subscription name is c_flx_subscription_name_prefix followed by
// `object_class_name`; the class name and `rql_query_string` are passed through as-is.
//
// Precondition (asserted): `object_class_name` is non-empty.
Subscription MigrationStore::make_subscription(const std::string& object_class_name,
                                               const std::string& rql_query_string)
{
    REALM_ASSERT(!object_class_name.empty());

    // Append the prefix by its size instead of going through string_view::data(), which
    // is not guaranteed to point at a NUL-terminated string.
    std::string subscription_name;
    subscription_name.reserve(c_flx_subscription_name_prefix.size() + object_class_name.size());
    subscription_name.append(c_flx_subscription_name_prefix);
    subscription_name.append(object_class_name);
    return Subscription{subscription_name, object_class_name, rql_query_string};
}
48✔
307

308
void MigrationStore::create_subscriptions(const SubscriptionStore& subs_store)
309
{
2,376✔
310
    std::unique_lock lock{m_mutex};
2,376✔
311
    if (m_state != MigrationState::Migrated) {
2,376✔
312
        return;
2,352✔
313
    }
2,352✔
314

12✔
315
    REALM_ASSERT(m_query_string);
24✔
316
    create_subscriptions(subs_store, *m_query_string);
24✔
317
}
24✔
318

319
void MigrationStore::create_subscriptions(const SubscriptionStore& subs_store, const std::string& rql_query_string)
320
{
64✔
321
    if (rql_query_string.empty()) {
64✔
322
        return;
×
323
    }
×
324

32✔
325
    auto mut_sub = subs_store.get_latest().make_mutable_copy();
64✔
326
    auto sub_count = mut_sub.size();
64✔
327

32✔
328
    auto tr = m_db->start_read();
64✔
329
    // List of tables covered by the latest subscription set.
32✔
330
    auto tables = subs_store.get_tables_for_latest(*tr);
64✔
331

32✔
332
    // List of tables in the realm.
32✔
333
    auto table_keys = tr->get_table_keys();
64✔
334
    for (const auto& key : table_keys) {
496✔
335
        if (!tr->table_is_public(key)) {
496✔
336
            continue;
408✔
337
        }
408✔
338
        auto table = tr->get_table(key);
88✔
339
        if (table->get_table_type() != Table::Type::TopLevel) {
88✔
340
            continue;
×
341
        }
×
342
        auto object_class_name = table->get_class_name();
88✔
343
        if (tables.find(object_class_name) == tables.end()) {
88✔
344
            auto sub = make_subscription(object_class_name, rql_query_string);
48✔
345
            mut_sub.insert_sub(sub);
48✔
346
        }
48✔
347
    }
88✔
348

32✔
349
    // No new subscription was added.
32✔
350
    if (mut_sub.size() == sub_count) {
64✔
351
        return;
24✔
352
    }
24✔
353

20✔
354
    // Commit new subscription set.
20✔
355
    mut_sub.commit();
40✔
356
}
40✔
357

358
void MigrationStore::create_sentinel_subscription_set(const SubscriptionStore& subs_store)
359
{
4✔
360
    std::lock_guard lock{m_mutex};
4✔
361
    if (m_state != MigrationState::Migrated) {
4✔
362
        return;
×
363
    }
×
364
    if (m_sentinel_subscription_set_version) {
4✔
365
        return;
×
366
    }
×
367
    auto mut_sub = subs_store.get_latest().make_mutable_copy();
4✔
368
    auto subscription_set_version = mut_sub.commit().version();
4✔
369
    m_sentinel_subscription_set_version.emplace(subscription_set_version);
4✔
370

2✔
371
    auto tr = m_db->start_write();
4✔
372
    auto migration_table = tr->get_table(m_migration_table);
4✔
373
    REALM_ASSERT(!migration_table->is_empty());
4✔
374
    auto migration_store_obj = migration_table->get_object(0);
4✔
375
    migration_store_obj.set(m_sentinel_query_version, *m_sentinel_subscription_set_version);
4✔
376
    tr->commit();
4✔
377
}
4✔
378

379
std::optional<int64_t> MigrationStore::get_sentinel_subscription_set_version()
380
{
11,184✔
381
    std::lock_guard lock{m_mutex};
11,184✔
382
    return m_sentinel_subscription_set_version;
11,184✔
383
}
11,184✔
384

385
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc