• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_113

27 Oct 2023 10:49AM UTC coverage: 91.596% (+0.03%) from 91.571%
thomas.goyne_113

push

Evergreen

web-flow
Merge pull request #7085 from realm/release/13.23.2

Release/13.23.2

91788 of 168244 branches covered (0.0%)

230165 of 251282 relevant lines covered (91.6%)

6444552.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.56
/src/realm/sync/noinst/migration_store.cpp
1
#include <realm/sync/noinst/migration_store.hpp>
2

3
#include <realm/transaction.hpp>
4
#include <realm/sync/noinst/sync_metadata_schema.hpp>
5

6
namespace realm::sync {
7
namespace {
8
// Schema version that load_data() writes for, and requires of, the migration store tables.
static constexpr int c_schema_version{1};

// Name of the metadata table holding the migration record, followed by its column names.
static constexpr std::string_view c_flx_migration_table{"flx_migration"};
static constexpr std::string_view c_flx_migration_started_at{"started_at"};
static constexpr std::string_view c_flx_migration_completed_at{"completed_at"};
static constexpr std::string_view c_flx_migration_state{"state"};
static constexpr std::string_view c_flx_migration_query_string{"query_string"};
static constexpr std::string_view c_flx_migration_original_partition{"original_partition"};
static constexpr std::string_view c_flx_migration_sentinel_subscription_set_version{
    "sentinel_subscription_set_version"};

// Prefix prepended to the object class name to form a generated subscription's name.
static constexpr std::string_view c_flx_subscription_name_prefix{"flx_migrated_"};
18

19
class MigrationStoreInit : public MigrationStore {
20
public:
21
    explicit MigrationStoreInit(DBRef db)
22
        : MigrationStore(std::move(db))
23
    {
2,714✔
24
    }
2,714✔
25
};
26

27
} // namespace
28

29
// Factory: builds a store for `db` and hands back the shared reference to it.
MigrationStoreRef MigrationStore::create(DBRef db)
{
    // MigrationStoreInit simply forwards `db` to the MigrationStore constructor.
    MigrationStoreRef store = std::make_shared<MigrationStoreInit>(std::move(db));
    return store;
}
2,714✔
33

34
MigrationStore::MigrationStore(DBRef db)
35
    : m_db(std::move(db))
36
    , m_state(MigrationState::NotMigrated)
37
{
2,714✔
38
    load_data(true); // read_only, default to NotMigrated if table is not initialized
2,714✔
39
}
2,714✔
40

41
bool MigrationStore::load_data(bool read_only)
42
{
3,066✔
43
    std::unique_lock lock{m_mutex};
3,066✔
44

1,360✔
45
    if (m_migration_table) {
3,066✔
46
        return true; // already initialized
120✔
47
    }
120✔
48

1,300✔
49
    std::vector<SyncMetadataTable> internal_tables{
2,946✔
50
        {&m_migration_table,
2,946✔
51
         c_flx_migration_table,
2,946✔
52
         {
2,946✔
53
             {&m_migration_started_at, c_flx_migration_started_at, type_Timestamp},
2,946✔
54
             {&m_migration_completed_at, c_flx_migration_completed_at, type_Timestamp, true},
2,946✔
55
             {&m_migration_state, c_flx_migration_state, type_Int},
2,946✔
56
             {&m_migration_query_str, c_flx_migration_query_string, type_String},
2,946✔
57
             {&m_migration_partition, c_flx_migration_original_partition, type_String},
2,946✔
58
             {&m_sentinel_query_version, c_flx_migration_sentinel_subscription_set_version, type_Int, true},
2,946✔
59
         }},
2,946✔
60
    };
2,946✔
61

1,300✔
62
    std::optional<int64_t> schema_version;
2,946✔
63
    auto tr = m_db->start_read();
2,946✔
64
    if (read_only) {
2,946✔
65
        // Writing is disabled
2,946✔
66
        SyncMetadataSchemaVersionsReader schema_versions(tr);
2,946✔
67
        schema_version = schema_versions.get_version_for(tr, internal_schema_groups::c_flx_migration_store);
2,690✔
68
        if (!schema_version) {
2,690✔
69
            return false; // Either table is not initialized or version does not exist
2,690✔
70
        }
128✔
71
    }
128✔
72
    else { // writable
256✔
73
        SyncMetadataSchemaVersions schema_versions(tr);
16✔
74
        schema_version = schema_versions.get_version_for(tr, internal_schema_groups::c_flx_migration_store);
16✔
75
        // Create the version and metadata_schema if it doesn't exist
16✔
76
        if (!schema_version) {
16✔
77
            tr->promote_to_write();
16✔
78
            schema_versions.set_version_for(tr, internal_schema_groups::c_flx_migration_store, c_schema_version);
16✔
79
            create_sync_metadata_schema(tr, &internal_tables);
16✔
80
            tr->commit_and_continue_as_read();
240✔
81
        }
240✔
82
    }
240✔
83
    // Load the metadata schema unless it was just created
240✔
84
    if (!m_migration_table) {
240✔
85
        if (*schema_version != c_schema_version) {
240✔
86
            throw std::runtime_error("Invalid schema version for flexible sync migration store metadata");
256✔
87
        }
256✔
88
        load_sync_metadata_schema(tr, &internal_tables);
89
    }
90

96✔
91
    REALM_ASSERT(m_migration_table);
96✔
92

96✔
93
    // Read the migration object if exists, or default to not migrated
96✔
94
    if (auto migration_table = tr->get_table(m_migration_table); !migration_table->is_empty()) {
95
        auto migration_store_obj = migration_table->get_object(0);
96
        m_state = static_cast<MigrationState>(migration_store_obj.get<int64_t>(m_migration_state));
5,448✔
97
        m_query_string = migration_store_obj.get<String>(m_migration_query_str);
5,448✔
98
        m_migrated_partition = migration_store_obj.get<String>(m_migration_partition);
5,448✔
99
        m_sentinel_subscription_set_version =
5,448✔
100
            migration_store_obj.get<util::Optional<int64_t>>(m_sentinel_query_version);
101
    }
102
    else {
2,686✔
103
        m_state = MigrationState::NotMigrated;
2,686✔
104
        m_query_string.reset();
2,686✔
105
        m_migrated_partition.reset();
2,686✔
106
        m_sentinel_subscription_set_version.reset();
107
    }
108
    return true;
260✔
109
}
130✔
110

260✔
111
bool MigrationStore::is_migration_in_progress()
260✔
112
{
130✔
113
    std::lock_guard lock{m_mutex};
260✔
114
    return m_state == MigrationState::InProgress;
260✔
115
}
196✔
116

196✔
117
bool MigrationStore::is_migrated()
32✔
118
{
32✔
119
    std::lock_guard lock{m_mutex};
64✔
120
    return m_state == MigrationState::Migrated;
20✔
121
}
20✔
122

20✔
123
bool MigrationStore::is_rollback_in_progress()
22✔
124
{
22✔
125
    std::lock_guard lock{m_mutex};
44✔
126
    return m_state == MigrationState::RollbackInProgress;
22✔
127
}
44✔
128

44✔
129
void MigrationStore::complete_migration_or_rollback()
44✔
130
{
44✔
131
    // Ensure the migration table has been initialized
44✔
132
    bool loaded = load_data();
44✔
133
    REALM_ASSERT(loaded);
44✔
134

44✔
135
    std::unique_lock lock{m_mutex};
136
    if (m_state != MigrationState::InProgress && m_state != MigrationState::RollbackInProgress) {
137
        return;
1,460✔
138
    }
1,460✔
139

746✔
140
    // Complete rollback.
1,460✔
141
    if (m_state == MigrationState::RollbackInProgress) {
1,460✔
142
        clear(std::move(lock)); // releases the lock
143
        return;
144
    }
140✔
145

140✔
146
    // Complete migration.
70✔
147
    m_state = MigrationState::Migrated;
140✔
148

140✔
149
    auto tr = m_db->start_write();
150
    auto migration_table = tr->get_table(m_migration_table);
151
    REALM_ASSERT(!migration_table->is_empty());
232✔
152
    auto migration_store_obj = migration_table->get_object(0);
232✔
153
    migration_store_obj.set(m_migration_state, int64_t(m_state));
116✔
154
    migration_store_obj.set(m_migration_completed_at, Timestamp{std::chrono::system_clock::now()});
116✔
155
    tr->commit();
232✔
156
}
232✔
157

190✔
158
std::optional<std::string> MigrationStore::get_migrated_partition()
120✔
159
{
120✔
160
    std::lock_guard lock{m_mutex};
56✔
161
    // This will be valid if migration in progress or complete
56✔
162
    return m_migrated_partition;
112✔
163
}
74✔
164

12✔
165
std::optional<std::string> MigrationStore::get_query_string()
12✔
166
{
12✔
167
    std::lock_guard lock{m_mutex};
12✔
168
    // This will be valid if migration in progress or complete
12✔
169
    return m_query_string;
50✔
170
}
100✔
171

100✔
172
// Returns the sync configuration to use given the current migration state.
//
// `config` is returned untouched when it already requests flexible sync, or
// when the state is NotMigrated or RollbackInProgress. Otherwise a flexible
// sync copy is produced by convert_sync_config_to_flx().
//
// Throws LogicError (ErrorCodes::IllegalOperation) when the realm is in the
// Migrated state and `config` carries a partition value different from the one
// recorded for the migration.
std::shared_ptr<realm::SyncConfig> MigrationStore::convert_sync_config(std::shared_ptr<realm::SyncConfig> config)
{
    REALM_ASSERT(config);

    // m_state keeps its NotMigrated default if the constructor's load did not find any data.
    std::unique_lock<std::mutex> guard(m_mutex);

    const bool keep_as_is = config->flx_sync_requested || m_state == MigrationState::NotMigrated ||
                            m_state == MigrationState::RollbackInProgress;
    if (keep_as_is) {
        return config;
    }

    // A migrated realm file is tied to the partition value it was migrated with.
    const bool partition_changed = m_state == MigrationState::Migrated && m_migrated_partition &&
                                   m_migrated_partition != config->partition_value;
    if (partition_changed) {
        throw LogicError(
            ErrorCodes::IllegalOperation,
            util::format("Partition value cannot be changed for migrated realms\n - original: %1\n -   config: %2",
                         m_migrated_partition, config->partition_value));
    }

    return convert_sync_config_to_flx(std::move(config));
}
56✔
194

28✔
195
std::shared_ptr<realm::SyncConfig>
56✔
196
MigrationStore::convert_sync_config_to_flx(std::shared_ptr<realm::SyncConfig> config)
28✔
197
{
56✔
198
    if (config->flx_sync_requested) {
56✔
199
        return config;
56✔
200
    }
56✔
201

28✔
202
    auto flx_config = std::make_shared<realm::SyncConfig>(*config); // deep copy
56✔
203
    flx_config->partition_value = "";
56✔
204
    flx_config->flx_sync_requested = true;
28✔
205

56✔
206
    return flx_config;
44✔
207
}
44✔
208

44✔
209
void MigrationStore::migrate_to_flx(std::string_view rql_query_string, std::string_view partition_value)
44✔
210
{
44✔
211
    REALM_ASSERT(!rql_query_string.empty());
44✔
212

44✔
213
    // Ensure the migration table has been initialized
44✔
214
    bool loaded = load_data();
12✔
215
    REALM_ASSERT(loaded);
12✔
216

12✔
217
    std::unique_lock lock{m_mutex};
12✔
218
    // Can call migrate_to_flx multiple times if migration has not completed.
12✔
219
    REALM_ASSERT(m_state != MigrationState::Migrated);
12✔
220
    m_state = MigrationState::InProgress;
12✔
221
    m_query_string.emplace(rql_query_string);
12✔
222
    m_migrated_partition.emplace(partition_value);
12✔
223

56✔
224
    auto tr = m_db->start_read();
225
    auto migration_table = tr->get_table(m_migration_table);
226
    // A migration object may exist if the migration was started in a previous session.
28✔
227
    if (migration_table->is_empty()) {
14✔
228
        tr->promote_to_write();
28✔
229
        auto migration_store_obj = migration_table->create_object();
28✔
230
        migration_store_obj.set(m_migration_query_str, *m_query_string);
14✔
231
        migration_store_obj.set(m_migration_state, int64_t(m_state));
28✔
232
        migration_store_obj.set(m_migration_partition, *m_migrated_partition);
14✔
233
        migration_store_obj.set(m_migration_started_at, Timestamp{std::chrono::system_clock::now()});
28✔
234
        tr->commit();
28✔
235
    }
14✔
236
    else {
28✔
237
        auto migration_store_obj = migration_table->get_object(0);
28✔
238
        auto state = static_cast<MigrationState>(migration_store_obj.get<int64_t>(m_migration_state));
28✔
239
        auto query_string = migration_store_obj.get<String>(m_migration_query_str);
28✔
240
        auto migrated_partition = migration_store_obj.get<String>(m_migration_partition);
28✔
241
        REALM_ASSERT(m_state == state);
28✔
242
        REALM_ASSERT(m_query_string == query_string);
28✔
243
        REALM_ASSERT(m_migrated_partition == migrated_partition);
244
    }
245
}
8✔
246

4✔
247
void MigrationStore::rollback_to_pbs()
8✔
248
{
8✔
249
    // Ensure the migration table has been initialized
4✔
250
    bool loaded = load_data();
4✔
251
    REALM_ASSERT(loaded);
8✔
252

8✔
253
    std::unique_lock lock{m_mutex};
8✔
254
    // Can call rollback_to_pbs multiple times if rollback has not completed.
8✔
255
    REALM_ASSERT(m_state != MigrationState::NotMigrated);
256
    m_state = MigrationState::RollbackInProgress;
257

28✔
258
    auto tr = m_db->start_write();
14✔
259
    auto migration_table = tr->get_table(m_migration_table);
28✔
260
    REALM_ASSERT(!migration_table->is_empty());
14✔
261
    auto migration_store_obj = migration_table->get_object(0);
28✔
262
    migration_store_obj.set(m_migration_state, int64_t(m_state));
28✔
263
    tr->commit();
28✔
264
}
×
265

×
266
void MigrationStore::cancel_migration()
14✔
267
{
28✔
268
    // Ensure the migration table has been initialized
28✔
269
    bool loaded = load_data();
28✔
270
    REALM_ASSERT(loaded);
28✔
271

28✔
272
    // Clear the migration state
28✔
273
    std::unique_lock lock{m_mutex};
28✔
274
    REALM_ASSERT(m_state == MigrationState::Migrated);
28✔
275
    clear(std::move(lock)); // releases the lock
276
}
277

278
void MigrationStore::clear(std::unique_lock<std::mutex>)
48✔
279
{
48✔
280
    // Make sure the migration table has been initialized before calling clear()
24✔
281
    REALM_ASSERT(m_migration_table);
48✔
282

48✔
283
    auto tr = m_db->start_read();
48✔
284
    auto migration_table = tr->get_table(m_migration_table);
285
    if (migration_table->is_empty()) {
286
        return; // already cleared
2,448✔
287
    }
2,448✔
288

2,448✔
289
    m_state = MigrationState::NotMigrated;
2,420✔
290
    m_query_string.reset();
2,420✔
291
    m_migrated_partition.reset();
14✔
292
    m_sentinel_subscription_set_version.reset();
28✔
293
    tr->promote_to_write();
28✔
294
    migration_table->clear();
28✔
295
    tr->commit();
296
}
297

68✔
298
// Builds the subscription used for a migrated table: its name is
// c_flx_subscription_name_prefix followed by the object class name, and it
// applies `rql_query_string` to `object_class_name`.
// `object_class_name` must not be empty.
Subscription MigrationStore::make_subscription(const std::string& object_class_name,
                                               const std::string& rql_query_string)
{
    REALM_ASSERT(!object_class_name.empty());

    // Construct from the view itself (pointer + length) rather than from
    // data(): std::string_view::data() is not guaranteed to be null-terminated.
    std::string subscription_name(c_flx_subscription_name_prefix);
    subscription_name += object_class_name;
    return Subscription{subscription_name, object_class_name, rql_query_string};
}
68✔
306

34✔
307
void MigrationStore::create_subscriptions(const SubscriptionStore& subs_store)
68✔
308
{
34✔
309
    std::unique_lock lock{m_mutex};
34✔
310
    if (m_state != MigrationState::Migrated) {
68✔
311
        return;
536✔
312
    }
536✔
313

444✔
314
    REALM_ASSERT(m_query_string);
444✔
315
    create_subscriptions(subs_store, *m_query_string);
92✔
316
}
92✔
317

×
318
void MigrationStore::create_subscriptions(const SubscriptionStore& subs_store, const std::string& rql_query_string)
×
319
{
92✔
320
    if (rql_query_string.empty()) {
92✔
321
        return;
48✔
322
    }
48✔
323

48✔
324
    auto mut_sub = subs_store.get_latest().make_mutable_copy();
92✔
325
    auto sub_count = mut_sub.size();
34✔
326

34✔
327
    auto tr = m_db->start_read();
68✔
328
    // List of tables covered by the latest subscription set.
24✔
329
    auto tables = subs_store.get_tables_for_latest(*tr);
24✔
330

22✔
331
    // List of tables in the realm.
22✔
332
    auto table_keys = tr->get_table_keys();
44✔
333
    for (const auto& key : table_keys) {
44✔
334
        if (!tr->table_is_public(key)) {
335
            continue;
336
        }
4✔
337
        auto table = tr->get_table(key);
4✔
338
        if (table->get_table_type() != Table::Type::TopLevel) {
4✔
339
            continue;
×
340
        }
×
341
        auto object_class_name = table->get_class_name();
4✔
342
        if (tables.find(object_class_name) == tables.end()) {
×
343
            auto sub = make_subscription(object_class_name, rql_query_string);
×
344
            mut_sub.insert_sub(sub);
4✔
345
        }
4✔
346
    }
4✔
347

2✔
348
    // No new subscription was added.
4✔
349
    if (mut_sub.size() == sub_count) {
4✔
350
        return;
4✔
351
    }
4✔
352

4✔
353
    // Commit new subscription set.
4✔
354
    mut_sub.commit();
4✔
355
}
356

357
void MigrationStore::create_sentinel_subscription_set(const SubscriptionStore& subs_store)
11,006✔
358
{
11,006✔
359
    std::lock_guard lock{m_mutex};
11,006✔
360
    if (m_state != MigrationState::Migrated) {
11,006✔
361
        return;
362
    }
363
    if (m_sentinel_subscription_set_version) {
364
        return;
365
    }
366
    auto mut_sub = subs_store.get_latest().make_mutable_copy();
367
    auto subscription_set_version = mut_sub.commit().version();
368
    m_sentinel_subscription_set_version.emplace(subscription_set_version);
369

370
    auto tr = m_db->start_write();
371
    auto migration_table = tr->get_table(m_migration_table);
372
    REALM_ASSERT(!migration_table->is_empty());
373
    auto migration_store_obj = migration_table->get_object(0);
374
    migration_store_obj.set(m_sentinel_query_version, *m_sentinel_subscription_set_version);
375
    tr->commit();
376
}
377

378
std::optional<int64_t> MigrationStore::get_sentinel_subscription_set_version()
379
{
380
    std::lock_guard lock{m_mutex};
381
    return m_sentinel_subscription_set_version;
382
}
383

384
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc