• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1767

17 Oct 2023 03:12PM UTC coverage: 91.639% (+0.01%) from 91.629%
1767

push

Evergreen

web-flow
Merge pull request #7060 from sean-brandenburg/BAAS-21435-IQF-In-Migration

94280 of 173488 branches covered (0.0%)

4 of 4 new or added lines in 1 file covered. (100.0%)

47 existing lines in 10 files now uncovered.

230659 of 251705 relevant lines covered (91.64%)

6633439.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.93
/test/object-store/sync/flx_migration.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2023 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <util/crypt_key.hpp>
20
#include <util/sync/baas_admin_api.hpp>
21
#include <util/sync/flx_sync_harness.hpp>
22
#include <util/sync/sync_test_utils.hpp>
23

24
#include <realm/object-store/impl/object_accessor_impl.hpp>
25
#include <realm/object-store/impl/realm_coordinator.hpp>
26
#include <realm/object-store/thread_safe_reference.hpp>
27
#include <realm/object-store/sync/async_open_task.hpp>
28
#include <realm/object-store/util/scheduler.hpp>
29

30
#include <realm/sync/protocol.hpp>
31
#include <realm/sync/noinst/client_history_impl.hpp>
32
#include <realm/sync/noinst/client_reset_operation.hpp>
33

34
#include <realm/util/future.hpp>
35

36
#include <catch2/catch_all.hpp>
37

38
#include <chrono>
39

40
#if REALM_ENABLE_SYNC
41
#if REALM_ENABLE_AUTH_TESTS
42

43
using namespace realm;
44

45
// Direction of the server-side sync mode switch requested through trigger_server_migration():
// MigrateToFLX asks the server to migrate PBS->FLX, RollbackToPBS asks it to roll back FLX->PBS.
enum MigrationMode { MigrateToFLX, RollbackToPBS };
46

47
static void trigger_server_migration(const AppSession& app_session, MigrationMode switch_mode,
48
                                     const std::shared_ptr<util::Logger>& logger)
49
{
30✔
50
    auto baas_sync_service = app_session.admin_api.get_sync_service(app_session.server_app_id);
30✔
51

15✔
52
    REQUIRE(app_session.admin_api.is_sync_enabled(app_session.server_app_id));
30!
53
    app_session.admin_api.migrate_to_flx(app_session.server_app_id, baas_sync_service.id,
30✔
54
                                         switch_mode == MigrateToFLX);
30✔
55

15✔
56
    // While the server migration is in progress, the server cannot be used - wait until the migration
15✔
57
    // is complete. migrated with be populated with the 'isMigrated' value from the complete response
15✔
58
    AdminAPISession::MigrationStatus status;
30✔
59
    std::string last_status;
30✔
60
    std::string op_stg = [switch_mode] {
30✔
61
        if (switch_mode == MigrateToFLX)
30✔
62
            return "PBS->FLX Server migration";
18✔
63
        else
12✔
64
            return "FLX->PBS Server rollback";
12✔
65
    }();
30✔
66
    const int duration = 600; // 10 minutes, for now, since it sometimes takes longer than 300 seconds
30✔
67
    try {
30✔
68
        timed_sleeping_wait_for(
30✔
69
            [&] {
877✔
70
                status = app_session.admin_api.get_migration_status(app_session.server_app_id);
877✔
71
                if (logger && last_status != status.statusMessage) {
877✔
72
                    last_status = status.statusMessage;
112✔
73
                    logger->debug("%1 status: %2", op_stg, last_status);
112✔
74
                }
112✔
75
                return status.complete;
877✔
76
            },
877✔
77
            // Query the migration status every 0.5 seconds for up to 90 seconds
15✔
78
            std::chrono::seconds(duration), std::chrono::milliseconds(500));
30✔
79
    }
30✔
80
    catch (const std::runtime_error&) {
15✔
81
        if (logger)
×
82
            logger->debug("%1 timed out after %2 seconds", op_stg, duration);
×
83
        REQUIRE(false);
×
84
    }
×
85
    if (logger) {
30✔
86
        logger->debug("%1 complete", op_stg);
30✔
87
    }
30✔
88
    REQUIRE((switch_mode == MigrateToFLX) == status.isMigrated);
30!
89
}
30✔
90

91
// Add a set of count number of Object objects to the realm
92
static std::vector<ObjectId> fill_test_data(SyncTestFile& config, std::optional<std::string> partition = std::nullopt,
93
                                            int start = 1, int count = 5)
94
{
14✔
95
    std::vector<ObjectId> ret;
14✔
96
    auto realm = Realm::get_shared_realm(config);
14✔
97
    realm->begin_transaction();
14✔
98
    CppContext c(realm);
14✔
99
    // Add some objects with the provided partition value
7✔
100
    for (int i = 0; i < count; i++, ++start) {
84✔
101
        auto id = ObjectId::gen();
70✔
102
        auto obj = Object::create(
70✔
103
            c, realm, "Object",
70✔
104
            std::any(AnyDict{{"_id", std::any(id)}, {"string_field", util::format("value-%1", start)}}));
70✔
105

35✔
106
        if (partition) {
70✔
107
            obj.set_column_value("realm_id", *partition);
60✔
108
        }
60✔
109
        ret.push_back(id);
70✔
110
    }
70✔
111
    realm->commit_transaction();
14✔
112
    return ret;
14✔
113
}
14✔
114

115

116
TEST_CASE("Test server migration and rollback", "[sync][flx][flx migration][baas]") {
2✔
117
    auto logger_ptr = util::Logger::get_default_logger();
2✔
118

1✔
119
    const std::string base_url = get_base_url();
2✔
120
    const std::string partition1 = "migration-test";
2✔
121
    const std::string partition2 = "another-value";
2✔
122
    const Schema mig_schema{
2✔
123
        ObjectSchema("Object", {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
124
                                {"string_field", PropertyType::String | PropertyType::Nullable},
2✔
125
                                {"realm_id", PropertyType::String | PropertyType::Nullable}}),
2✔
126
    };
2✔
127
    auto server_app_config = minimal_app_config(base_url, "server_migrate_rollback", mig_schema);
2✔
128
    TestAppSession session(create_app(server_app_config));
2✔
129
    SyncTestFile config1(session.app(), partition1, server_app_config.schema);
2✔
130
    SyncTestFile config2(session.app(), partition2, server_app_config.schema);
2✔
131

1✔
132
    // Fill some objects
1✔
133
    auto objects1 = fill_test_data(config1, partition1);    // 5 objects starting at 1
2✔
134
    auto objects2 = fill_test_data(config2, partition2, 6); // 5 objects starting at 6
2✔
135

1✔
136
    auto check_data = [&](SharedRealm& realm, bool check_set1, bool check_set2) {
14✔
137
        auto table = realm->read_group().get_table("class_Object");
14✔
138
        auto partition_col = table->get_column_key("realm_id");
14✔
139
        auto string_col = table->get_column_key("string_field");
14✔
140

7✔
141
        size_t table_size = [check_set1, check_set2] {
14✔
142
            if (check_set1 && check_set2)
14✔
UNCOV
143
                return 10;
×
144
            if (check_set1 || check_set2)
14✔
145
                return 5;
12✔
146
            return 0;
2✔
147
        }();
2✔
148

7✔
149
        REQUIRE(table->size() == table_size);
14!
150
        REQUIRE(bool(table->find_first(partition_col, StringData(partition1))) == check_set1);
14!
151
        REQUIRE(bool(table->find_first(string_col, StringData("value-5"))) == check_set1);
14!
152
        REQUIRE(bool(table->find_first(partition_col, StringData(partition2))) == check_set2);
14!
153
        REQUIRE(bool(table->find_first(string_col, StringData("value-6"))) == check_set2);
14!
154
    };
14✔
155

1✔
156
    // Wait for the two partition sets to upload
1✔
157
    {
2✔
158
        auto realm1 = Realm::get_shared_realm(config1);
2✔
159

1✔
160
        REQUIRE(!wait_for_upload(*realm1));
2!
161
        REQUIRE(!wait_for_download(*realm1));
2!
162

1✔
163
        check_data(realm1, true, false);
2✔
164

1✔
165
        auto realm2 = Realm::get_shared_realm(config2);
2✔
166

1✔
167
        REQUIRE(!wait_for_upload(*realm2));
2!
168
        REQUIRE(!wait_for_download(*realm2));
2!
169

1✔
170
        check_data(realm2, false, true);
2✔
171
    }
2✔
172

1✔
173
    // Migrate to FLX
1✔
174
    trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr);
2✔
175

1✔
176
    {
2✔
177
        SyncTestFile flx_config(session.app()->current_user(), server_app_config.schema,
2✔
178
                                SyncConfig::FLXSyncEnabled{});
2✔
179

1✔
180
        auto flx_realm = Realm::get_shared_realm(flx_config);
2✔
181
        {
2✔
182
            auto subs = flx_realm->get_latest_subscription_set();
2✔
183
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
184

1✔
185
            REQUIRE(!wait_for_upload(*flx_realm));
2!
186
            REQUIRE(!wait_for_download(*flx_realm));
2!
187

1✔
188
            check_data(flx_realm, false, false);
2✔
189
        }
2✔
190

1✔
191
        {
2✔
192
            auto flx_table = flx_realm->read_group().get_table("class_Object");
2✔
193
            auto mut_subs = flx_realm->get_latest_subscription_set().make_mutable_copy();
2✔
194
            mut_subs.insert_or_assign(
2✔
195
                "flx_migrated_Objects_1",
2✔
196
                Query(flx_table).equal(flx_table->get_column_key("realm_id"), StringData{partition1}));
2✔
197
            auto subs = std::move(mut_subs).commit();
2✔
198
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
199

1✔
200
            REQUIRE(!wait_for_upload(*flx_realm));
2!
201
            REQUIRE(!wait_for_download(*flx_realm));
2!
202
            wait_for_advance(*flx_realm);
2✔
203

1✔
204
            check_data(flx_realm, true, false);
2✔
205
        }
2✔
206

1✔
207
        {
2✔
208
            auto flx_table = flx_realm->read_group().get_table("class_Object");
2✔
209
            auto mut_subs = flx_realm->get_latest_subscription_set().make_mutable_copy();
2✔
210
            mut_subs.clear();
2✔
211
            mut_subs.insert_or_assign(
2✔
212
                "flx_migrated_Objects_2",
2✔
213
                Query(flx_table).equal(flx_table->get_column_key("realm_id"), StringData{partition2}));
2✔
214
            auto subs = std::move(mut_subs).commit();
2✔
215
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
216

1✔
217
            REQUIRE(!wait_for_upload(*flx_realm));
2!
218
            REQUIRE(!wait_for_download(*flx_realm));
2!
219
            wait_for_advance(*flx_realm);
2✔
220

1✔
221
            check_data(flx_realm, false, true);
2✔
222
        }
2✔
223
    }
2✔
224

1✔
225
    // Roll back to PBS
1✔
226
    trigger_server_migration(session.app_session(), RollbackToPBS, logger_ptr);
2✔
227

1✔
228
    // Try to connect as FLX
1✔
229
    {
2✔
230
        SyncTestFile flx_config(session.app()->current_user(), server_app_config.schema,
2✔
231
                                SyncConfig::FLXSyncEnabled{});
2✔
232
        auto [err_promise, err_future] = util::make_promise_future<SyncError>();
2✔
233
        util::CopyablePromiseHolder promise(std::move(err_promise));
2✔
234
        flx_config.sync_config->error_handler =
2✔
235
            [&logger_ptr, error_promise = std::move(promise)](std::shared_ptr<SyncSession>, SyncError err) mutable {
2✔
236
                // This situation should return the switch_to_pbs error
1✔
237
                logger_ptr->error("Server rolled back - connect as FLX received error: %1", err.status);
2✔
238
                error_promise.get_promise().emplace_value(std::move(err));
2✔
239
            };
2✔
240
        auto flx_realm = Realm::get_shared_realm(flx_config);
2✔
241
        auto err = wait_for_future(std::move(err_future), std::chrono::seconds(30)).get();
2✔
242
        REQUIRE(err.status == ErrorCodes::WrongSyncType);
2!
243
        REQUIRE(err.server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
244
    }
2✔
245

1✔
246
    {
2✔
247
        SyncTestFile pbs_config(session.app(), partition1, server_app_config.schema);
2✔
248
        auto pbs_realm = Realm::get_shared_realm(pbs_config);
2✔
249

1✔
250
        REQUIRE(!wait_for_upload(*pbs_realm));
2!
251
        REQUIRE(!wait_for_download(*pbs_realm));
2!
252

1✔
253
        check_data(pbs_realm, true, false);
2✔
254
    }
2✔
255
    {
2✔
256
        SyncTestFile pbs_config(session.app(), partition2, server_app_config.schema);
2✔
257
        auto pbs_realm = Realm::get_shared_realm(pbs_config);
2✔
258

1✔
259
        REQUIRE(!wait_for_upload(*pbs_realm));
2!
260
        REQUIRE(!wait_for_download(*pbs_realm));
2!
261

1✔
262
        check_data(pbs_realm, false, true);
2✔
263
    }
2✔
264
}
2✔
265

266
// Opens the same PBS-configured realm (DiscardLocal resync mode) before the migration, after the
// server is migrated to FLX and after it is rolled back to PBS, and checks each time that the
// realm syncs and still contains the 5 seeded objects.
TEST_CASE("Test client migration and rollback", "[sync][flx][flx migration][baas]") {
    auto logger_ptr = util::Logger::get_default_logger();

    const std::string base_url = get_base_url();
    const std::string partition = "migration-test";
    const Schema mig_schema{
        ObjectSchema("Object", {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
                                {"string_field", PropertyType::String | PropertyType::Nullable},
                                {"realm_id", PropertyType::String | PropertyType::Nullable}}),
    };
    auto server_app_config = minimal_app_config(base_url, "server_migrate_rollback", mig_schema);
    TestAppSession session(create_app(server_app_config));
    SyncTestFile config(session.app(), partition, server_app_config.schema);
    config.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;

    // Fill some objects
    auto objects = fill_test_data(config, partition); // 5 objects starting at 1

    // Opens the realm, waits for upload and download to finish, and verifies the object count.
    // The realm is closed again when the helper returns.
    const auto require_synced_with_five_objects = [&config] {
        auto realm = Realm::get_shared_realm(config);

        REQUIRE(!wait_for_upload(*realm));
        REQUIRE(!wait_for_download(*realm));

        auto object_table = realm->read_group().get_table("class_Object");
        REQUIRE(object_table->size() == 5);
    };

    // Wait to upload the data
    require_synced_with_five_objects();

    // Migrate to FLX
    trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr);
    require_synced_with_five_objects();

    // Roll back to PBS
    trigger_server_migration(session.app_session(), RollbackToPBS, logger_ptr);
    require_synced_with_five_objects();
}
2✔
321

322
TEST_CASE("Test client migration and rollback with recovery", "[sync][flx][flx migration][baas]") {
2✔
323
    auto logger_ptr = util::Logger::get_default_logger();
2✔
324

1✔
325
    const std::string base_url = get_base_url();
2✔
326
    const std::string partition = "migration-test";
2✔
327
    const Schema mig_schema{
2✔
328
        ObjectSchema("Object", {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
329
                                {"string_field", PropertyType::String | PropertyType::Nullable}}),
2✔
330
    };
2✔
331
    auto server_app_config = minimal_app_config(base_url, "server_migrate_rollback", mig_schema);
2✔
332
    TestAppSession session(create_app(server_app_config));
2✔
333
    SyncTestFile config(session.app(), partition, server_app_config.schema);
2✔
334
    config.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
335

1✔
336
    // Fill some objects
1✔
337
    auto objects = fill_test_data(config); // 5 objects starting at 1 with no partition value set
2✔
338
    // Primary key of the object to recover
1✔
339
    auto obj_id = ObjectId::gen();
2✔
340

1✔
341
    // Keep this realm around for after the revert to PBS
1✔
342
    auto outer_realm = Realm::get_shared_realm(config);
2✔
343
    REQUIRE(!wait_for_upload(*outer_realm));
2!
344
    REQUIRE(!wait_for_download(*outer_realm));
2!
345

1✔
346
    // Wait to upload the data
1✔
347
    {
2✔
348
        auto table = outer_realm->read_group().get_table("class_Object");
2✔
349
        REQUIRE(table->size() == 5);
2!
350

1✔
351
        // Pause the sync session and make a change.
1✔
352
        // This will be recovered when it is resumed after the migration.
1✔
353
        outer_realm->sync_session()->pause();
2✔
354
        outer_realm->begin_transaction();
2✔
355
        outer_realm->read_group()
2✔
356
            .get_table("class_Object")
2✔
357
            ->create_object_with_primary_key(obj_id)
2✔
358
            .set("string_field", "partition-set-during-sync-upload");
2✔
359
        outer_realm->commit_transaction();
2✔
360
    }
2✔
361

1✔
362
    // Migrate to FLX
1✔
363
    trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr);
2✔
364

1✔
365
    // Resume the session and verify the additional object was uploaded after the migration
1✔
366
    outer_realm->sync_session()->resume();
2✔
367
    REQUIRE(!wait_for_upload(*outer_realm));
2!
368
    REQUIRE(!wait_for_download(*outer_realm));
2!
369

1✔
370
    {
2✔
371
        auto sync_session = outer_realm->sync_session();
2✔
372
        REQUIRE(sync_session);
2!
373
        auto sub_store = sync_session->get_flx_subscription_store();
2✔
374
        REQUIRE(sub_store);
2!
375
        auto active_subs = sub_store->get_active();
2✔
376
        REQUIRE(active_subs.size() == 1);
2!
377
        REQUIRE(active_subs.find("flx_migrated_Object"));
2!
378

1✔
379
        auto table = outer_realm->read_group().get_table("class_Object");
2✔
380
        REQUIRE(table->size() == 6);
2!
381

1✔
382
        auto object_table = outer_realm->read_group().get_table("class_Object");
2✔
383
        auto pending_object = object_table->get_object_with_primary_key(obj_id);
2✔
384
        REQUIRE(pending_object.get<String>("string_field") == "partition-set-during-sync-upload");
2!
385
    }
2✔
386

1✔
387
    // Pause the sync session so a pending subscription and object can be created
1✔
388
    // before processing the rollback
1✔
389
    outer_realm->sync_session()->pause();
2✔
390
    util::Future<sync::SubscriptionSet::State> new_subs_future = [&] {
2✔
391
        auto sub_store = outer_realm->sync_session()->get_flx_subscription_store();
2✔
392
        auto mut_subs = sub_store->get_active().make_mutable_copy();
2✔
393

1✔
394
        auto object_table = outer_realm->read_group().get_table("class_Object");
2✔
395
        auto string_col_key = object_table->get_column_key("string_field");
2✔
396
        mut_subs.insert_or_assign("dummy_subs", Query(object_table).equal(string_col_key, StringData{"some-value"}));
2✔
397
        auto new_subs = mut_subs.commit();
2✔
398
        return new_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete);
2✔
399
    }();
2✔
400

1✔
401
    // Add a local object while the session is paused. This will be recovered when connecting after the rollback.
1✔
402
    {
2✔
403
        outer_realm->begin_transaction();
2✔
404
        outer_realm->read_group()
2✔
405
            .get_table("class_Object")
2✔
406
            ->create_object_with_primary_key(ObjectId::gen())
2✔
407
            .set("string_field", "partition-set-by-pbs");
2✔
408
        outer_realm->commit_transaction();
2✔
409
    }
2✔
410

1✔
411
    // Wait for the object to be written to Atlas/MongoDB before rollback, otherwise it may be lost
1✔
412
    reset_utils::wait_for_object_to_persist_to_atlas(session.app()->current_user(), session.app_session(), "Object",
2✔
413
                                                     {{"_id", obj_id}});
2✔
414

1✔
415
    //  Roll back to PBS
1✔
416
    trigger_server_migration(session.app_session(), RollbackToPBS, logger_ptr);
2✔
417

1✔
418
    // Connect after rolling back to PBS
1✔
419
    outer_realm->sync_session()->resume();
2✔
420
    REQUIRE(!wait_for_upload(*outer_realm));
2!
421
    REQUIRE(!wait_for_download(*outer_realm));
2!
422

1✔
423
    {
2✔
424
        auto table = outer_realm->read_group().get_table("class_Object");
2✔
425
        REQUIRE(table->size() == 7);
2!
426

1✔
427
        // Verify the internal sync session subscription store has been cleared
1✔
428
        auto sync_session = outer_realm->sync_session();
2✔
429
        REQUIRE(sync_session);
2!
430
        auto sub_store = SyncSession::OnlyForTesting::get_subscription_store_base(*sync_session);
2✔
431
        REQUIRE(sub_store);
2!
432
        auto active_subs = sub_store->get_latest();
2✔
433
        REQUIRE(active_subs.size() == 0);
2!
434
        REQUIRE(active_subs.version() == 0);
2!
435

1✔
436
        auto result = wait_for_future(std::move(new_subs_future)).get_no_throw();
2✔
437
        REALM_ASSERT(result.is_ok());
2✔
438
        REALM_ASSERT(result.get_value() == sync::SubscriptionSet::State::Superseded);
2✔
439
    }
2✔
440

1✔
441
    //  Migrate back to FLX - and keep the realm session open
1✔
442
    trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr);
2✔
443

1✔
444
    REQUIRE(!wait_for_upload(*outer_realm));
2!
445
    REQUIRE(!wait_for_download(*outer_realm));
2!
446

1✔
447
    // Verify data has been sync'ed and there is only 1 subscription for the Object table
1✔
448
    {
2✔
449
        auto table = outer_realm->read_group().get_table("class_Object");
2✔
450
        REQUIRE(table->size() == 7);
2!
451
        auto sync_session = outer_realm->sync_session();
2✔
452
        REQUIRE(sync_session);
2!
453
        auto sub_store = sync_session->get_flx_subscription_store();
2✔
454
        REQUIRE(sub_store);
2!
455
        auto active_subs = sub_store->get_active();
2✔
456
        REQUIRE(active_subs.size() == 1);
2!
457
        REQUIRE(active_subs.find("flx_migrated_Object"));
2!
458
    }
2✔
459

1✔
460
    // Roll back to PBS once again - and keep the realm session open
1✔
461
    trigger_server_migration(session.app_session(), RollbackToPBS, logger_ptr);
2✔
462

1✔
463
    REQUIRE(!wait_for_upload(*outer_realm));
2!
464
    REQUIRE(!wait_for_download(*outer_realm));
2!
465

1✔
466
    {
2✔
467
        auto table = outer_realm->read_group().get_table("class_Object");
2✔
468
        REQUIRE(table->size() == 7);
2!
469
    }
2✔
470
}
2✔
471

472
TEST_CASE("An interrupted migration or rollback can recover on the next session",
473
          "[sync][flx][flx migration][baas]") {
2✔
474
    auto logger_ptr = util::Logger::get_default_logger();
2✔
475

1✔
476
    const std::string base_url = get_base_url();
2✔
477
    const std::string partition = "migration-test";
2✔
478
    const Schema mig_schema{
2✔
479
        ObjectSchema("Object", {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
480
                                {"string_field", PropertyType::String | PropertyType::Nullable},
2✔
481
                                {"realm_id", PropertyType::String | PropertyType::Nullable}}),
2✔
482
    };
2✔
483
    auto server_app_config = minimal_app_config(base_url, "server_migrate_rollback", mig_schema);
2✔
484
    TestAppSession session(create_app(server_app_config));
2✔
485
    SyncTestFile config(session.app(), partition, server_app_config.schema);
2✔
486
    config.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
487

1✔
488
    // Fill some objects
1✔
489
    auto objects = fill_test_data(config, partition);
2✔
490

1✔
491
    // Wait to upload the data
1✔
492
    {
2✔
493
        auto realm = Realm::get_shared_realm(config);
2✔
494

1✔
495
        REQUIRE(!wait_for_upload(*realm));
2!
496
        REQUIRE(!wait_for_download(*realm));
2!
497

1✔
498
        auto table = realm->read_group().get_table("class_Object");
2✔
499
        CHECK(table->size() == 5);
2!
500
    }
2✔
501

1✔
502
    // Migrate to FLX
1✔
503
    trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr);
2✔
504

1✔
505
    auto error_event_hook = [&config](sync::ProtocolError error, int& error_count) {
4✔
506
        return [&config, &error_count, error](std::weak_ptr<SyncSession> weak_session,
4✔
507
                                              const SyncClientHookData& data) mutable {
32✔
508
            if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
32✔
509
                return SyncClientHookAction::NoAction;
24✔
510
            }
24✔
511
            auto session = weak_session.lock();
8✔
512
            REQUIRE(session);
8!
513

4✔
514
            if (session->path() != config.path) {
8✔
515
                return SyncClientHookAction::NoAction;
×
516
            }
×
517

4✔
518
            auto error_code = sync::ProtocolError(data.error_info->raw_error_code);
8✔
519

4✔
520
            if (error_code == sync::ProtocolError::initial_sync_not_completed) {
8✔
521
                return SyncClientHookAction::NoAction;
×
522
            }
×
523

4✔
524
            REQUIRE(error_code == error);
8!
525
            ++error_count;
8✔
526
            return SyncClientHookAction::NoAction;
8✔
527
        };
8✔
528
    };
4✔
529

1✔
530
    // Session is interrupted before the migration is completed.
1✔
531
    {
2✔
532
        auto error_count = 0;
2✔
533
        config.sync_config->on_sync_client_event_hook =
2✔
534
            error_event_hook(sync::ProtocolError::migrate_to_flx, error_count);
2✔
535
        auto realm = Realm::get_shared_realm(config);
2✔
536

1✔
537
        timed_wait_for([&] {
834,674✔
538
            return util::File::exists(_impl::ClientResetOperation::get_fresh_path_for(config.path));
834,674✔
539
        });
834,674✔
540

1✔
541
        // Pause then resume the session. This triggers the server to send a new client reset request.
1✔
542
        realm->sync_session()->pause();
2✔
543
        realm->sync_session()->resume();
2✔
544

1✔
545
        REQUIRE(!wait_for_upload(*realm));
2!
546
        REQUIRE(!wait_for_download(*realm));
2!
547

1✔
548
        auto table = realm->read_group().get_table("class_Object");
2✔
549
        CHECK(table->size() == 5);
2!
550

1✔
551
        // Client reset is requested twice.
1✔
552
        REQUIRE(error_count == 2);
2!
553
    }
2✔
554

1✔
555
    //  Roll back to PBS
1✔
556
    trigger_server_migration(session.app_session(), RollbackToPBS, logger_ptr);
2✔
557

1✔
558
    // Session is interrupted before the rollback is completed.
1✔
559
    {
2✔
560
        auto error_count = 0;
2✔
561
        config.sync_config->on_sync_client_event_hook =
2✔
562
            error_event_hook(sync::ProtocolError::revert_to_pbs, error_count);
2✔
563
        auto realm = Realm::get_shared_realm(config);
2✔
564

1✔
565
        timed_wait_for([&] {
774,618✔
566
            return util::File::exists(_impl::ClientResetOperation::get_fresh_path_for(config.path));
774,618✔
567
        });
774,618✔
568

1✔
569
        // Pause then resume the session. This triggers the server to send a new client reset request.
1✔
570
        realm->sync_session()->pause();
2✔
571
        realm->sync_session()->resume();
2✔
572

1✔
573
        REQUIRE(!wait_for_upload(*realm));
2!
574
        REQUIRE(!wait_for_download(*realm));
2!
575

1✔
576
        auto table = realm->read_group().get_table("class_Object");
2✔
577
        CHECK(table->size() == 5);
2!
578

1✔
579
        // Client reset is requested twice.
1✔
580
        REQUIRE(error_count == 2);
2!
581
    }
2✔
582
}
2✔
583

584
TEST_CASE("Update to native FLX after migration", "[sync][flx][flx migration][baas]") {
2✔
585
    auto logger_ptr = util::Logger::get_default_logger();
2✔
586

1✔
587
    const std::string base_url = get_base_url();
2✔
588
    const std::string partition = "migration-test";
2✔
589
    const Schema mig_schema{
2✔
590
        ObjectSchema("Object", {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
591
                                {"string_field", PropertyType::String | PropertyType::Nullable},
2✔
592
                                {"realm_id", PropertyType::String | PropertyType::Nullable}}),
2✔
593
    };
2✔
594
    auto server_app_config = minimal_app_config(base_url, "server_migrate_rollback", mig_schema);
2✔
595
    TestAppSession session(create_app(server_app_config));
2✔
596
    SyncTestFile config(session.app(), partition, server_app_config.schema);
2✔
597
    config.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
598

1✔
599
    // Fill some objects
1✔
600
    auto objects = fill_test_data(config, partition); // 5 objects starting at 1
2✔
601

1✔
602
    // Wait to upload the data
1✔
603
    {
2✔
604
        auto realm = Realm::get_shared_realm(config);
2✔
605

1✔
606
        REQUIRE(!wait_for_upload(*realm));
2!
607
        REQUIRE(!wait_for_download(*realm));
2!
608

1✔
609
        auto table = realm->read_group().get_table("class_Object");
2✔
610
        CHECK(table->size() == 5);
2!
611
    }
2✔
612

1✔
613
    // Migrate to FLX
1✔
614
    trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr);
2✔
615

1✔
616
    {
2✔
617
        auto realm = Realm::get_shared_realm(config);
2✔
618

1✔
619
        REQUIRE(!wait_for_upload(*realm));
2!
620
        REQUIRE(!wait_for_download(*realm));
2!
621

1✔
622
        auto table = realm->read_group().get_table("class_Object");
2✔
623
        CHECK(table->size() == 5);
2!
624

1✔
625
        // Close the sync session and make a change. This will be recovered by the migration.
1✔
626
        realm->sync_session()->force_close();
2✔
627
        realm->begin_transaction();
2✔
628
        realm->read_group()
2✔
629
            .get_table("class_Object")
2✔
630
            ->create_object_with_primary_key(ObjectId::gen())
2✔
631
            .set("string_field", "flx_migration_object");
2✔
632
        realm->commit_transaction();
2✔
633
    }
2✔
634

1✔
635
    // Update to native FLX
1✔
636
    {
2✔
637
        SyncTestFile flx_config(session.app()->current_user(), server_app_config.schema,
2✔
638
                                SyncConfig::FLXSyncEnabled{});
2✔
639
        flx_config.path = config.path;
2✔
640

1✔
641
        auto realm = Realm::get_shared_realm(flx_config);
2✔
642

1✔
643
        realm->begin_transaction();
2✔
644
        realm->read_group()
2✔
645
            .get_table("class_Object")
2✔
646
            ->create_object_with_primary_key(ObjectId::gen())
2✔
647
            .set("realm_id", partition)
2✔
648
            .set("string_field", "flx_native_object");
2✔
649
        ;
2✔
650
        realm->commit_transaction();
2✔
651

1✔
652
        REQUIRE(!wait_for_upload(*realm));
2!
653
        REQUIRE(!wait_for_download(*realm));
2!
654

1✔
655
        auto table = realm->read_group().get_table("class_Object");
2✔
656
        CHECK(table->size() == 7);
2!
657
    }
2✔
658

1✔
659
    // Open a new realm and check all data is sync'ed.
1✔
660
    {
2✔
661
        SyncTestFile flx_config(session.app()->current_user(), server_app_config.schema,
2✔
662
                                SyncConfig::FLXSyncEnabled{});
2✔
663

1✔
664
        auto flx_realm = Realm::get_shared_realm(flx_config);
2✔
665

1✔
666
        auto flx_table = flx_realm->read_group().get_table("class_Object");
2✔
667
        auto mut_subs = flx_realm->get_latest_subscription_set().make_mutable_copy();
2✔
668
        auto partition_col_key = flx_table->get_column_key("realm_id");
2✔
669
        mut_subs.insert_or_assign("flx_migrated_Object",
2✔
670
                                  Query(flx_table).equal(partition_col_key, StringData{partition}));
2✔
671
        auto subs = std::move(mut_subs).commit();
2✔
672
        subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
673

1✔
674
        flx_realm->refresh();
2✔
675
        auto table = flx_realm->read_group().get_table("class_Object");
2✔
676
        CHECK(table->size() == 7);
2!
677
    }
2✔
678

1✔
679
    //  Roll back to PBS
1✔
680
    trigger_server_migration(session.app_session(), RollbackToPBS, logger_ptr);
2✔
681

1✔
682
    // Connect again as native FLX: server replies with SwitchToPBS
1✔
683
    {
2✔
684
        SyncTestFile flx_config(session.app()->current_user(), server_app_config.schema,
2✔
685
                                SyncConfig::FLXSyncEnabled{});
2✔
686
        flx_config.path = config.path;
2✔
687

1✔
688
        auto [err_promise, err_future] = util::make_promise_future<SyncError>();
2✔
689
        util::CopyablePromiseHolder promise(std::move(err_promise));
2✔
690
        flx_config.sync_config->error_handler =
2✔
691
            [&logger_ptr, error_promise = std::move(promise)](std::shared_ptr<SyncSession>, SyncError err) mutable {
2✔
692
                // This situation should return the switch_to_pbs error
1✔
693
                logger_ptr->error("Server rolled back - connect as FLX received error: %1", err.status);
2✔
694
                error_promise.get_promise().emplace_value(std::move(err));
2✔
695
            };
2✔
696
        auto flx_realm = Realm::get_shared_realm(flx_config);
2✔
697
        auto err = wait_for_future(std::move(err_future), std::chrono::seconds(30)).get();
2✔
698
        REQUIRE(err.status == ErrorCodes::WrongSyncType);
2!
699
        REQUIRE(err.server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
700
    }
2✔
701
}
2✔
702

703
// Verifies that a table added to the schema after the server app has been migrated to FLX is
// synced to a client that keeps opening the realm with its original partition-based config:
//   1. Populate the realm through the partition-based config and wait for the upload.
//   2. Trigger the server migration to FLX and check the same config still syncs.
//   3. From a separate native FLX realm, add the "Object2" table, subscribe to it and upload
//      one object of that type.
//   4. Reopen the migrated realm and check it received the new table, its object, and a
//      "flx_migrated_Object2" subscription.
TEST_CASE("New table is synced after migration", "[sync][flx][flx migration][baas]") {
    auto logger_ptr = util::Logger::get_default_logger();

    const std::string base_url = get_base_url();
    const std::string partition = "migration-test";
    const Schema mig_schema{
        ObjectSchema("Object", {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
                                {"string_field", PropertyType::String | PropertyType::Nullable},
                                {"realm_id", PropertyType::String | PropertyType::Nullable}}),
    };
    // NOTE(review): the app name mentions "rollback" although this test never rolls back;
    // presumably copied from another test - confirm it is intentional.
    auto server_app_config = minimal_app_config(base_url, "server_migrate_rollback", mig_schema);
    TestAppSession session(create_app(server_app_config));
    SyncTestFile config(session.app(), partition, server_app_config.schema);
    config.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;

    // Fill some objects
    auto objects = fill_test_data(config, partition); // 5 objects starting at 1

    // Wait to upload the data
    {
        auto realm = Realm::get_shared_realm(config);

        REQUIRE(!wait_for_upload(*realm));
        REQUIRE(!wait_for_download(*realm));

        auto table = realm->read_group().get_table("class_Object");
        CHECK(table->size() == 5);
    }

    // Migrate to FLX
    trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr);

    // The original config must still be able to sync after the server migration.
    {
        auto realm = Realm::get_shared_realm(config);

        REQUIRE(!wait_for_upload(*realm));
        REQUIRE(!wait_for_download(*realm));

        auto table = realm->read_group().get_table("class_Object");
        CHECK(table->size() == 5);
    }

    // Open a new realm with an additional table.
    {
        const Schema schema{
            ObjectSchema("Object", {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
                                    {"string_field", PropertyType::String | PropertyType::Nullable},
                                    {"realm_id", PropertyType::String | PropertyType::Nullable}}),
            ObjectSchema("Object2", {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
                                     {"realm_id", PropertyType::String | PropertyType::Nullable}}),
        };
        SyncTestFile flx_config(session.app()->current_user(), schema, SyncConfig::FLXSyncEnabled{});

        auto flx_realm = Realm::get_shared_realm(flx_config);

        // Create a subscription for the new table.
        auto table = flx_realm->read_group().get_table("class_Object2");
        auto mut_subs = flx_realm->get_latest_subscription_set().make_mutable_copy();
        mut_subs.insert_or_assign(Query(table).equal(table->get_column_key("realm_id"), StringData{partition}));

        auto subs = std::move(mut_subs).commit();
        subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();

        // Check the wait result like every other wait in this test, so that a failed or
        // timed out upload is reported here instead of surfacing as a confusing failure later.
        REQUIRE(!wait_for_upload(*flx_realm));

        // Create one object of the new table type.
        flx_realm->begin_transaction();
        flx_realm->read_group()
            .get_table("class_Object2")
            ->create_object_with_primary_key(ObjectId::gen())
            .set("realm_id", partition);
        flx_realm->commit_transaction();

        REQUIRE(!wait_for_upload(*flx_realm));
        REQUIRE(!wait_for_download(*flx_realm));
    }

    // Open the migrated realm and sync the new table and data.
    {
        auto realm = Realm::get_shared_realm(config);

        REQUIRE(!wait_for_upload(*realm));
        REQUIRE(!wait_for_download(*realm));

        auto table = realm->read_group().get_table("class_Object");
        CHECK(table->size() == 5);
        auto table2 = realm->read_group().get_table("class_Object2");
        // The new table is exactly what this test is about: fail cleanly rather than
        // dereferencing a null table if it was not synced.
        REQUIRE(table2);
        CHECK(table2->size() == 1);
        auto sync_session = realm->sync_session();
        REQUIRE(sync_session);
        auto sub_store = sync_session->get_flx_subscription_store();
        REQUIRE(sub_store);
        auto active_subs = sub_store->get_active();
        REQUIRE(active_subs.size() == 2);
        REQUIRE(active_subs.find("flx_migrated_Object2"));
    }
}
2✔
800

801
// There is a sequence of events where we tried to open a frozen Realm with new
802
// object types in the schema and this fails schema validation causing client reset
803
// to fail.
804
//   - Add a new class to the schema, but use async open to initiate
805
//     sync without any schema
806
//   - Have the server send a client reset.
807
//   - The client tries to populate the notify_before callback with a frozen Realm using
808
//     the schema with the new class, but the class is not stored on disk yet.
809
// This hits the update_schema() check that makes sure that the frozen Realm's schema is
810
// a subset of the one found on disk. Since it is not, a schema exception is thrown
811
// which is eventually forwarded to the sync error handler and client reset fails.
812
TEST_CASE("Async open + client reset", "[sync][flx][flx migration][baas]") {
4✔
813
    auto logger_ptr = util::Logger::get_default_logger();
4✔
814

2✔
815
    const std::string base_url = get_base_url();
4✔
816
    const std::string partition = "async-open-migration-test";
4✔
817
    ObjectSchema shared_object("Object", {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
4✔
818
                                          {"string_field", PropertyType::String | PropertyType::Nullable},
4✔
819
                                          {"realm_id", PropertyType::String | PropertyType::Nullable}});
4✔
820
    const Schema mig_schema{shared_object};
4✔
821
    size_t num_before_reset_notifications = 0;
4✔
822
    size_t num_after_reset_notifications = 0;
4✔
823
    auto server_app_config = minimal_app_config(base_url, "async_open_during_migration", mig_schema);
4✔
824
    std::optional<SyncTestFile> config; // destruct this after the sessions are torn down
4✔
825
    TestAppSession session(create_app(server_app_config));
4✔
826
    config.emplace(session.app(), partition, server_app_config.schema);
4✔
827
    config->sync_config->client_resync_mode = ClientResyncMode::Recover;
4✔
828
    config->sync_config->notify_before_client_reset = [&](SharedRealm before) {
4✔
829
        logger_ptr->debug("notify_before_client_reset");
4✔
830
        REQUIRE(before);
4!
831
        REQUIRE(before->is_frozen());
4!
832
        auto table = before->read_group().get_table("class_Object");
4✔
833
        CHECK(table);
4!
834
        ++num_before_reset_notifications;
4✔
835
    };
4✔
836
    config->sync_config->notify_after_client_reset = [&](SharedRealm before, ThreadSafeReference after_ref,
4✔
837
                                                         bool did_recover) {
4✔
838
        logger_ptr->debug("notify_after_client_reset");
4✔
839
        CHECK(did_recover);
4!
840
        REQUIRE(before);
4!
841
        auto table_before = before->read_group().get_table("class_Object");
4✔
842
        CHECK(table_before);
4!
843
        SharedRealm after = Realm::get_shared_realm(std::move(after_ref), util::Scheduler::make_default());
4✔
844
        REQUIRE(after);
4!
845
        auto table_after = after->read_group().get_table("class_Object");
4✔
846
        REQUIRE(table_after);
4!
847
        ++num_after_reset_notifications;
4✔
848
    };
4✔
849

2✔
850
    ObjectSchema locally_added("LocallyAdded", {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
4✔
851
                                                {"string_field", PropertyType::String | PropertyType::Nullable},
4✔
852
                                                {"realm_id", PropertyType::String | PropertyType::Nullable}});
4✔
853

2✔
854
    SECTION("no initial state") {
4✔
855
        // Migrate to FLX
1✔
856
        trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr);
2✔
857
        shared_object.persisted_properties.push_back({"oid_field", PropertyType::ObjectId | PropertyType::Nullable});
2✔
858
        config->schema = {shared_object, locally_added};
2✔
859

1✔
860
        async_open_realm(*config, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
2✔
861
            REQUIRE(ref);
2!
862
            REQUIRE_FALSE(error);
2!
863

1✔
864
            auto realm = Realm::get_shared_realm(std::move(ref));
2✔
865

1✔
866
            auto table = realm->read_group().get_table("class_Object");
2✔
867
            REQUIRE(table->size() == 0);
2!
868
            REQUIRE(num_before_reset_notifications == 1);
2!
869
            REQUIRE(num_after_reset_notifications == 1);
2!
870

1✔
871
            auto locally_added_table = realm->read_group().get_table("class_LocallyAdded");
2✔
872
            REQUIRE(locally_added_table);
2!
873
            REQUIRE(locally_added_table->size() == 0);
2!
874
        });
2✔
875
    }
2✔
876

2✔
877
    SECTION("initial state") {
4✔
878
        {
2✔
879
            config->schema = {shared_object};
2✔
880
            auto realm = Realm::get_shared_realm(*config);
2✔
881
            realm->begin_transaction();
2✔
882
            auto table = realm->read_group().get_table("class_Object");
2✔
883
            table->create_object_with_primary_key(ObjectId::gen());
2✔
884
            realm->commit_transaction();
2✔
885
            wait_for_upload(*realm);
2✔
886
        }
2✔
887
        trigger_server_migration(session.app_session(), MigrateToFLX, logger_ptr);
2✔
888
        {
2✔
889
            shared_object.persisted_properties.push_back(
2✔
890
                {"oid_field", PropertyType::ObjectId | PropertyType::Nullable});
2✔
891
            config->schema = {shared_object, locally_added};
2✔
892

1✔
893
            async_open_realm(*config, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
2✔
894
                REQUIRE(ref);
2!
895
                REQUIRE_FALSE(error);
2!
896

1✔
897
                auto realm = Realm::get_shared_realm(std::move(ref));
2✔
898

1✔
899
                auto table = realm->read_group().get_table("class_Object");
2✔
900
                REQUIRE(table->size() == 1);
2!
901
                REQUIRE(num_before_reset_notifications == 1);
2!
902
                REQUIRE(num_after_reset_notifications == 1);
2!
903

1✔
904
                auto locally_added_table = realm->read_group().get_table("class_LocallyAdded");
2✔
905
                REQUIRE(locally_added_table);
2!
906
                REQUIRE(locally_added_table->size() == 0);
2!
907
            });
2✔
908
        }
2✔
909
    }
2✔
910
}
4✔
911

912
#endif // REALM_ENABLE_AUTH_TESTS
913
#endif // REALM_ENABLE_SYNC
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc