• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1751

11 Oct 2023 08:13PM UTC coverage: 91.585% (+0.02%) from 91.563%
1751

push

Evergreen

web-flow
Merge pull request #7031 from realm/tg/commit-notify

Simplify internal commit notification

94262 of 173480 branches covered (0.0%)

425 of 432 new or added lines in 20 files covered. (98.38%)

37 existing lines in 12 files now uncovered.

230487 of 251665 relevant lines covered (91.58%)

6531054.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.36
/test/object-store/sync/flx_sync.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2021 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#if REALM_ENABLE_AUTH_TESTS
20

21
#include <catch2/catch_all.hpp>
22

23
#include <util/test_file.hpp>
24
#include <util/crypt_key.hpp>
25
#include <util/sync/flx_sync_harness.hpp>
26
#include <util/sync/sync_test_utils.hpp>
27

28
#include <realm/object_id.hpp>
29
#include <realm/query_expression.hpp>
30

31
#include <realm/object-store/binding_context.hpp>
32
#include <realm/object-store/impl/object_accessor_impl.hpp>
33
#include <realm/object-store/impl/realm_coordinator.hpp>
34
#include <realm/object-store/schema.hpp>
35
#include <realm/object-store/sync/generic_network_transport.hpp>
36
#include <realm/object-store/sync/mongo_client.hpp>
37
#include <realm/object-store/sync/mongo_database.hpp>
38
#include <realm/object-store/sync/mongo_collection.hpp>
39
#include <realm/object-store/sync/async_open_task.hpp>
40
#include <realm/object-store/util/bson/bson.hpp>
41
#include <realm/object-store/sync/sync_session.hpp>
42

43
#include <realm/sync/client_base.hpp>
44
#include <realm/sync/config.hpp>
45
#include <realm/sync/noinst/client_history_impl.hpp>
46
#include <realm/sync/noinst/client_reset.hpp>
47
#include <realm/sync/noinst/client_reset_operation.hpp>
48
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
49
#include <realm/sync/noinst/server/access_token.hpp>
50
#include <realm/sync/protocol.hpp>
51
#include <realm/sync/subscriptions.hpp>
52

53
#include <realm/util/future.hpp>
54
#include <realm/util/logger.hpp>
55

56
#include <catch2/catch_all.hpp>
57

58
#include <filesystem>
59
#include <iostream>
60
#include <stdexcept>
61

62
using namespace std::string_literals;
63

64
namespace realm {
65

66
class TestHelper {
67
public:
68
    static DBRef& get_db(SharedRealm const& shared_realm)
69
    {
70
        return Realm::Internal::get_db(*shared_realm);
71
    }
72
};
73

74
} // namespace realm
75

76
namespace realm::app {
77

78
namespace {
79
// Smallest schema used by these tests: a single "TopLevel" class whose only property is its
// ObjectId primary key.
const Schema g_minimal_schema{
    ObjectSchema("TopLevel",
                 {
                     {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
                 }),
};
85

86
// Schema with a single "TopLevel" class carrying a nullable int field and a list of strings;
// fill_large_array_schema() requires the harness it is given to use exactly this schema.
const Schema g_large_array_schema{
    {"TopLevel",
     {
         {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
         {"queryable_int_field", PropertyType::Int | PropertyType::Nullable},
         {"list_of_strings", PropertyType::Array | PropertyType::String},
     }},
};
94

95
// Schema with a "TopLevel" class that links to a nullable embedded object of class
// "TopLevel_embedded_obj", which itself holds a single nullable string.
const Schema g_simple_embedded_obj_schema{
    ObjectSchema("TopLevel",
                 {
                     {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
                     {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
                     {"embedded_obj", PropertyType::Object | PropertyType::Nullable, "TopLevel_embedded_obj"},
                 }),
    ObjectSchema("TopLevel_embedded_obj", ObjectSchema::ObjectType::Embedded,
                 {
                     {"str_field", PropertyType::String | PropertyType::Nullable},
                 }),
};
106

107
// Populates a FLXSyncTestHarness with the g_large_array_schema with objects that are large enough that
108
// they are guaranteed to fill multiple bootstrap download messages. Currently this means generating 5
109
// objects each with 1024 array entries of 1024 bytes each.
110
//
111
// Returns a list of the _id values for the objects created.
112
std::vector<ObjectId> fill_large_array_schema(FLXSyncTestHarness& harness)
113
{
12✔
114
    std::vector<ObjectId> ret;
12✔
115
    REQUIRE(harness.schema() == g_large_array_schema);
12!
116
    harness.load_initial_data([&](SharedRealm realm) {
12✔
117
        CppContext c(realm);
12✔
118
        for (int i = 0; i < 5; ++i) {
72✔
119
            auto id = ObjectId::gen();
60✔
120
            auto obj = Object::create(c, realm, "TopLevel",
60✔
121
                                      std::any(AnyDict{{"_id", id},
60✔
122
                                                       {"list_of_strings", AnyVector{}},
60✔
123
                                                       {"queryable_int_field", static_cast<int64_t>(i * 5)}}));
60✔
124
            List str_list(obj, realm->schema().find("TopLevel")->property_for_name("list_of_strings"));
60✔
125
            for (int j = 0; j < 1024; ++j) {
61,500✔
126
                str_list.add(c, std::any(std::string(1024, 'a' + (j % 26))));
61,440✔
127
            }
61,440✔
128

30✔
129
            ret.push_back(id);
60✔
130
        }
60✔
131
    });
12✔
132
    return ret;
12✔
133
}
12✔
134

135
} // namespace
136

137
TEST_CASE("flx: connect to FLX-enabled app", "[sync][flx][baas]") {
2✔
138
    FLXSyncTestHarness harness("basic_flx_connect");
2✔
139

1✔
140
    auto foo_obj_id = ObjectId::gen();
2✔
141
    auto bar_obj_id = ObjectId::gen();
2✔
142
    harness.load_initial_data([&](SharedRealm realm) {
2✔
143
        CppContext c(realm);
2✔
144
        Object::create(c, realm, "TopLevel",
2✔
145
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
146
                                        {"queryable_str_field", "foo"s},
2✔
147
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
148
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
149
        Object::create(c, realm, "TopLevel",
2✔
150
                       std::any(AnyDict{{"_id", bar_obj_id},
2✔
151
                                        {"queryable_str_field", "bar"s},
2✔
152
                                        {"queryable_int_field", static_cast<int64_t>(10)},
2✔
153
                                        {"non_queryable_field", "non queryable 2"s}}));
2✔
154
    });
2✔
155

1✔
156

1✔
157
    harness.do_with_new_realm([&](SharedRealm realm) {
2✔
158
        wait_for_download(*realm);
2✔
159
        {
2✔
160
            auto empty_subs = realm->get_latest_subscription_set();
2✔
161
            CHECK(empty_subs.size() == 0);
2!
162
            CHECK(empty_subs.version() == 0);
2!
163
            empty_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
164
        }
2✔
165

1✔
166
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
167
        auto col_key = table->get_column_key("queryable_str_field");
2✔
168
        Query query_foo(table);
2✔
169
        query_foo.equal(col_key, "foo");
2✔
170
        {
2✔
171
            auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
172
            new_subs.insert_or_assign(query_foo);
2✔
173
            auto subs = new_subs.commit();
2✔
174
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
175
        }
2✔
176

1✔
177
        wait_for_download(*realm);
2✔
178
        {
2✔
179
            wait_for_advance(*realm);
2✔
180
            Results results(realm, table);
2✔
181
            CHECK(results.size() == 1);
2!
182
            auto obj = results.get<Obj>(0);
2✔
183
            CHECK(obj.is_valid());
2!
184
            CHECK(obj.get<ObjectId>("_id") == foo_obj_id);
2!
185
        }
2✔
186

1✔
187
        {
2✔
188
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
189
            Query new_query_bar(table);
2✔
190
            new_query_bar.equal(col_key, "bar");
2✔
191
            mut_subs.insert_or_assign(new_query_bar);
2✔
192
            auto subs = mut_subs.commit();
2✔
193
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
194
        }
2✔
195

1✔
196
        {
2✔
197
            wait_for_advance(*realm);
2✔
198
            Results results(realm, Query(table));
2✔
199
            CHECK(results.size() == 2);
2!
200
        }
2✔
201

1✔
202
        {
2✔
203
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
204
            CHECK(mut_subs.erase(query_foo));
2!
205
            Query new_query_bar(table);
2✔
206
            new_query_bar.equal(col_key, "bar");
2✔
207
            mut_subs.insert_or_assign(new_query_bar);
2✔
208
            auto subs = mut_subs.commit();
2✔
209
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
210
        }
2✔
211

1✔
212
        {
2✔
213
            wait_for_advance(*realm);
2✔
214
            Results results(realm, Query(table));
2✔
215
            CHECK(results.size() == 1);
2!
216
            auto obj = results.get<Obj>(0);
2✔
217
            CHECK(obj.is_valid());
2!
218
            CHECK(obj.get<ObjectId>("_id") == bar_obj_id);
2!
219
        }
2✔
220

1✔
221
        {
2✔
222
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
223
            mut_subs.clear();
2✔
224
            auto subs = mut_subs.commit();
2✔
225
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
226
        }
2✔
227

1✔
228
        {
2✔
229
            wait_for_advance(*realm);
2✔
230
            Results results(realm, table);
2✔
231
            CHECK(results.size() == 0);
2!
232
        }
2✔
233
    });
2✔
234
}
2✔
235

236
// Exercises SyncSession::OnlyForTesting::send_test_command() with one accepted command and two
// malformed requests.
TEST_CASE("flx: test commands work", "[sync][flx][test command][baas]") {
    FLXSyncTestHarness harness("test_commands");
    harness.do_with_new_realm([&](const SharedRealm& realm) {
        wait_for_upload(*realm);

        // A well-formed command is expected to be answered with an empty JSON object.
        nlohmann::json pause_request = {
            {"command", "PAUSE_ROUTER_SESSION"},
        };
        auto pause_response =
            SyncSession::OnlyForTesting::send_test_command(*realm->sync_session(), pause_request.dump()).get();
        REQUIRE(pause_response == "{}");

        // Input that is not JSON at all is expected to fail with a LogicError that mentions the bad input.
        auto rejection =
            SyncSession::OnlyForTesting::send_test_command(*realm->sync_session(), "foobar: }").get_no_throw();
        REQUIRE(rejection.get_status() == ErrorCodes::LogicError);
        REQUIRE_THAT(rejection.get_status().reason(),
                     Catch::Matchers::ContainsSubstring("Invalid json input to send_test_command"));

        // Valid JSON that lacks the "command" key is expected to fail with a LogicError as well.
        rejection =
            SyncSession::OnlyForTesting::send_test_command(*realm->sync_session(), R"({"cmd": ""})").get_no_throw();
        REQUIRE_FALSE(rejection.is_ok());
        REQUIRE(rejection.get_status() == ErrorCodes::LogicError);
        REQUIRE(rejection.get_status().reason() ==
                "Must supply command name in \"command\" field of test command json object");
    });
}
2✔
261

262

263
static auto make_error_handler()
264
{
38✔
265
    auto [error_promise, error_future] = util::make_promise_future<SyncError>();
38✔
266
    auto shared_promise = std::make_shared<decltype(error_promise)>(std::move(error_promise));
38✔
267
    auto fn = [error_promise = std::move(shared_promise)](std::shared_ptr<SyncSession>, SyncError err) {
37✔
268
        error_promise->emplace_value(std::move(err));
36✔
269
    };
36✔
270
    return std::make_pair(std::move(error_future), std::move(fn));
38✔
271
}
38✔
272

273
static auto make_client_reset_handler()
274
{
16✔
275
    auto [reset_promise, reset_future] = util::make_promise_future<ClientResyncMode>();
16✔
276
    auto shared_promise = std::make_shared<decltype(reset_promise)>(std::move(reset_promise));
16✔
277
    auto fn = [reset_promise = std::move(shared_promise)](SharedRealm, ThreadSafeReference, bool did_recover) {
16✔
278
        reset_promise->emplace_value(did_recover ? ClientResyncMode::Recover : ClientResyncMode::DiscardLocal);
12✔
279
    };
16✔
280
    return std::make_pair(std::move(reset_future), std::move(fn));
16✔
281
}
16✔
282

283

284
TEST_CASE("app: error handling integration test", "[sync][flx][baas]") {
16✔
285
    static std::optional<FLXSyncTestHarness> harness{"error_handling"};
16✔
286
    create_user_and_log_in(harness->app());
16✔
287
    SyncTestFile config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
16✔
288
    auto&& [error_future, error_handler] = make_error_handler();
16✔
289
    config.sync_config->error_handler = std::move(error_handler);
16✔
290
    config.sync_config->client_resync_mode = ClientResyncMode::Manual;
16✔
291

8✔
292
    SECTION("handles unknown errors gracefully") {
16✔
293
        auto r = Realm::get_shared_realm(config);
2✔
294
        wait_for_download(*r);
2✔
295
        nlohmann::json error_body = {
2✔
296
            {"tryAgain", false},         {"message", "fake error"},
2✔
297
            {"shouldClientReset", true}, {"isRecoveryModeDisabled", false},
2✔
298
            {"action", "ClientReset"},
2✔
299
        };
2✔
300
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
301
                                       {"args", nlohmann::json{{"errorCode", 299}, {"errorBody", error_body}}}};
2✔
302
        auto test_cmd_res =
2✔
303
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
304
                .get();
2✔
305
        REQUIRE(test_cmd_res == "{}");
2!
306
        auto error = wait_for_future(std::move(error_future)).get();
2✔
307
        REQUIRE(error.status == ErrorCodes::UnknownError);
2!
308
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset);
2!
309
        REQUIRE(error.is_fatal);
2!
310
        REQUIRE_THAT(error.status.reason(),
2✔
311
                     Catch::Matchers::ContainsSubstring("Unknown sync protocol error code 299"));
2✔
312
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
2✔
313
    }
2✔
314

8✔
315
    SECTION("unknown errors without actions are application bugs") {
16✔
316
        auto r = Realm::get_shared_realm(config);
2✔
317
        wait_for_download(*r);
2✔
318
        nlohmann::json error_body = {
2✔
319
            {"tryAgain", false},
2✔
320
            {"message", "fake error"},
2✔
321
            {"shouldClientReset", false},
2✔
322
            {"isRecoveryModeDisabled", false},
2✔
323
        };
2✔
324
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
325
                                       {"args", nlohmann::json{{"errorCode", 299}, {"errorBody", error_body}}}};
2✔
326
        auto test_cmd_res =
2✔
327
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
328
                .get();
2✔
329
        REQUIRE(test_cmd_res == "{}");
2!
330
        auto error = wait_for_future(std::move(error_future)).get();
2✔
331
        REQUIRE(error.status == ErrorCodes::UnknownError);
2!
332
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
333
        REQUIRE(error.is_fatal);
2!
334
        REQUIRE_THAT(error.status.reason(),
2✔
335
                     Catch::Matchers::ContainsSubstring("Unknown sync protocol error code 299"));
2✔
336
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
2✔
337
    }
2✔
338

8✔
339
    SECTION("handles unknown actions gracefully") {
16✔
340
        auto r = Realm::get_shared_realm(config);
2✔
341
        wait_for_download(*r);
2✔
342
        nlohmann::json error_body = {
2✔
343
            {"tryAgain", false},
2✔
344
            {"message", "fake error"},
2✔
345
            {"shouldClientReset", true},
2✔
346
            {"isRecoveryModeDisabled", false},
2✔
347
            {"action", "FakeActionThatWillNeverExist"},
2✔
348
        };
2✔
349
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
350
                                       {"args", nlohmann::json{{"errorCode", 201}, {"errorBody", error_body}}}};
2✔
351
        auto test_cmd_res =
2✔
352
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
353
                .get();
2✔
354
        REQUIRE(test_cmd_res == "{}");
2!
355
        auto error = wait_for_future(std::move(error_future)).get();
2✔
356
        REQUIRE(error.status == ErrorCodes::RuntimeError);
2!
357
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
358
        REQUIRE(error.is_fatal);
2!
359
        REQUIRE_THAT(error.status.reason(), !Catch::Matchers::ContainsSubstring("Unknown sync protocol error code"));
2✔
360
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
2✔
361
    }
2✔
362

8✔
363

8✔
364
    SECTION("unknown connection-level errors are still errors") {
16✔
365
        auto r = Realm::get_shared_realm(config);
2✔
366
        wait_for_download(*r);
2✔
367
        nlohmann::json error_body = {{"tryAgain", false},
2✔
368
                                     {"message", "fake error"},
2✔
369
                                     {"shouldClientReset", false},
2✔
370
                                     {"isRecoveryModeDisabled", false},
2✔
371
                                     {"action", "ApplicationBug"}};
2✔
372
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
373
                                       {"args", nlohmann::json{{"errorCode", 199}, {"errorBody", error_body}}}};
2✔
374
        auto test_cmd_res =
2✔
375
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
376
                .get();
2✔
377
        REQUIRE(test_cmd_res == "{}");
2!
378
        auto error = wait_for_future(std::move(error_future)).get();
2✔
379
        REQUIRE(error.status == ErrorCodes::SyncProtocolInvariantFailed);
2!
380
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ProtocolViolation);
2!
381
        REQUIRE(error.is_fatal);
2!
382
    }
2✔
383

8✔
384
    SECTION("client reset errors") {
16✔
385
        auto r = Realm::get_shared_realm(config);
6✔
386
        wait_for_download(*r);
6✔
387
        nlohmann::json error_body = {{"tryAgain", false},
6✔
388
                                     {"message", "fake error"},
6✔
389
                                     {"shouldClientReset", true},
6✔
390
                                     {"isRecoveryModeDisabled", false},
6✔
391
                                     {"action", "ClientReset"}};
6✔
392
        auto code = GENERATE(sync::ProtocolError::bad_client_file_ident, sync::ProtocolError::bad_server_version,
6✔
393
                             sync::ProtocolError::diverging_histories);
6✔
394
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
6✔
395
                                       {"args", nlohmann::json{{"errorCode", code}, {"errorBody", error_body}}}};
6✔
396
        auto test_cmd_res =
6✔
397
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
6✔
398
                .get();
6✔
399
        REQUIRE(test_cmd_res == "{}");
6!
400
        auto error = wait_for_future(std::move(error_future)).get();
6✔
401
        REQUIRE(error.status == ErrorCodes::SyncClientResetRequired);
6!
402
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset);
6!
403
        REQUIRE(error.is_client_reset_requested());
6!
404
        REQUIRE(error.is_fatal);
6!
405
    }
6✔
406

8✔
407
    SECTION("teardown") {
16✔
408
        harness.reset();
2✔
409
    }
2✔
410
}
16✔
411

412

413
TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {
42✔
414
    std::vector<ObjectSchema> schema{
42✔
415
        {"TopLevel",
42✔
416
         {
42✔
417
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
42✔
418
             {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
42✔
419
             {"queryable_int_field", PropertyType::Int | PropertyType::Nullable},
42✔
420
             {"non_queryable_field", PropertyType::String | PropertyType::Nullable},
42✔
421
             {"list_of_ints_field", PropertyType::Int | PropertyType::Array},
42✔
422
             {"sum_of_list_field", PropertyType::Int},
42✔
423
         }},
42✔
424
        {"TopLevel2",
42✔
425
         {
42✔
426
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
42✔
427
             {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
42✔
428
         }},
42✔
429
    };
42✔
430

21✔
431
    // some of these tests make additive schema changes which is only allowed in dev mode
21✔
432
    constexpr bool dev_mode = true;
42✔
433
    FLXSyncTestHarness harness("flx_client_reset",
42✔
434
                               {schema, {"queryable_str_field", "queryable_int_field"}, {}, dev_mode});
42✔
435

21✔
436
    auto add_object = [](SharedRealm realm, std::string str_field, int64_t int_field,
42✔
437
                         ObjectId oid = ObjectId::gen()) {
116✔
438
        CppContext c(realm);
116✔
439
        realm->begin_transaction();
116✔
440

58✔
441
        int64_t r1 = random_int();
116✔
442
        int64_t r2 = random_int();
116✔
443
        int64_t r3 = random_int();
116✔
444
        int64_t sum = uint64_t(r1) + r2 + r3;
116✔
445

58✔
446
        Object::create(c, realm, "TopLevel",
116✔
447
                       std::any(AnyDict{{"_id", oid},
116✔
448
                                        {"queryable_str_field", str_field},
116✔
449
                                        {"queryable_int_field", int_field},
116✔
450
                                        {"non_queryable_field", "non queryable 1"s},
116✔
451
                                        {"list_of_ints_field", std::vector<std::any>{r1, r2, r3}},
116✔
452
                                        {"sum_of_list_field", sum}}));
116✔
453
        realm->commit_transaction();
116✔
454
    };
116✔
455

21✔
456
    auto subscribe_to_and_add_objects = [&](SharedRealm realm, size_t num_objects) {
41✔
457
        auto table = realm->read_group().get_table("class_TopLevel");
40✔
458
        auto id_col = table->get_primary_key_column();
40✔
459
        auto sub_set = realm->get_latest_subscription_set();
40✔
460
        for (size_t i = 0; i < num_objects; ++i) {
130✔
461
            auto oid = ObjectId::gen();
90✔
462
            auto mut_sub = sub_set.make_mutable_copy();
90✔
463
            mut_sub.clear();
90✔
464
            mut_sub.insert_or_assign(Query(table).equal(id_col, oid));
90✔
465
            sub_set = mut_sub.commit();
90✔
466
            add_object(realm, util::format("added _id='%1'", oid), 0, oid);
90✔
467
        }
90✔
468
    };
40✔
469

21✔
470
    auto add_subscription_for_new_object = [&](SharedRealm realm, std::string str_field,
42✔
471
                                               int64_t int_field) -> sync::SubscriptionSet {
32✔
472
        auto table = realm->read_group().get_table("class_TopLevel");
22✔
473
        auto queryable_str_field = table->get_column_key("queryable_str_field");
22✔
474
        auto sub_set = realm->get_latest_subscription_set().make_mutable_copy();
22✔
475
        sub_set.insert_or_assign(Query(table).equal(queryable_str_field, StringData(str_field)));
22✔
476
        auto resulting_set = sub_set.commit();
22✔
477
        add_object(realm, str_field, int_field);
22✔
478
        return resulting_set;
22✔
479
    };
22✔
480

21✔
481
    auto add_invalid_subscription = [&](SharedRealm realm) -> sync::SubscriptionSet {
22✔
482
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
483
        auto queryable_str_field = table->get_column_key("non_queryable_field");
2✔
484
        auto sub_set = realm->get_latest_subscription_set().make_mutable_copy();
2✔
485
        sub_set.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
2✔
486
        auto resulting_set = sub_set.commit();
2✔
487
        return resulting_set;
2✔
488
    };
2✔
489

21✔
490
    auto count_queries_with_str = [](sync::SubscriptionSet subs, std::string_view str) {
24✔
491
        size_t count = 0;
6✔
492
        for (auto sub : subs) {
10✔
493
            if (sub.query_string.find(str) != std::string::npos) {
10✔
494
                ++count;
4✔
495
            }
4✔
496
        }
10✔
497
        return count;
6✔
498
    };
6✔
499
    create_user_and_log_in(harness.app());
42✔
500
    auto user1 = harness.app()->current_user();
42✔
501
    create_user_and_log_in(harness.app());
42✔
502
    auto user2 = harness.app()->current_user();
42✔
503
    SyncTestFile config_local(user1, harness.schema(), SyncConfig::FLXSyncEnabled{});
42✔
504
    config_local.path += ".local";
42✔
505
    SyncTestFile config_remote(user2, harness.schema(), SyncConfig::FLXSyncEnabled{});
42✔
506
    config_remote.path += ".remote";
42✔
507
    const std::string str_field_value = "foo";
42✔
508
    const int64_t local_added_int = 100;
42✔
509
    const int64_t remote_added_int = 200;
42✔
510
    size_t before_reset_count = 0;
42✔
511
    size_t after_reset_count = 0;
42✔
512
    config_local.sync_config->notify_before_client_reset = [&before_reset_count](SharedRealm) {
34✔
513
        ++before_reset_count;
26✔
514
    };
26✔
515
    config_local.sync_config->notify_after_client_reset = [&after_reset_count](SharedRealm, ThreadSafeReference,
42✔
516
                                                                               bool) {
21✔
517
        ++after_reset_count;
×
518
    };
×
519

21✔
520
    SECTION("Recover: offline writes and subscription (single subscription)") {
42✔
521
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
522
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
523
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
524
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
525
        test_reset
2✔
526
            ->populate_initial_object([&](SharedRealm realm) {
2✔
527
                auto pk_of_added_object = ObjectId::gen();
2✔
528
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
529
                auto table = realm->read_group().get_table(ObjectStore::table_name_for_object_type("TopLevel"));
2✔
530
                REALM_ASSERT(table);
2✔
531
                mut_subs.insert_or_assign(Query(table));
2✔
532
                mut_subs.commit();
2✔
533

1✔
534
                realm->begin_transaction();
2✔
535
                CppContext c(realm);
2✔
536
                int64_t r1 = random_int();
2✔
537
                int64_t r2 = random_int();
2✔
538
                int64_t r3 = random_int();
2✔
539
                int64_t sum = uint64_t(r1) + r2 + r3;
2✔
540

1✔
541
                Object::create(c, realm, "TopLevel",
2✔
542
                               std::any(AnyDict{{"_id"s, pk_of_added_object},
2✔
543
                                                {"queryable_str_field"s, "initial value"s},
2✔
544
                                                {"list_of_ints_field", std::vector<std::any>{r1, r2, r3}},
2✔
545
                                                {"sum_of_list_field", sum}}));
2✔
546

1✔
547
                realm->commit_transaction();
2✔
548
                wait_for_upload(*realm);
2✔
549
                return pk_of_added_object;
2✔
550
            })
2✔
551
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
552
                add_object(local_realm, str_field_value, local_added_int);
2✔
553
            })
2✔
554
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
555
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
556
                sync::SubscriptionSet::State actual =
2✔
557
                    remote_realm->get_latest_subscription_set()
2✔
558
                        .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
559
                        .get();
2✔
560
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
2!
561
            })
2✔
562
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) {
2✔
563
                wait_for_advance(*local_realm);
2✔
564
                ClientResyncMode mode = client_reset_future.get();
2✔
565
                REQUIRE(mode == ClientResyncMode::Recover);
2!
566
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
567
                auto str_col = table->get_column_key("queryable_str_field");
2✔
568
                auto int_col = table->get_column_key("queryable_int_field");
2✔
569
                auto tv = table->where().equal(str_col, StringData(str_field_value)).find_all();
2✔
570
                tv.sort(int_col);
2✔
571
                // the object we created while offline was recovered, and the remote object was downloaded
1✔
572
                REQUIRE(tv.size() == 2);
2!
573
                CHECK(tv.get_object(0).get<Int>(int_col) == local_added_int);
2!
574
                CHECK(tv.get_object(1).get<Int>(int_col) == remote_added_int);
2!
575
            })
2✔
576
            ->run();
2✔
577
    }
2✔
578

21✔
579
    // Scenario: the first automatic reset cannot complete because the path reserved for the fresh
    // Realm is occupied by a non-empty directory. After that directory is removed and the Realm is
    // reopened, the reset must finish in Recover mode and keep both the subscription and the object
    // that were created while offline.
    SECTION("Recover: subscription and offline writes after client reset failure") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        auto&& [error_future, error_handler] = make_error_handler();
        config_local.sync_config->error_handler = error_handler;

        std::string fresh_path = realm::_impl::ClientResetOperation::get_fresh_path_for(config_local.path);
        // create a non-empty directory that we'll fail to delete
        util::make_dir(fresh_path);
        util::File(util::File::resolve("file", fresh_path), util::File::mode_Write);

        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        test_reset
            ->make_local_changes([&](SharedRealm local_realm) {
                // Subscribe to every TopLevel2 object while offline ...
                auto mut_sub = local_realm->get_latest_subscription_set().make_mutable_copy();
                auto table = local_realm->read_group().get_table("class_TopLevel2");
                mut_sub.insert_or_assign(Query(table));
                mut_sub.commit();

                // ... and create one object covered by that subscription.
                CppContext c(local_realm);
                local_realm->begin_transaction();
                Object::create(c, local_realm, "TopLevel2",
                               std::any(AnyDict{{"_id"s, ObjectId::gen()}, {"queryable_str_field"s, "foo"s}}));
                local_realm->commit_transaction();
            })
            ->on_post_reset([](SharedRealm local_realm) {
                // Verify offline subscription was not removed.
                auto subs = local_realm->get_latest_subscription_set();
                auto table = local_realm->read_group().get_table("class_TopLevel2");
                REQUIRE(subs.find(Query(table)));
            })
            ->run();

        // Remove the folder preventing the completion of a client reset.
        util::try_remove_dir_recursive(fresh_path);

        // Deep-copy the sync config so the handler changes below do not leak into config_local.
        RealmConfig config_copy = config_local;
        config_copy.sync_config = std::make_shared<SyncConfig>(*config_copy.sync_config);
        config_copy.sync_config->error_handler = nullptr;
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
        config_copy.sync_config->notify_after_client_reset = reset_handler;

        // Attempt to open the realm again.
        // This time the client reset succeeds and the offline subscription and writes are recovered.
        auto realm = Realm::get_shared_realm(config_copy);
        // Wait through wait_for_future(), as the DiscardLocal sections of this test case do, instead
        // of blocking on a bare Future::get() if the reset notification never arrives.
        ClientResyncMode mode = wait_for_future(std::move(reset_future)).get();
        REQUIRE(mode == ClientResyncMode::Recover);

        auto table = realm->read_group().get_table("class_TopLevel2");
        auto str_col = table->get_column_key("queryable_str_field");
        REQUIRE(table->size() == 1);
        REQUIRE(table->get_object(0).get<String>(str_col) == "foo");
    }
2✔
631

21✔
632
    // The offline local client and a remote client each call add_subscription_for_new_object() with
    // the same string value. After a Recover-mode reset the local subscription set must contain that
    // query exactly once and both objects must be present locally.
    SECTION("Recover: offline writes and subscriptions (multiple subscriptions)") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
        config_local.sync_config->notify_after_client_reset = reset_handler;
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        test_reset
            ->make_local_changes([&](SharedRealm local_realm) {
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
            })
            ->make_remote_changes([&](SharedRealm remote_realm) {
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
                // All futures in this section are awaited through wait_for_future(), matching the
                // DiscardLocal sections, rather than through a bare Future::get().
                sync::SubscriptionSet::State actual =
                    wait_for_future(remote_realm->get_latest_subscription_set().get_state_change_notification(
                                        sync::SubscriptionSet::State::Complete))
                        .get();
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
            })
            // `mutable` because the captured future is moved out when it is awaited.
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) mutable {
                ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
                REQUIRE(mode == ClientResyncMode::Recover);
                auto subs = local_realm->get_latest_subscription_set();
                wait_for_future(subs.get_state_change_notification(sync::SubscriptionSet::State::Complete)).get();
                // make sure that the subscription for "foo" survived the reset
                size_t count_of_foo = count_queries_with_str(subs, util::format("\"%1\"", str_field_value));
                REQUIRE(subs.state() == sync::SubscriptionSet::State::Complete);
                REQUIRE(count_of_foo == 1);
                local_realm->refresh();
                auto table = local_realm->read_group().get_table("class_TopLevel");
                auto str_col = table->get_column_key("queryable_str_field");
                auto int_col = table->get_column_key("queryable_int_field");
                auto tv = table->where().equal(str_col, StringData(str_field_value)).find_all();
                // Sorting by the int column makes the positions checked below deterministic.
                tv.sort(int_col);
                // the object we created while offline was recovered, and the remote object was downloaded
                REQUIRE(tv.size() == 2);
                CHECK(tv.get_object(0).get<Int>(int_col) == local_added_int);
                CHECK(tv.get_object(1).get<Int>(int_col) == remote_added_int);
            })
            ->run();
    }
2✔
671

21✔
672
    auto validate_integrity_of_arrays = [](TableRef table) -> size_t {
29✔
673
        auto sum_col = table->get_column_key("sum_of_list_field");
16✔
674
        auto array_col = table->get_column_key("list_of_ints_field");
16✔
675
        auto query = table->column<Lst<Int>>(array_col).sum() == table->column<Int>(sum_col) &&
16✔
676
                     table->column<Lst<Int>>(array_col).size() > 0;
16✔
677
        return query.count();
16✔
678
    };
16✔
679

21✔
680
    // Adds objects together with their subscriptions while offline and checks, after a Recover-mode
    // reset, that the recovered objects are all present with intact list data once a table-wide
    // subscription is in place.
    SECTION("Recover: offline writes with associated subscriptions in the correct order") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
        config_local.sync_config->notify_after_client_reset = reset_handler;
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        constexpr size_t num_objects_added = 20;
        constexpr size_t num_objects_added_by_harness = 1; // BaasFLXClientReset.run()
        constexpr size_t num_objects_added_by_remote = 1;  // make_remote_changes()
        test_reset
            ->make_local_changes([&](SharedRealm local_realm) {
                subscribe_to_and_add_objects(local_realm, num_objects_added);
                auto table = local_realm->read_group().get_table("class_TopLevel");
                REQUIRE(table->size() == num_objects_added + num_objects_added_by_harness);
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
                REQUIRE(count_of_valid_array_data == num_objects_added);
            })
            ->make_remote_changes([&](SharedRealm remote_realm) {
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
                // All futures in this section are awaited through wait_for_future(), matching the
                // DiscardLocal sections, rather than through a bare Future::get().
                sync::SubscriptionSet::State actual =
                    wait_for_future(remote_realm->get_latest_subscription_set().get_state_change_notification(
                                        sync::SubscriptionSet::State::Complete))
                        .get();
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
            })
            // `mutable` because the captured future is moved out when it is awaited.
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) mutable {
                ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
                REQUIRE(mode == ClientResyncMode::Recover);
                local_realm->refresh();
                auto latest_subs = local_realm->get_latest_subscription_set();
                auto state =
                    wait_for_future(latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete))
                        .get();
                REQUIRE(state == sync::SubscriptionSet::State::Complete);
                local_realm->refresh();
                auto table = local_realm->read_group().get_table("class_TopLevel");
                // Dump the table before the assertion fires so an unexpected row count can be diagnosed.
                if (table->size() != 1) {
                    table->to_json(std::cout, 1, {});
                }
                REQUIRE(table->size() == 1);
                // Replace every subscription with a single table-wide query so that all objects
                // become visible locally.
                auto mut_sub = latest_subs.make_mutable_copy();
                mut_sub.clear();
                mut_sub.insert_or_assign(Query(table));
                latest_subs = mut_sub.commit();
                wait_for_future(latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete))
                    .get();
                local_realm->refresh();
                REQUIRE(table->size() ==
                        num_objects_added + num_objects_added_by_harness + num_objects_added_by_remote);
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
                REQUIRE(count_of_valid_array_data == num_objects_added + num_objects_added_by_remote);
            })
            ->run();
    }
2✔
730

21✔
731
    // The local and the remote client both add a property named "new_property" to TopLevel, but with
    // different types (Float vs UUID). Recovery must fail with AutoClientResetFailed and leave the
    // local Realm in its pre-reset state.
    SECTION("Recover: incompatible property changes are rejected") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        auto&& [error_future, err_handler] = make_error_handler();
        config_local.sync_config->error_handler = err_handler;
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        constexpr size_t num_objects_added_before = 2;
        constexpr size_t num_objects_added_after = 2;
        constexpr size_t num_objects_added_by_harness = 1; // BaasFLXClientReset.run()
        constexpr std::string_view added_property_name = "new_property";
        test_reset
            ->make_local_changes([&](SharedRealm local_realm) {
                subscribe_to_and_add_objects(local_realm, num_objects_added_before);
                // Local side: add the new property as a nullable Float.
                Schema local_update = schema;
                Schema::iterator it = local_update.find("TopLevel");
                REQUIRE(it != local_update.end());
                it->persisted_properties.push_back(
                    {std::string(added_property_name), PropertyType::Float | PropertyType::Nullable});
                local_realm->update_schema(local_update);
                subscribe_to_and_add_objects(local_realm, num_objects_added_after);
                auto table = local_realm->read_group().get_table("class_TopLevel");
                REQUIRE(table->size() ==
                        num_objects_added_before + num_objects_added_after + num_objects_added_by_harness);
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
                REQUIRE(count_of_valid_array_data == num_objects_added_before + num_objects_added_after);
            })
            ->make_remote_changes([&](SharedRealm remote_realm) {
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
                // Remote side: add a property with the same name but as a nullable UUID.
                Schema remote_update = schema;
                Schema::iterator it = remote_update.find("TopLevel");
                REQUIRE(it != remote_update.end());
                it->persisted_properties.push_back(
                    {std::string(added_property_name), PropertyType::UUID | PropertyType::Nullable});
                remote_realm->update_schema(remote_update);
                // Awaited through wait_for_future(), like the error future below, rather than
                // through a bare Future::get().
                sync::SubscriptionSet::State actual =
                    wait_for_future(remote_realm->get_latest_subscription_set().get_state_change_notification(
                                        sync::SubscriptionSet::State::Complete))
                        .get();
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
            })
            ->on_post_reset([&, err_future = std::move(error_future)](SharedRealm local_realm) mutable {
                auto sync_error = wait_for_future(std::move(err_future)).get();
                REQUIRE(before_reset_count == 1);
                REQUIRE(after_reset_count == 0);
                REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
                REQUIRE(sync_error.is_client_reset_requested());
                local_realm->refresh();
                auto table = local_realm->read_group().get_table("class_TopLevel");
                // since schema validation happens in the first recovery commit, that whole commit is rolled back
                // and the final state here is "pre reset"
                REQUIRE(table->size() ==
                        num_objects_added_before + num_objects_added_by_harness + num_objects_added_after);
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
                REQUIRE(count_of_valid_array_data == num_objects_added_before + num_objects_added_after);
            })
            ->run();
    }
2✔
787

21✔
788
    // Drives a recovery whose replay of the local changes fails while being applied, then checks in
    // the nested sections how the Recover and RecoverOrDiscard modes react when the Realm is reopened.
    SECTION("unsuccessful replay of local changes") {
        constexpr std::string_view transient_column_name = "new_property";
        constexpr size_t harness_object_count = 1; // BaasFLXClientReset.run()
        constexpr size_t objects_before_column_change = 2;
        constexpr size_t objects_after_column_change = 2;
        auto&& [first_error_future, first_error_handler] = make_error_handler();
        config_local.sync_config->error_handler = first_error_handler;

        // Removing a column is not permitted through the object store for sync'd Realms, so the
        // local changes below are somewhat artificial. In production a recovery should only fail
        // during the apply stage because of programmer error or external factors such as running
        // out of disk space. Schema discrepancies are caught by the initial diff, so to provoke a
        // failure the column is added and removed directly at the core level: the schema diff then
        // passes, but the generated instructions fail when they are applied.
        reset_utils::TestClientReset::Callback apply_failing_local_changes = [&](SharedRealm local_realm) {
            subscribe_to_and_add_objects(local_realm, objects_before_column_change);
            auto top_level = local_realm->read_group().get_table("class_TopLevel");
            REQUIRE(top_level->size() == objects_before_column_change + harness_object_count);
            size_t intact_arrays = validate_integrity_of_arrays(top_level);
            REQUIRE(intact_arrays == objects_before_column_change);

            local_realm->begin_transaction();
            ColKey transient_column = top_level->add_column(type_Int, transient_column_name);
            top_level->remove_column(transient_column);
            local_realm->commit_transaction();

            subscribe_to_and_add_objects(local_realm, objects_after_column_change); // these are lost!
        };

        reset_utils::TestClientReset::Callback check_state_after_failed_recovery =
            [&, pending_error = std::move(first_error_future)](SharedRealm local_realm) mutable {
                auto reported_error = wait_for_future(std::move(pending_error)).get();
                REQUIRE(before_reset_count == 1);
                REQUIRE(after_reset_count == 0);
                REQUIRE(reported_error.status == ErrorCodes::AutoClientResetFailed);
                REQUIRE(reported_error.is_client_reset_requested());

                local_realm->refresh();
                auto top_level = local_realm->read_group().get_table("class_TopLevel");
                ColKey transient_column = top_level->get_column_key(transient_column_name);
                // partial recovery halted at remove_column() but rolled back everything in the change
                REQUIRE(!transient_column);

                // The table lacks the objects added after the column change as well as the last
                // commit following the latest subscription, because recovery batches changesets
                // together up until a subscription.
                const size_t surviving_objects = objects_before_column_change - 1;
                REQUIRE(top_level->size() == surviving_objects + harness_object_count);
                size_t intact_arrays = validate_integrity_of_arrays(top_level);
                REQUIRE(intact_arrays == surviving_objects);
            };

        SECTION("Recover: unsuccessful recovery leads to a manual reset") {
            config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
            auto reset_harness =
                reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
            reset_harness->make_local_changes(std::move(apply_failing_local_changes))
                ->on_post_reset(std::move(check_state_after_failed_recovery))
                ->run();

            // Reopening the Realm is expected to report the failed automatic reset a second time.
            RealmConfig reopen_config = config_local;
            auto&& [second_error_future, second_error_handler] = make_error_handler();
            reopen_config.sync_config->error_handler = second_error_handler;
            auto reopened_realm = Realm::get_shared_realm(reopen_config);
            auto second_error = wait_for_future(std::move(second_error_future)).get();
            REQUIRE(before_reset_count == 2);
            REQUIRE(after_reset_count == 0);
            REQUIRE(second_error.status == ErrorCodes::AutoClientResetFailed);
            REQUIRE(second_error.is_client_reset_requested());
        }

        SECTION("RecoverOrDiscard: unsuccessful reapply leads to discard") {
            config_local.sync_config->client_resync_mode = ClientResyncMode::RecoverOrDiscard;
            auto reset_harness =
                reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
            reset_harness->make_local_changes(std::move(apply_failing_local_changes))
                ->on_post_reset(std::move(check_state_after_failed_recovery))
                ->run();

            RealmConfig reopen_config = config_local;
            auto&& [discard_future, discard_handler] = make_client_reset_handler();
            // Only non-fatal errors carrying the Transient action are acceptable on the reopened Realm.
            reopen_config.sync_config->error_handler = [](std::shared_ptr<SyncSession>, SyncError err) {
                REALM_ASSERT_EX(!err.is_fatal, err.status);
                CHECK(err.server_requests_action == sync::ProtocolErrorInfo::Action::Transient);
            };
            reopen_config.sync_config->notify_after_client_reset = discard_handler;
            auto reopened_realm = Realm::get_shared_realm(reopen_config);
            ClientResyncMode applied_mode = wait_for_future(std::move(discard_future)).get();
            REQUIRE(applied_mode == ClientResyncMode::DiscardLocal);

            reopened_realm->refresh();
            auto top_level = reopened_realm->read_group().get_table("class_TopLevel");
            ColKey transient_column = top_level->get_column_key(transient_column_name);
            REQUIRE(!transient_column);                         // reverted local changes
            REQUIRE(top_level->size() == harness_object_count); // discarded all offline local changes
        }
    }
4✔
875

21✔
876
    // A DiscardLocal reset must drop both the subscription and the object created while offline;
    // afterwards the Realm has to accept new subscriptions and data as usual.
    SECTION("DiscardLocal: offline writes and subscriptions are lost") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
        auto&& [after_reset_future, after_reset_handler] = make_client_reset_handler();
        config_local.sync_config->notify_after_client_reset = after_reset_handler;
        auto reset_harness =
            reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        reset_harness
            ->make_local_changes([&](SharedRealm offline_realm) {
                add_subscription_for_new_object(offline_realm, str_field_value, local_added_int);
            })
            ->make_remote_changes([&](SharedRealm server_side_realm) {
                add_subscription_for_new_object(server_side_realm, str_field_value, remote_added_int);
            })
            ->on_post_reset([&, pending_mode = std::move(after_reset_future)](SharedRealm local_realm) mutable {
                using SubState = sync::SubscriptionSet::State;

                ClientResyncMode applied_mode = wait_for_future(std::move(pending_mode)).get();
                REQUIRE(applied_mode == ClientResyncMode::DiscardLocal);

                auto subs_after_reset = local_realm->get_latest_subscription_set();
                wait_for_future(subs_after_reset.get_state_change_notification(SubState::Complete)).get();
                local_realm->refresh();

                auto top_level = local_realm->read_group().get_table("class_TopLevel");
                auto str_key = top_level->get_column_key("queryable_str_field");
                auto int_key = top_level->get_column_key("queryable_int_field");
                // Runs the same "string field equals str_field_value" query each time it is called.
                auto find_matching_objects = [&] {
                    return top_level->where().equal(str_key, StringData(str_field_value)).find_all();
                };

                // the object we created while offline was discarded, and the remote object was not downloaded
                auto matches_after_reset = find_matching_objects();
                REQUIRE(matches_after_reset.size() == 0);
                // make sure that the subscription for "foo" did not survive the reset
                size_t matching_queries =
                    count_queries_with_str(subs_after_reset, util::format("\"%1\"", str_field_value));
                REQUIRE(matching_queries == 0);
                REQUIRE(subs_after_reset.state() == SubState::Complete);

                // adding data and subscriptions to a reset Realm works as normal
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
                auto newest_subs = local_realm->get_latest_subscription_set();
                REQUIRE(newest_subs.version() > subs_after_reset.version());
                wait_for_future(newest_subs.get_state_change_notification(SubState::Complete)).get();
                local_realm->refresh();

                matching_queries = count_queries_with_str(newest_subs, util::format("\"%1\"", str_field_value));
                REQUIRE(matching_queries == 1);
                auto matches_after_resubscribe = find_matching_objects();
                REQUIRE(matches_after_resubscribe.size() == 2);
                matches_after_resubscribe.sort(int_key);
                REQUIRE(matches_after_resubscribe.get_object(0).get<int64_t>(int_key) == local_added_int);
                REQUIRE(matches_after_resubscribe.get_object(1).get<int64_t>(int_key) == remote_added_int);
            })
            ->run();
    }
2✔
922

21✔
923
    // An invalid subscription committed while offline must end up in the Superseded state once a
    // DiscardLocal reset has run.
    SECTION("DiscardLocal: an invalid subscription made while offline becomes superceeded") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
        config_local.sync_config->notify_after_client_reset = reset_handler;
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        // Filled in by make_local_changes() and inspected in on_post_reset().
        std::unique_ptr<sync::SubscriptionSet> invalid_sub;
        test_reset
            ->make_local_changes([&](SharedRealm local_realm) {
                invalid_sub = std::make_unique<sync::SubscriptionSet>(add_invalid_subscription(local_realm));
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
            })
            ->make_remote_changes([&](SharedRealm remote_realm) {
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
            })
            // `mutable` because the captured future is moved out when it is awaited.
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) mutable {
                local_realm->refresh();
                // Both futures are awaited through wait_for_future(), matching the other DiscardLocal
                // sections, rather than through a bare Future::get().
                sync::SubscriptionSet::State actual =
                    wait_for_future(invalid_sub->get_state_change_notification(sync::SubscriptionSet::State::Complete))
                        .get();
                REQUIRE(actual == sync::SubscriptionSet::State::Superseded);
                ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
                REQUIRE(mode == ClientResyncMode::DiscardLocal);
            })
            ->run();
    }
2✔
947

21✔
948
    // A query that was valid before the reset becomes invalid because "queryable_str_field" is
    // removed from the server's queryable fields while the client is offline. The DiscardLocal reset
    // is then expected to report AutoClientResetFailed.
    SECTION("DiscardLocal: an error is produced if a previously successful query becomes invalid due to "
            "server changes across a reset") {
        // Disable dev mode so non-queryable fields are not automatically added as queryable
        const AppSession& app_session = harness.session().app_session();
        app_session.admin_api.set_development_mode_to(app_session.server_app_id, false);
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
        auto&& [error_future, err_handler] = make_error_handler();
        config_local.sync_config->error_handler = err_handler;
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        test_reset
            ->setup([&](SharedRealm realm) {
                // Only the Realm at config_local.path gets the subscription.
                if (realm->sync_session()->path() == config_local.path) {
                    auto added_sub = add_subscription_for_new_object(realm, str_field_value, 0);
                    // Awaited through wait_for_future(), like the error future below, rather than
                    // through a bare Future::get().
                    wait_for_future(added_sub.get_state_change_notification(sync::SubscriptionSet::State::Complete))
                        .get();
                }
            })
            ->make_local_changes([&](SharedRealm local_realm) {
                add_object(local_realm, str_field_value, local_added_int);
                // Make "queryable_str_field" not a valid query field.
                // Pre-reset, the Realm had a successful query on it, but now when the client comes back online
                // and tries to reset, the fresh Realm download will fail with a query error.
                // (Uses the section-level `app_session`; it is not redeclared here, to avoid shadowing it.)
                auto baas_sync_service = app_session.admin_api.get_sync_service(app_session.server_app_id);
                auto baas_sync_config =
                    app_session.admin_api.get_config(app_session.server_app_id, baas_sync_service);
                REQUIRE(baas_sync_config.queryable_field_names->is_array());
                // Locate the entry for the field in the list of queryable field names.
                auto it = baas_sync_config.queryable_field_names->begin();
                for (; it != baas_sync_config.queryable_field_names->end(); ++it) {
                    if (*it == "queryable_str_field") {
                        break;
                    }
                }
                REQUIRE(it != baas_sync_config.queryable_field_names->end());
                baas_sync_config.queryable_field_names->erase(it);
                app_session.admin_api.enable_sync(app_session.server_app_id, baas_sync_service.id, baas_sync_config);
            })
            ->on_post_reset([&, err_future = std::move(error_future)](SharedRealm) mutable {
                auto sync_error = wait_for_future(std::move(err_future)).get();
                INFO(sync_error.status);
                CHECK(sync_error.status == ErrorCodes::AutoClientResetFailed);
            })
            ->run();
    }
2✔
991

21✔
992
    // Runs a DiscardLocal reset without registering any local or remote changes; the only hook waits
    // for upload and then download completion after the local-changes phase.
    SECTION("DiscardLocal: completion callbacks fire after client reset even when there is no data to download") {
        auto& local_sync_config = *config_local.sync_config;
        local_sync_config.client_resync_mode = ClientResyncMode::DiscardLocal;
        auto&& [after_reset_future, after_reset_handler] = make_client_reset_handler();
        local_sync_config.notify_after_client_reset = after_reset_handler;

        auto reset_harness =
            reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        reset_harness
            ->on_post_local_changes([&](SharedRealm synced_realm) {
                auto& realm_ref = *synced_realm;
                wait_for_upload(realm_ref);
                wait_for_download(realm_ref);
            })
            ->run();
    }
2✔
1004

21✔
1005
    // Blocks the fresh-Realm path with a non-empty directory so the DiscardLocal reset fails, then
    // reopens the Realm with the directory still in place and expects the same failure again.
    SECTION("DiscardLocal: open realm after client reset failure") {
        auto& local_sync_config = *config_local.sync_config;
        local_sync_config.client_resync_mode = ClientResyncMode::DiscardLocal;
        auto&& [first_error_future, first_error_handler] = make_error_handler();
        local_sync_config.error_handler = first_error_handler;

        // Occupy the fresh-Realm path with a directory that contains a file, so that it cannot be
        // deleted.
        const std::string fresh_path = realm::_impl::ClientResetOperation::get_fresh_path_for(config_local.path);
        util::make_dir(fresh_path);
        {
            const std::string blocker_path = util::File::resolve("file", fresh_path);
            util::File blocker(blocker_path, util::File::mode_Write);
        }

        auto reset_harness =
            reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        reset_harness->run();

        // Client reset fails due to sync client not being able to create the fresh realm.
        auto first_error = wait_for_future(std::move(first_error_future)).get();
        REQUIRE(first_error.status == ErrorCodes::AutoClientResetFailed);

        // Open the realm again. This should not crash.
        auto&& [second_error_future, second_error_handler] = make_error_handler();
        local_sync_config.error_handler = std::move(second_error_handler);

        auto realm_post_reset = Realm::get_shared_realm(config_local);
        auto second_error = wait_for_future(std::move(second_error_future)).get();
        REQUIRE(second_error.status == ErrorCodes::AutoClientResetFailed);
    }
2✔
1030

21✔
1031
    // Tells seed_realm() whether to call reset_utils::trigger_client_reset() on the seeded Realm
    // (InitiateClientReset) or to leave it as is (NoReset) before closing it.
    enum class ResetMode { NoReset, InitiateClientReset };
42✔
1032
    auto seed_realm = [&harness, &subscribe_to_and_add_objects](RealmConfig config, ResetMode reset_mode) {
34✔
1033
        config.sync_config->error_handler = [path = config.path](std::shared_ptr<SyncSession>, SyncError err) {
13✔
1034
            // ignore spurious failures on this instance
1035
            util::format(std::cout, "spurious error while seeding a Realm at '%1': %2\n", path, err.status);
×
1036
        };
×
1037
        SharedRealm realm = Realm::get_shared_realm(config);
26✔
1038
        subscribe_to_and_add_objects(realm, 1);
26✔
1039
        auto subs = realm->get_latest_subscription_set();
26✔
1040
        auto result = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
26✔
1041
        CHECK(result == sync::SubscriptionSet::State::Complete);
26!
1042
        if (reset_mode == ResetMode::InitiateClientReset) {
26✔
1043
            reset_utils::trigger_client_reset(harness.session().app_session(), realm);
18✔
1044
        }
18✔
1045
        realm->close();
26✔
1046
    };
26✔
1047

21✔
1048
    auto setup_reset_handlers_for_schema_validation =
42✔
1049
        [&before_reset_count, &after_reset_count](RealmConfig& config, Schema expected_schema) {
28✔
1050
            auto& sync_config = *config.sync_config;
14✔
1051
            sync_config.error_handler = [](std::shared_ptr<SyncSession>, SyncError err) {
7✔
1052
                FAIL(err.status);
×
1053
            };
×
1054
            sync_config.notify_before_client_reset = [&before_reset_count,
14✔
1055
                                                      expected = expected_schema](SharedRealm frozen_before) {
14✔
1056
                ++before_reset_count;
14✔
1057
                REQUIRE(frozen_before->schema().size() > 0);
14!
1058
                REQUIRE(frozen_before->schema_version() != ObjectStore::NotVersioned);
14!
1059
                REQUIRE(frozen_before->schema() == expected);
14!
1060
            };
14✔
1061

7✔
1062
            auto [promise, future] = util::make_promise_future<void>();
14✔
1063
            sync_config.notify_after_client_reset =
14✔
1064
                [&after_reset_count, promise = util::CopyablePromiseHolder<void>(std::move(promise)), expected_schema,
14✔
1065
                 reset_mode = config.sync_config->client_resync_mode, has_schema = config.schema.has_value()](
14✔
1066
                    SharedRealm frozen_before, ThreadSafeReference after_ref, bool did_recover) mutable {
14✔
1067
                    ++after_reset_count;
14✔
1068
                    REQUIRE(frozen_before->schema().size() > 0);
14!
1069
                    REQUIRE(frozen_before->schema_version() != ObjectStore::NotVersioned);
14!
1070
                    REQUIRE(frozen_before->schema() == expected_schema);
14!
1071
                    SharedRealm after = Realm::get_shared_realm(std::move(after_ref), util::Scheduler::make_dummy());
14✔
1072
                    if (!has_schema) {
14✔
1073
                        after->set_schema_subset(expected_schema);
4✔
1074
                    }
4✔
1075
                    REQUIRE(after);
14!
1076
                    REQUIRE(after->schema() == expected_schema);
14!
1077
                    // the above check is sufficient unless operator==() is changed to not care about ordering
7✔
1078
                    // so future proof that by explicitly checking the order of properties here as well
7✔
1079
                    REQUIRE(after->schema().size() == frozen_before->schema().size());
14!
1080
                    auto after_it = after->schema().find("TopLevel");
14✔
1081
                    auto before_it = frozen_before->schema().find("TopLevel");
14✔
1082
                    REQUIRE(after_it != after->schema().end());
14!
1083
                    REQUIRE(before_it != frozen_before->schema().end());
14!
1084
                    REQUIRE(after_it->name == before_it->name);
14!
1085
                    REQUIRE(after_it->persisted_properties.size() == before_it->persisted_properties.size());
14!
1086
                    REQUIRE(after_it->persisted_properties[1].name == "queryable_int_field");
14!
1087
                    REQUIRE(after_it->persisted_properties[2].name == "queryable_str_field");
14!
1088
                    REQUIRE(before_it->persisted_properties[1].name == "queryable_int_field");
14!
1089
                    REQUIRE(before_it->persisted_properties[2].name == "queryable_str_field");
14!
1090
                    REQUIRE(did_recover == (reset_mode == ClientResyncMode::Recover));
14!
1091
                    promise.get_promise().emplace_value();
14✔
1092
                };
14✔
1093
            return std::move(future); // move is not redundant here because of how destructing works
14✔
1094
        };
14✔
1095

21✔
1096
    SECTION("Recover: schema indexes match in before and after states") {
        seed_realm(config_local, ResetMode::InitiateClientReset);

        // Build a local schema in which two properties of the first object schema are swapped, so
        // it no longer matches the on disk property order, plus one extra nullable ObjectId property.
        std::vector<ObjectSchema> reordered_schema = schema;
        auto& first_class_props = reordered_schema[0].persisted_properties;
        std::swap(first_class_props[1], first_class_props[2]);
        first_class_props.push_back({"queryable_oid_field", PropertyType::ObjectId | PropertyType::Nullable});

        auto& sync_cfg = *config_local.sync_config;
        config_local.schema = reordered_schema;
        sync_cfg.client_resync_mode = ClientResyncMode::Recover;
        auto reset_done = setup_reset_handlers_for_schema_validation(config_local, reordered_schema);

        // Keep the Realm open while waiting for the after-reset callback to fulfil the future.
        SharedRealm reopened = Realm::get_shared_realm(config_local);
        reset_done.get();

        CHECK(before_reset_count == 1);
        CHECK(after_reset_count == 1);
    }
2✔
1111

21✔
1112
    SECTION("Adding a local property matching a server addition is allowed") {
        const auto resync_mode = GENERATE(ClientResyncMode::DiscardLocal, ClientResyncMode::Recover);
        config_local.sync_config->client_resync_mode = resync_mode;
        seed_realm(config_local, ResetMode::InitiateClientReset);

        std::vector<ObjectSchema> extended_schema = schema;
        auto& first_class_props = extended_schema[0].persisted_properties;
        first_class_props.push_back({"queryable_oid_field", PropertyType::ObjectId | PropertyType::Nullable});

        // In a separate Realm, make the property addition.
        // Since this is dev mode, it will be added to the server's schema.
        config_remote.schema = extended_schema;
        seed_realm(config_remote, ResetMode::NoReset);

        // The local Realm uses the same properties, but with two of them in swapped order.
        std::swap(first_class_props[1], first_class_props[2]);
        config_local.schema = extended_schema;
        auto reset_future = setup_reset_handlers_for_schema_validation(config_local, extended_schema);

        async_open_realm(config_local, [&, reset_done = std::move(reset_future)](ThreadSafeReference&& opened_ref,
                                                                                 std::exception_ptr open_error) {
            REQUIRE(opened_ref);
            REQUIRE_FALSE(open_error);
            auto opened = Realm::get_shared_realm(std::move(opened_ref));
            reset_done.get();
            CHECK(before_reset_count == 1);
            CHECK(after_reset_count == 1);
        });
    }
4✔
1137

21✔
1138
    SECTION("Adding a local property matching a server addition inside the before reset callback is allowed") {
        const auto resync_mode = GENERATE(ClientResyncMode::DiscardLocal, ClientResyncMode::Recover);
        auto& sync_cfg = *config_local.sync_config;
        sync_cfg.client_resync_mode = resync_mode;
        seed_realm(config_local, ResetMode::InitiateClientReset);

        std::vector<ObjectSchema> extended_schema = schema;
        auto& first_class_props = extended_schema[0].persisted_properties;
        first_class_props.push_back({"queryable_oid_field", PropertyType::ObjectId | PropertyType::Nullable});

        // In a separate Realm, make the property addition.
        // Since this is dev mode, it will be added to the server's schema.
        config_remote.schema = extended_schema;
        seed_realm(config_remote, ResetMode::NoReset);

        std::swap(first_class_props[1], first_class_props[2]);
        // The local Realm is opened without a schema and with a non-frozen "before" Realm, so the
        // wrapped callbacks below can apply the schema themselves.
        config_local.schema.reset();
        sync_cfg.freeze_before_reset_realm = false;
        auto reset_future = setup_reset_handlers_for_schema_validation(config_local, extended_schema);

        // Wrap the validating before-reset callback: update the schema first, then delegate.
        auto validate_before = std::move(sync_cfg.notify_before_client_reset);
        sync_cfg.notify_before_client_reset = [extended_schema,
                                               validate_before](std::shared_ptr<Realm> before_realm) {
            before_realm->update_schema(extended_schema);
            validate_before(before_realm);
        };

        // Wrap the validating after-reset callback: apply the schema subset first, then delegate.
        auto validate_after = std::move(sync_cfg.notify_after_client_reset);
        sync_cfg.notify_after_client_reset = [extended_schema, validate_after](std::shared_ptr<Realm> before_realm,
                                                                               ThreadSafeReference after_ref,
                                                                               bool did_recover) {
            before_realm->set_schema_subset(extended_schema);
            validate_after(before_realm, std::move(after_ref), did_recover);
        };

        async_open_realm(config_local, [&, reset_done = std::move(reset_future)](ThreadSafeReference&& opened_ref,
                                                                                 std::exception_ptr open_error) {
            REQUIRE(opened_ref);
            REQUIRE_FALSE(open_error);
            auto opened = Realm::get_shared_realm(std::move(opened_ref));
            reset_done.get();
            CHECK(before_reset_count == 1);
            CHECK(after_reset_count == 1);
        });
    }
4✔
1177

21✔
1178
    auto make_additive_changes = [](std::vector<ObjectSchema> schema) {
24✔
1179
        schema[0].persisted_properties.push_back(
6✔
1180
            {"added_oid_field", PropertyType::ObjectId | PropertyType::Nullable});
6✔
1181
        std::swap(schema[0].persisted_properties[1], schema[0].persisted_properties[2]);
6✔
1182
        schema.push_back({"AddedClass",
6✔
1183
                          {
6✔
1184
                              {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
6✔
1185
                              {"str_field", PropertyType::String | PropertyType::Nullable},
6✔
1186
                          }});
6✔
1187
        return schema;
6✔
1188
    };
6✔
1189
    // After a client reset in Recover mode with an additively changed local schema, both reset
    // callbacks must have run exactly once. The section then writes to the newly added property,
    // subscribes to the newly added class, creates an object in it and waits for the subscription
    // to complete and for a download.
    SECTION("Recover: additive schema changes are recovered in dev mode") {
        seed_realm(config_local, ResetMode::InitiateClientReset);
        std::vector<ObjectSchema> changed_schema = make_additive_changes(schema);
        config_local.schema = changed_schema;
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        auto future = setup_reset_handlers_for_schema_validation(config_local, changed_schema);
        async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
            REQUIRE(ref);
            REQUIRE_FALSE(error);
        });
        // The future is fulfilled by the after-reset callback installed above.
        future.get();
        CHECK(before_reset_count == 1);
        CHECK(after_reset_count == 1);
        auto realm = Realm::get_shared_realm(config_local);
        {
            // make changes to the newly added property
            realm->begin_transaction();
            auto table = realm->read_group().get_table("class_TopLevel");
            // Fail the test cleanly instead of dereferencing a missing table.
            REQUIRE(table);
            ColKey new_col = table->get_column_key("added_oid_field");
            REQUIRE(new_col);
            for (auto it = table->begin(); it != table->end(); ++it) {
                it->set(new_col, ObjectId::gen());
            }
            realm->commit_transaction();
            // subscribe to the new Class and add an object
            auto new_table = realm->read_group().get_table("class_AddedClass");
            // Validate the table before it is used to build the subscription query.
            REQUIRE(new_table);
            auto sub_set = realm->get_latest_subscription_set();
            auto mut_sub = sub_set.make_mutable_copy();
            mut_sub.insert_or_assign(Query(new_table));
            mut_sub.commit();
            realm->begin_transaction();
            new_table->create_object_with_primary_key(ObjectId::gen());
            realm->commit_transaction();
        }
        auto result = realm->get_latest_subscription_set()
                          .get_state_change_notification(sync::SubscriptionSet::State::Complete)
                          .get();
        CHECK(result == sync::SubscriptionSet::State::Complete);
        wait_for_download(*realm);
    }
2✔
1230

21✔
1231
    SECTION("DiscardLocal: additive schema changes not allowed") {
        seed_realm(config_local, ResetMode::InitiateClientReset);

        std::vector<ObjectSchema> additive_schema = make_additive_changes(schema);
        auto& sync_cfg = *config_local.sync_config;
        config_local.schema = additive_schema;
        sync_cfg.client_resync_mode = ClientResyncMode::DiscardLocal;

        // Route sync errors into a future so the section can wait for the expected failure.
        auto&& [reset_error, on_sync_error] = make_error_handler();
        sync_cfg.error_handler = on_sync_error;

        async_open_realm(config_local, [&](ThreadSafeReference&& opened_ref, std::exception_ptr open_error) {
            // The open is expected to fail with the client reset error below.
            REQUIRE(!opened_ref);
            REQUIRE(open_error);
            REQUIRE_THROWS_CONTAINING(std::rethrow_exception(open_error),
                                      "A fatal error occurred during client reset: 'Client reset cannot recover when "
                                      "classes have been removed: {AddedClass}'");
        });
        reset_error.get();

        CHECK(before_reset_count == 1);
        CHECK(after_reset_count == 0);
    }
2✔
1249

21✔
1250
    SECTION("Recover: incompatible schema changes on async open are an error") {
        seed_realm(config_local, ResetMode::InitiateClientReset);

        std::vector<ObjectSchema> incompatible_schema = schema;
        auto& first_property = incompatible_schema[0].persisted_properties[0];
        first_property.type = PropertyType::UUID; // incompatible type change

        auto& sync_cfg = *config_local.sync_config;
        config_local.schema = incompatible_schema;
        sync_cfg.client_resync_mode = ClientResyncMode::Recover;

        // Route sync errors into a future so the section can wait for the expected failure.
        auto&& [reset_error, on_sync_error] = make_error_handler();
        sync_cfg.error_handler = on_sync_error;

        async_open_realm(config_local, [&](ThreadSafeReference&& opened_ref, std::exception_ptr open_error) {
            REQUIRE(!opened_ref);
            REQUIRE(open_error);
            REQUIRE_THROWS_CONTAINING(
                std::rethrow_exception(open_error),
                "A fatal error occurred during client reset: 'The following changes cannot be "
                "made in additive-only schema mode:\n"
                "- Property 'TopLevel._id' has been changed from 'object id' to 'uuid'.\nIf your app is running in "
                "development mode, you can delete the realm and restart the app to update your schema.'");
        });
        reset_error.get();

        CHECK(before_reset_count == 0); // we didn't even get this far because opening the frozen realm fails
        CHECK(after_reset_count == 0);
    }
2✔
1272

21✔
1273
    SECTION("Recover: additive schema changes without dev mode produce an error after client reset") {
        const AppSession& session = harness.session().app_session();
        const auto& admin_api = session.admin_api;
        const auto& app_id = session.server_app_id;

        // Start from a known state: development mode on while the Realm is seeded.
        admin_api.set_development_mode_to(app_id, true);
        {
            seed_realm(config_local, ResetMode::InitiateClientReset);
            // Disable dev mode so that schema changes are not allowed
            admin_api.set_development_mode_to(app_id, false);

            std::vector<ObjectSchema> additive_schema = make_additive_changes(schema);
            auto& sync_cfg = *config_local.sync_config;
            config_local.schema = additive_schema;
            sync_cfg.client_resync_mode = ClientResyncMode::Recover;

            // Only the reset callbacks are wanted here; the returned future is intentionally
            // dropped and the error handler is replaced right below.
            (void)setup_reset_handlers_for_schema_validation(config_local, additive_schema);
            auto&& [upload_error, on_sync_error] = make_error_handler();
            sync_cfg.error_handler = on_sync_error;

            async_open_realm(config_local, [&](ThreadSafeReference&& opened_ref, std::exception_ptr open_error) {
                REQUIRE(opened_ref);
                REQUIRE_FALSE(open_error);
                auto opened = Realm::get_shared_realm(std::move(opened_ref));
                // make changes to the new property
                opened->begin_transaction();
                auto top_level = opened->read_group().get_table("class_TopLevel");
                ColKey added_col = top_level->get_column_key("added_oid_field");
                REQUIRE(added_col);
                for (auto row = top_level->begin(); row != top_level->end(); ++row) {
                    row->set(added_col, ObjectId::gen());
                }
                opened->commit_transaction();
            });

            auto realm = Realm::get_shared_realm(config_local);
            auto sync_error = upload_error.get();

            // Either of the two schema changes may be the one reported.
            std::string property_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding "
                                       "\"ObjectID\" column at field \"added_oid_field\" in schema \"TopLevel\", "
                                       "schema changes from clients are restricted when developer mode is disabled";
            std::string class_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding schema "
                                    "for Realm table \"AddedClass\", schema changes from clients are restricted when "
                                    "developer mode is disabled";
            REQUIRE_THAT(sync_error.status.reason(), Catch::Matchers::ContainsSubstring(property_err) ||
                                                         Catch::Matchers::ContainsSubstring(class_err));
            CHECK(before_reset_count == 1);
            CHECK(after_reset_count == 1);
        }
    }
2✔
1314

21✔
1315
    // the previous section turns off dev mode, undo that now for later tests
21✔
1316
    const AppSession& app_session = harness.session().app_session();
42✔
1317
    app_session.admin_api.set_development_mode_to(app_session.server_app_id, true);
42✔
1318
}
42✔
1319

1320
// Creating an object in a class that has no subscription must throw NoSubscriptionForWrite. Once a
// subscription on `queryable_str_field == "foo"` has reached the Complete state, a matching object
// (with an embedded object) is created and updated, then upload and download are awaited.
TEST_CASE("flx: creating an object on a class with no subscription throws", "[sync][flx][subscription][baas]") {
    FLXSyncTestHarness harness("flx_bad_query", {g_simple_embedded_obj_schema, {"queryable_str_field"}});
    harness.do_with_new_user([&](auto user) {
        SyncTestFile config(user, harness.schema(), SyncConfig::FLXSyncEnabled{});

        // The promise is held through a shared_ptr captured by the error handler.
        auto [sync_error_promise, sync_error_future] = util::make_promise_future<SyncError>();
        auto promise_ptr = std::make_shared<decltype(sync_error_promise)>(std::move(sync_error_promise));
        auto on_sync_error = [error_promise = std::move(promise_ptr)](std::shared_ptr<SyncSession>,
                                                                      SyncError err) {
            CHECK(err.server_requests_action == sync::ProtocolErrorInfo::Action::Transient);
            error_promise->emplace_value(std::move(err));
        };
        config.sync_config->error_handler = std::move(on_sync_error);

        auto shared_realm = Realm::get_shared_realm(config);
        CppContext ctx(shared_realm);

        // Without any subscription the write has to be rejected locally.
        shared_realm->begin_transaction();
        REQUIRE_THROWS_AS(
            Object::create(ctx, shared_realm, "TopLevel",
                           std::any(AnyDict{{"_id", ObjectId::gen()}, {"queryable_str_field", "foo"s}})),
            NoSubscriptionForWrite);
        shared_realm->cancel_transaction();

        auto top_level = shared_realm->read_group().get_table("class_TopLevel");
        REQUIRE(top_level->is_empty());

        // Subscribe to the objects about to be written and wait for the subscription to complete.
        auto str_col = top_level->get_column_key("queryable_str_field");
        {
            auto pending_subs = shared_realm->get_latest_subscription_set().make_mutable_copy();
            pending_subs.insert_or_assign(Query(top_level).equal(str_col, "foo"));
            auto committed_subs = pending_subs.commit();
            committed_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
        }

        shared_realm->begin_transaction();
        auto created = Object::create(ctx, shared_realm, "TopLevel",
                                      std::any(AnyDict{{"_id", ObjectId::gen()},
                                                       {"queryable_str_field", "foo"s},
                                                       {"embedded_obj", AnyDict{{"str_field", "bar"s}}}}));
        shared_realm->commit_transaction();

        // Modify the embedded object in a separate transaction.
        shared_realm->begin_transaction();
        auto embedded = util::any_cast<Object&&>(created.get_property_value<std::any>(ctx, "embedded_obj"));
        embedded.set_property_value(ctx, "str_field", std::any{"baz"s});
        shared_realm->commit_transaction();

        wait_for_upload(*shared_realm);
        wait_for_download(*shared_realm);
    });
}
2✔
1368

1369
TEST_CASE("flx: uploading an object that is out-of-view results in compensating write",
1370
          "[sync][flx][compensating write][baas]") {
16✔
1371
    static std::optional<FLXSyncTestHarness> harness;
16✔
1372
    if (!harness) {
16✔
1373
        Schema schema{{"TopLevel",
2✔
1374
                       {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
1375
                        {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
1376
                        {"embedded_obj", PropertyType::Object | PropertyType::Nullable, "TopLevel_embedded_obj"}}},
2✔
1377
                      {"TopLevel_embedded_obj",
2✔
1378
                       ObjectSchema::ObjectType::Embedded,
2✔
1379
                       {{"str_field", PropertyType::String | PropertyType::Nullable}}},
2✔
1380
                      {"Int PK",
2✔
1381
                       {
2✔
1382
                           {"_id", PropertyType::Int, Property::IsPrimary{true}},
2✔
1383
                           {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
1384
                       }},
2✔
1385
                      {"String PK",
2✔
1386
                       {
2✔
1387
                           {"_id", PropertyType::String, Property::IsPrimary{true}},
2✔
1388
                           {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
1389
                       }},
2✔
1390
                      {"UUID PK",
2✔
1391
                       {
2✔
1392
                           {"_id", PropertyType::UUID, Property::IsPrimary{true}},
2✔
1393
                           {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
1394
                       }}};
2✔
1395

1✔
1396
        AppCreateConfig::ServiceRole role;
2✔
1397
        role.name = "compensating_write_perms";
2✔
1398

1✔
1399
        AppCreateConfig::ServiceRoleDocumentFilters doc_filters;
2✔
1400
        doc_filters.read = true;
2✔
1401
        doc_filters.write = {{"queryable_str_field", {{"$in", nlohmann::json::array({"foo", "bar"})}}}};
2✔
1402
        role.document_filters = doc_filters;
2✔
1403

1✔
1404
        role.insert_filter = true;
2✔
1405
        role.delete_filter = true;
2✔
1406
        role.read = true;
2✔
1407
        role.write = true;
2✔
1408
        FLXSyncTestHarness::ServerSchema server_schema{schema, {"queryable_str_field"}, {role}};
2✔
1409
        harness.emplace("flx_bad_query", server_schema);
2✔
1410
    }
2✔
1411

8✔
1412
    create_user_and_log_in(harness->app());
16✔
1413
    auto user = harness->app()->current_user();
16✔
1414

8✔
1415
    auto make_error_handler = [] {
16✔
1416
        auto [error_promise, error_future] = util::make_promise_future<SyncError>();
16✔
1417
        auto shared_promise = std::make_shared<decltype(error_promise)>(std::move(error_promise));
16✔
1418
        auto fn = [error_promise = std::move(shared_promise)](std::shared_ptr<SyncSession>, SyncError err) mutable {
15✔
1419
            if (!error_promise) {
14✔
1420
                util::format(std::cerr,
×
1421
                             "An unexpected sync error was caught by the default SyncTestFile handler: '%1'\n",
×
1422
                             err.status);
×
1423
                abort();
×
1424
            }
×
1425
            error_promise->emplace_value(std::move(err));
14✔
1426
            error_promise.reset();
14✔
1427
        };
14✔
1428

8✔
1429
        return std::make_pair(std::move(error_future), std::move(fn));
16✔
1430
    };
16✔
1431

8✔
1432
    // Checks that `sync_error` is a compensating-write warning describing exactly one rejected
    // write, and that this write matches the expected primary key, object name and (as a substring)
    // reason.
    //
    //  sync_error            the error delivered to the sync error handler
    //  expected_pk           primary key the compensating write must refer to
    //  expected_object_name  object name the compensating write must refer to
    //  error_msg_fragment    text that must appear in the reported reason
    auto validate_sync_error = [&](const SyncError& sync_error, Mixed expected_pk, const char* expected_object_name,
                                   const std::string& error_msg_fragment) {
        CHECK(sync_error.status == ErrorCodes::SyncCompensatingWrite);
        CHECK(!sync_error.is_client_reset_requested());
        CHECK(sync_error.server_requests_action == sync::ProtocolErrorInfo::Action::Warning);
        // REQUIRE rather than CHECK: CHECK would keep going and index an empty vector below.
        REQUIRE(sync_error.compensating_writes_info.size() == 1);
        const auto& write_info = sync_error.compensating_writes_info[0];
        CHECK(write_info.primary_key == expected_pk);
        CHECK(write_info.object_name == expected_object_name);
        CHECK_THAT(write_info.reason, Catch::Matchers::ContainsSubstring(error_msg_fragment));
    };
14✔
1443

8✔
1444
    SyncTestFile config(user, harness->schema(), SyncConfig::FLXSyncEnabled{});
16✔
1445
    auto&& [error_future, err_handler] = make_error_handler();
16✔
1446
    config.sync_config->error_handler = err_handler;
16✔
1447
    auto realm = Realm::get_shared_realm(config);
16✔
1448
    auto table = realm->read_group().get_table("class_TopLevel");
16✔
1449

8✔
1450
    auto create_subscription = [&](StringData table_name, auto make_query) {
15✔
1451
        auto table = realm->read_group().get_table(table_name);
14✔
1452
        auto queryable_str_field = table->get_column_key("queryable_str_field");
14✔
1453
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
14✔
1454
        new_query.insert_or_assign(make_query(Query(table), queryable_str_field));
14✔
1455
        new_query.commit();
14✔
1456
    };
14✔
1457

8✔
1458
    SECTION("compensating write because of permission violation") {
        create_subscription("class_TopLevel", [](auto query, auto str_col) {
            return query.equal(str_col, "bizz");
        });

        // Create an object that is inside the subscription but uses the value "bizz".
        CppContext ctx(realm);
        realm->begin_transaction();
        auto rejected_id = ObjectId::gen();
        Object::create(ctx, realm, "TopLevel",
                       std::any(AnyDict{{"_id", rejected_id}, {"queryable_str_field", "bizz"s}}));
        realm->commit_transaction();

        auto sync_error = std::move(error_future).get();
        const std::string expected_reason =
            util::format("write to \"%1\" in table \"TopLevel\" not allowed", rejected_id.to_string());
        validate_sync_error(sync_error, rejected_id, "TopLevel", expected_reason);

        wait_for_advance(*realm);

        // After the Realm has advanced, the table is expected to be empty again.
        auto top_level = realm->read_group().get_table("class_TopLevel");
        REQUIRE(top_level->is_empty());
    }
2✔
1479

8✔
1480
    SECTION("compensating write because of permission violation with write on embedded object") {
        create_subscription("class_TopLevel", [](auto query, auto str_col) {
            return query.equal(str_col, "bizz").Or().equal(str_col, "foo");
        });

        CppContext ctx(realm);
        // Reads the embedded object out of `owner`'s "embedded_obj" property.
        auto read_embedded = [&ctx](Object& owner) {
            return util::any_cast<Object&&>(owner.get_property_value<std::any>(ctx, "embedded_obj"));
        };

        // First transaction: create the object with the value "foo".
        realm->begin_transaction();
        auto rejected_id = ObjectId::gen();
        auto parent = Object::create(ctx, realm, "TopLevel",
                                     std::any(AnyDict{{"_id", rejected_id},
                                                      {"queryable_str_field", "foo"s},
                                                      {"embedded_obj", AnyDict{{"str_field", "bar"s}}}}));
        realm->commit_transaction();

        // Second transaction: change the queryable field to "bizz".
        realm->begin_transaction();
        parent.set_property_value(ctx, "queryable_str_field", std::any{"bizz"s});
        realm->commit_transaction();

        // Third transaction: modify the embedded object.
        realm->begin_transaction();
        auto embedded = read_embedded(parent);
        embedded.set_property_value(ctx, "str_field", std::any{"baz"s});
        realm->commit_transaction();

        auto sync_error = std::move(error_future).get();
        const std::string expected_reason =
            util::format("write to \"%1\" in table \"TopLevel\" not allowed", rejected_id.to_string());
        validate_sync_error(sync_error, rejected_id, "TopLevel", expected_reason);

        wait_for_advance(*realm);

        // Both fields are expected to hold their originally created values again.
        parent = Object::get_for_primary_key(ctx, realm, "TopLevel", std::any(rejected_id));
        embedded = read_embedded(parent);
        REQUIRE(util::any_cast<std::string&&>(parent.get_property_value<std::any>(ctx, "queryable_str_field")) ==
                "foo");
        REQUIRE(util::any_cast<std::string&&>(embedded.get_property_value<std::any>(ctx, "str_field")) == "bar");

        // Repeat the embedded write now that the parent holds "foo" again.
        realm->begin_transaction();
        embedded.set_property_value(ctx, "str_field", std::any{"baz"s});
        realm->commit_transaction();

        wait_for_upload(*realm);
        wait_for_download(*realm);

        wait_for_advance(*realm);
        parent = Object::get_for_primary_key(ctx, realm, "TopLevel", std::any(rejected_id));
        embedded = read_embedded(parent);
        REQUIRE(embedded.get_column_value<StringData>("str_field") == "baz");
    }
2✔
1524

8✔
1525
    SECTION("compensating write for writing a top-level object that is out-of-view") {
        create_subscription("class_TopLevel", [](auto query, auto str_col) {
            return query.equal(str_col, "foo");
        });

        CppContext ctx(realm);
        // One transaction creates an object matching the subscription ("foo") and one that does
        // not ("bar").
        realm->begin_transaction();
        auto in_view_id = ObjectId::gen();
        auto out_of_view_id = ObjectId::gen();
        Object::create(ctx, realm, "TopLevel",
                       std::any(AnyDict{
                           {"_id", in_view_id},
                           {"queryable_str_field", "foo"s},
                       }));
        Object::create(ctx, realm, "TopLevel",
                       std::any(AnyDict{
                           {"_id", out_of_view_id},
                           {"queryable_str_field", "bar"s},
                       }));
        realm->commit_transaction();

        auto sync_error = std::move(error_future).get();
        validate_sync_error(sync_error, out_of_view_id, "TopLevel", "object is outside of the current query view");

        wait_for_advance(*realm);

        // Only the in-view object is expected to remain.
        auto top_level = realm->read_group().get_table("class_TopLevel");
        REQUIRE(top_level->size() == 1);
        REQUIRE(top_level->get_object_with_primary_key(in_view_id));

        // Verify that a valid object afterwards does not produce an error
        realm->begin_transaction();
        Object::create(ctx, realm, "TopLevel",
                       std::any(AnyDict{
                           {"_id", ObjectId::gen()},
                           {"queryable_str_field", "foo"s},
                       }));
        realm->commit_transaction();

        wait_for_upload(*realm);
        wait_for_download(*realm);
    }
2✔
1567

8✔
1568
    SECTION("compensating writes for each primary key type") {
16✔
1569
        SECTION("int") {
8✔
1570
            create_subscription("class_Int PK", [](auto q, auto col) {
2✔
1571
                return q.equal(col, "foo");
2✔
1572
            });
2✔
1573
            realm->begin_transaction();
2✔
1574
            realm->read_group().get_table("class_Int PK")->create_object_with_primary_key(123456);
2✔
1575
            realm->commit_transaction();
2✔
1576

1✔
1577
            validate_sync_error(std::move(error_future).get(), 123456, "Int PK",
2✔
1578
                                "write to \"123456\" in table \"Int PK\" not allowed");
2✔
1579
        }
2✔
1580

4✔
1581
        SECTION("short string") {
8✔
1582
            create_subscription("class_String PK", [](auto q, auto col) {
2✔
1583
                return q.equal(col, "foo");
2✔
1584
            });
2✔
1585
            realm->begin_transaction();
2✔
1586
            realm->read_group().get_table("class_String PK")->create_object_with_primary_key("short");
2✔
1587
            realm->commit_transaction();
2✔
1588

1✔
1589
            validate_sync_error(std::move(error_future).get(), "short", "String PK",
2✔
1590
                                "write to \"short\" in table \"String PK\" not allowed");
2✔
1591
        }
2✔
1592

4✔
1593
        SECTION("long string") {
8✔
1594
            create_subscription("class_String PK", [](auto q, auto col) {
2✔
1595
                return q.equal(col, "foo");
2✔
1596
            });
2✔
1597
            realm->begin_transaction();
2✔
1598
            const char* pk = "long string which won't fit in the SSO buffer";
2✔
1599
            realm->read_group().get_table("class_String PK")->create_object_with_primary_key(pk);
2✔
1600
            realm->commit_transaction();
2✔
1601

1✔
1602
            validate_sync_error(std::move(error_future).get(), pk, "String PK",
2✔
1603
                                util::format("write to \"%1\" in table \"String PK\" not allowed", pk));
2✔
1604
        }
2✔
1605

4✔
1606
        SECTION("uuid") {
8✔
1607
            create_subscription("class_UUID PK", [](auto q, auto col) {
2✔
1608
                return q.equal(col, "foo");
2✔
1609
            });
2✔
1610
            realm->begin_transaction();
2✔
1611
            UUID pk("01234567-9abc-4def-9012-3456789abcde");
2✔
1612
            realm->read_group().get_table("class_UUID PK")->create_object_with_primary_key(pk);
2✔
1613
            realm->commit_transaction();
2✔
1614

1✔
1615
            validate_sync_error(std::move(error_future).get(), pk, "UUID PK",
2✔
1616
                                util::format("write to \"UUID(%1)\" in table \"UUID PK\" not allowed", pk));
2✔
1617
        }
2✔
1618
    }
8✔
1619

8✔
1620
    // Clear the Realm afterwards as we're reusing an app
8✔
1621
    realm->begin_transaction();
16✔
1622
    table->clear();
16✔
1623
    realm->commit_transaction();
16✔
1624
    wait_for_upload(*realm);
16✔
1625
    realm.reset();
16✔
1626

8✔
1627
    // Add new sections before this
8✔
1628
    SECTION("teardown") {
16✔
1629
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
2✔
1630
        harness.reset();
2✔
1631
    }
2✔
1632
}
16✔
1633

1634
// Subscribes to a query on "non_queryable_field" and checks that the error is
// reported through the subscription set's state-change notification, then
// checks the active/latest subscription set versions for a following good
// query and for a second bad query.
TEST_CASE("flx: query on non-queryable field results in query error message", "[sync][flx][query][baas]") {
    // Static so that the same harness is reused each time the test case body
    // runs; the trailing "teardown" section resets it.
    static std::optional<FLXSyncTestHarness> harness;
    if (!harness.has_value()) {
        harness.emplace("flx_bad_query");
    }

    // Builds a query with `make_query(Query(table), column)` and commits it on
    // top of a mutable copy of the realm's currently active subscription set.
    auto create_subscription = [](SharedRealm realm, StringData table_name, StringData column_name, auto make_query) {
        auto target_table = realm->read_group().get_table(table_name);
        auto target_column = target_table->get_column_key(column_name);
        auto pending_subs = realm->get_active_subscription_set().make_mutable_copy();
        pending_subs.insert_or_assign(make_query(Query(target_table), target_column));
        return pending_subs.commit();
    };

    // Fails unless `status` is an error whose reason carries one of the known
    // prefixes AND names the non-queryable field.
    auto check_status = [](auto status) {
        CHECK(!status.is_ok());
        const std::string reason = status.get_status().reason();
        constexpr auto not_found = std::string::npos;

        // Depending on the version of baas used, it may return 'Invalid query:' or
        // 'Client provided query with bad syntax:'
        const bool has_known_prefix = reason.find("Invalid query:") != not_found ||
                                      reason.find("Client provided query with bad syntax:") != not_found;
        const bool names_bad_field =
            reason.find("\"TopLevel\": key \"non_queryable_field\" is not a queryable field") != not_found;

        if (!(has_known_prefix && names_bad_field)) {
            FAIL(reason);
        }
    };

    // Commits the subscription on "non_queryable_field" that both sections use
    // as the bad query.
    auto subscribe_to_non_queryable_field = [&](const SharedRealm& realm) {
        return create_subscription(realm, "class_TopLevel", "non_queryable_field", [](auto query, auto column) {
            return query.equal(column, "bar");
        });
    };

    SECTION("Good query after bad query") {
        harness->do_with_new_realm([&](SharedRealm realm) {
            auto subs = subscribe_to_non_queryable_field(realm);
            auto bad_result =
                subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
            check_status(bad_result);

            CHECK(realm->get_active_subscription_set().version() == 0);
            CHECK(realm->get_latest_subscription_set().version() == 1);

            subs = create_subscription(realm, "class_TopLevel", "queryable_str_field",
                                       [](auto query, auto column) {
                                           return query.equal(column, "foo");
                                       });
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();

            CHECK(realm->get_active_subscription_set().version() == 2);
            CHECK(realm->get_latest_subscription_set().version() == 2);
        });
    }

    SECTION("Bad query after bad query") {
        harness->do_with_new_realm([&](SharedRealm realm) {
            // Both subscription sets are committed while the session is paused
            // and the session is resumed only afterwards.
            auto session = realm->sync_session();
            session->pause();

            auto first_subs = subscribe_to_non_queryable_field(realm);
            auto second_subs = subscribe_to_non_queryable_field(realm);

            session->resume();

            auto first_result =
                first_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
            auto second_result =
                second_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();

            check_status(first_result);
            check_status(second_result);

            CHECK(realm->get_active_subscription_set().version() == 0);
            CHECK(realm->get_latest_subscription_set().version() == 2);
        });
    }

    // Add new sections before this
    SECTION("teardown") {
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
        harness.reset();
    }
}
6✔
1713

1714
#if REALM_ENABLE_GEOSPATIAL
1715
TEST_CASE("flx: geospatial", "[sync][flx][geospatial][baas]") {
4✔
1716
    static std::optional<FLXSyncTestHarness> harness;
4✔
1717
    if (!harness) {
4✔
1718
        Schema schema{
2✔
1719
            {"restaurant",
2✔
1720
             {
2✔
1721
                 {"_id", PropertyType::Int, Property::IsPrimary{true}},
2✔
1722
                 {"queryable_str_field", PropertyType::String},
2✔
1723
                 {"location", PropertyType::Object | PropertyType::Nullable, "geoPointType"},
2✔
1724
                 {"array", PropertyType::Object | PropertyType::Array, "geoPointType"},
2✔
1725
             }},
2✔
1726
            {"geoPointType",
2✔
1727
             ObjectSchema::ObjectType::Embedded,
2✔
1728
             {
2✔
1729
                 {"type", PropertyType::String},
2✔
1730
                 {"coordinates", PropertyType::Double | PropertyType::Array},
2✔
1731
             }},
2✔
1732
        };
2✔
1733
        FLXSyncTestHarness::ServerSchema server_schema{schema, {"queryable_str_field"}};
2✔
1734
        harness.emplace("flx_geospatial", server_schema);
2✔
1735
    }
2✔
1736

2✔
1737
    auto create_subscription = [](SharedRealm realm, StringData table_name, StringData column_name, auto make_query) {
3✔
1738
        auto table = realm->read_group().get_table(table_name);
2✔
1739
        auto queryable_field = table->get_column_key(column_name);
2✔
1740
        auto new_query = realm->get_active_subscription_set().make_mutable_copy();
2✔
1741
        new_query.insert_or_assign(make_query(Query(table), queryable_field));
2✔
1742
        return new_query.commit();
2✔
1743
    };
2✔
1744

2✔
1745
    // TODO: when this test starts failing because the server implements the new
2✔
1746
    // syntax, then we should implement an actual geospatial FLX query test here
2✔
1747
    /*
2✔
1748
    auto check_failed_status = [](auto status) {
2✔
1749
        CHECK(!status.is_ok());
2✔
1750
        if (status.get_status().reason().find("Client provided query with bad syntax:") == std::string::npos ||
2✔
1751
            status.get_status().reason().find("\"restaurant\": syntax error") == std::string::npos) {
2✔
1752
            FAIL(status.get_status().reason());
2✔
1753
        }
2✔
1754
    };
2✔
1755

2✔
1756
    SECTION("Server doesn't support GEOWITHIN yet") {
2✔
1757
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
1758
            auto subs = create_subscription(realm, "class_restaurant", "location", [](Query q, ColKey c) {
2✔
1759
                GeoBox area{GeoPoint{0.2, 0.2}, GeoPoint{0.7, 0.7}};
2✔
1760
                return q.get_table()->column<Link>(c).geo_within(area);
2✔
1761
            });
2✔
1762
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
2✔
1763
            check_failed_status(sub_res);
2✔
1764
            CHECK(realm->get_active_subscription_set().version() == 0);
2✔
1765
            CHECK(realm->get_latest_subscription_set().version() == 1);
2✔
1766
        });
2✔
1767
    }
2✔
1768
     */
2✔
1769

2✔
1770
    SECTION("non-geospatial FLX query syncs data which can be queried locally") {
4✔
1771
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
1772
            auto subs = create_subscription(realm, "class_restaurant", "queryable_str_field", [](Query q, ColKey c) {
2✔
1773
                return q.equal(c, "synced");
2✔
1774
            });
2✔
1775
            auto make_polygon_filter = [&](const GeoPolygon& polygon) -> bson::BsonDocument {
20✔
1776
                bson::BsonArray inner{};
20✔
1777
                REALM_ASSERT_3(polygon.points.size(), ==, 1);
20✔
1778
                for (auto& point : polygon.points[0]) {
94✔
1779
                    inner.push_back(bson::BsonArray{point.longitude, point.latitude});
94✔
1780
                }
94✔
1781
                bson::BsonArray coords;
20✔
1782
                coords.push_back(inner);
20✔
1783
                bson::BsonDocument geo_bson{{{"type", "Polygon"}, {"coordinates", coords}}};
20✔
1784
                bson::BsonDocument filter{
20✔
1785
                    {"location", bson::BsonDocument{{"$geoWithin", bson::BsonDocument{{"$geometry", geo_bson}}}}}};
20✔
1786
                return filter;
20✔
1787
            };
20✔
1788
            auto make_circle_filter = [&](const GeoCircle& circle) -> bson::BsonDocument {
6✔
1789
                bson::BsonArray coords{circle.center.longitude, circle.center.latitude};
6✔
1790
                bson::BsonArray inner;
6✔
1791
                inner.push_back(coords);
6✔
1792
                inner.push_back(circle.radius_radians);
6✔
1793
                bson::BsonDocument filter{
6✔
1794
                    {"location", bson::BsonDocument{{"$geoWithin", bson::BsonDocument{{"$centerSphere", inner}}}}}};
6✔
1795
                return filter;
6✔
1796
            };
6✔
1797
            auto run_query_on_server = [&](const bson::BsonDocument& filter,
2✔
1798
                                           std::optional<std::string> expected_error = {}) -> size_t {
26✔
1799
                auto remote_client = harness->app()->current_user()->mongo_client("BackingDB");
26✔
1800
                auto db = remote_client.db(harness->session().app_session().config.mongo_dbname);
26✔
1801
                auto restaurant_collection = db["restaurant"];
26✔
1802
                bool processed = false;
26✔
1803
                constexpr int64_t limit = 1000;
26✔
1804
                size_t matches = 0;
26✔
1805
                restaurant_collection.count(filter, limit, [&](uint64_t count, util::Optional<AppError> error) {
26✔
1806
                    processed = true;
26✔
1807
                    if (error) {
26✔
1808
                        if (!expected_error) {
12✔
1809
                            util::format(std::cout, "query error: %1\n", error->reason());
×
1810
                            FAIL(error);
×
1811
                        }
×
1812
                        else {
12✔
1813
                            std::string reason = std::string(error->reason());
12✔
1814
                            std::transform(reason.begin(), reason.end(), reason.begin(), toLowerAscii);
12✔
1815
                            std::transform(expected_error->begin(), expected_error->end(), expected_error->begin(),
12✔
1816
                                           toLowerAscii);
12✔
1817
                            auto pos = reason.find(*expected_error);
12✔
1818
                            if (pos == std::string::npos) {
12✔
1819
                                util::format(std::cout, "mismatch error: '%1' and '%2'\n", reason, *expected_error);
×
1820
                                FAIL(reason);
×
1821
                            }
×
1822
                        }
12✔
1823
                    }
12✔
1824
                    matches = size_t(count);
26✔
1825
                });
26✔
1826
                REQUIRE(processed);
26!
1827
                return matches;
26✔
1828
            };
26✔
1829
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
2✔
1830
            CHECK(sub_res.is_ok());
2!
1831
            CHECK(realm->get_active_subscription_set().version() == 1);
2!
1832
            CHECK(realm->get_latest_subscription_set().version() == 1);
2!
1833

1✔
1834
            realm->begin_transaction();
2✔
1835

1✔
1836
            CppContext c(realm);
2✔
1837
            int64_t pk = 0;
2✔
1838
            auto add_point = [&](GeoPoint p) {
16✔
1839
                Object::create(
16✔
1840
                    c, realm, "restaurant",
16✔
1841
                    std::any(AnyDict{
16✔
1842
                        {"_id", ++pk},
16✔
1843
                        {"queryable_str_field", "synced"s},
16✔
1844
                        {"location", AnyDict{{"type", "Point"s},
16✔
1845
                                             {"coordinates", std::vector<std::any>{p.longitude, p.latitude}}}}}));
16✔
1846
            };
16✔
1847
            std::vector<GeoPoint> points = {
2✔
1848
                GeoPoint{-74.006, 40.712800000000001},            // New York city
2✔
1849
                GeoPoint{12.568300000000001, 55.676099999999998}, // Copenhagen
2✔
1850
                GeoPoint{12.082599999999999, 55.628},             // ragnarok, Roskilde
2✔
1851
                GeoPoint{-180.1, -90.1},                          // invalid
2✔
1852
                GeoPoint{0, 90},                                  // north pole
2✔
1853
                GeoPoint{-82.68193, 84.74653},                    // northern point that falls within a box later
2✔
1854
                GeoPoint{82.55243, 84.54981}, // another northern point, but on the other side of the pole
2✔
1855
                GeoPoint{2129, 89},           // invalid
2✔
1856
            };
2✔
1857
            for (auto& point : points) {
16✔
1858
                add_point(point);
16✔
1859
            }
16✔
1860
            realm->commit_transaction();
2✔
1861
            wait_for_upload(*realm);
2✔
1862

1✔
1863
            {
2✔
1864
                auto table = realm->read_group().get_table("class_restaurant");
2✔
1865
                CHECK(table->size() == points.size());
2!
1866
                Obj obj = table->get_object_with_primary_key(Mixed{1});
2✔
1867
                REQUIRE(obj);
2!
1868
                Geospatial geo = obj.get<Geospatial>("location");
2✔
1869
                REQUIRE(geo.get_type_string() == "Point");
2!
1870
                REQUIRE(geo.get_type() == Geospatial::Type::Point);
2!
1871
                GeoPoint point = geo.get<GeoPoint>();
2✔
1872
                REQUIRE(point.longitude == points[0].longitude);
2!
1873
                REQUIRE(point.latitude == points[0].latitude);
2!
1874
                REQUIRE(!point.get_altitude());
2!
1875
                ColKey location_col = table->get_column_key("location");
2✔
1876
                auto run_query_locally = [&table, &location_col](Geospatial bounds) -> size_t {
26✔
1877
                    Query query = table->column<Link>(location_col).geo_within(Geospatial(bounds));
26✔
1878
                    return query.find_all().size();
26✔
1879
                };
26✔
1880

1✔
1881
                reset_utils::wait_for_num_objects_in_atlas(
2✔
1882
                    harness->app()->current_user(), harness->session().app_session(), "restaurant", points.size());
2✔
1883

1✔
1884
                {
2✔
1885
                    GeoPolygon bounds{
2✔
1886
                        {{GeoPoint{-80, 40.7128}, GeoPoint{20, 60}, GeoPoint{20, 20}, GeoPoint{-80, 40.7128}}}};
2✔
1887
                    size_t local_matches = run_query_locally(bounds);
2✔
1888
                    size_t server_results = run_query_on_server(make_polygon_filter(bounds));
2✔
1889
                    CHECK(server_results == local_matches);
2!
1890
                }
2✔
1891
                {
2✔
1892
                    GeoCircle circle{.5, GeoPoint{0, 90}};
2✔
1893
                    size_t local_matches = run_query_locally(circle);
2✔
1894
                    size_t server_results = run_query_on_server(make_circle_filter(circle));
2✔
1895
                    CHECK(server_results == local_matches);
2!
1896
                }
2✔
1897
                { // a ring with 3 points without a matching begin/end is an error
2✔
1898
                    GeoPolygon open_bounds{{{GeoPoint{-80, 40.7128}, GeoPoint{20, 60}, GeoPoint{20, 20}}}};
2✔
1899
                    CHECK_THROWS_WITH(run_query_locally(open_bounds),
2✔
1900
                                      "Invalid region in GEOWITHIN query for parameter 'GeoPolygon({[-80, 40.7128], "
2✔
1901
                                      "[20, 60], [20, 20]})': 'Ring is not closed, first vertex 'GeoPoint([-80, "
2✔
1902
                                      "40.7128])' does not equal last vertex 'GeoPoint([20, 20])''");
2✔
1903
                    run_query_on_server(make_polygon_filter(open_bounds), "(BadValue) Loop is not closed");
2✔
1904
                }
2✔
1905
                {
2✔
1906
                    GeoCircle circle = GeoCircle::from_kms(10, GeoPoint{-180.1, -90.1});
2✔
1907
                    CHECK_THROWS_WITH(run_query_locally(circle),
2✔
1908
                                      "Invalid region in GEOWITHIN query for parameter 'GeoCircle([-180.1, -90.1], "
2✔
1909
                                      "0.00156787)': 'Longitude/latitude is out of bounds, lng: -180.1 lat: -90.1'");
2✔
1910
                    run_query_on_server(make_circle_filter(circle), "(BadValue) longitude/latitude is out of bounds");
2✔
1911
                }
2✔
1912
                {
2✔
1913
                    GeoCircle circle = GeoCircle::from_kms(-1, GeoPoint{0, 0});
2✔
1914
                    CHECK_THROWS_WITH(run_query_locally(circle),
2✔
1915
                                      "Invalid region in GEOWITHIN query for parameter 'GeoCircle([0, 0], "
2✔
1916
                                      "-0.000156787)': 'The radius of a circle must be a non-negative number'");
2✔
1917
                    run_query_on_server(make_circle_filter(circle),
2✔
1918
                                        "(BadValue) radius must be a non-negative number");
2✔
1919
                }
2✔
1920
                {
2✔
1921
                    // This box is from Gershøj to CPH airport. It includes CPH and Ragnarok but not NYC.
1✔
1922
                    std::vector<Geospatial> valid_box_variations = {
2✔
1923
                        GeoBox{GeoPoint{11.97575, 55.71601},
2✔
1924
                               GeoPoint{12.64773, 55.61211}}, // Gershøj, CPH Airport (Top Left, Bottom Right)
2✔
1925
                        GeoBox{GeoPoint{12.64773, 55.61211},
2✔
1926
                               GeoPoint{11.97575, 55.71601}}, // CPH Airport, Gershøj (Bottom Right, Top Left)
2✔
1927
                        GeoBox{GeoPoint{12.64773, 55.71601},
2✔
1928
                               GeoPoint{11.97575, 55.61211}}, // Upper Right, Bottom Left
2✔
1929
                        GeoBox{GeoPoint{11.97575, 55.61211},
2✔
1930
                               GeoPoint{12.64773, 55.71601}}, // Bottom Left, Upper Right
2✔
1931
                    };
2✔
1932
                    constexpr size_t expected_results = 2;
2✔
1933
                    for (auto& geo : valid_box_variations) {
8✔
1934
                        size_t local_matches = run_query_locally(geo);
8✔
1935
                        size_t server_matches =
8✔
1936
                            run_query_on_server(make_polygon_filter(geo.get<GeoBox>().to_polygon()));
8✔
1937
                        CHECK(local_matches == expected_results);
8!
1938
                        CHECK(server_matches == expected_results);
8!
1939
                    }
8✔
1940
                    std::vector<Geospatial> invalid_boxes = {
2✔
1941
                        GeoBox{GeoPoint{11.97575, 55.71601}, GeoPoint{11.97575, 55.71601}}, // same point twice
2✔
1942
                        GeoBox{GeoPoint{11.97575, 55.71601},
2✔
1943
                               GeoPoint{11.97575, 57.0}}, // two points on the same longitude
2✔
1944
                        GeoBox{GeoPoint{11.97575, 55.71601},
2✔
1945
                               GeoPoint{12, 55.71601}}, // two points on the same latitude
2✔
1946
                    };
2✔
1947
                    for (auto& geo : invalid_boxes) {
6✔
1948
                        REQUIRE_THROWS_CONTAINING(run_query_locally(geo),
6✔
1949
                                                  "Invalid region in GEOWITHIN query for parameter 'GeoPolygon");
6✔
1950
                        run_query_on_server(make_polygon_filter(geo.get<GeoBox>().to_polygon()),
6✔
1951
                                            "(BadValue) Loop must have at least 3 different vertices");
6✔
1952
                    }
6✔
1953
                }
2✔
1954
                { // a box region that wraps the north pole. It contains the north pole point
2✔
1955
                    // and two others, one each on distinct sides of the globe.
1✔
1956
                    constexpr double lat = 82.83799;
2✔
1957
                    Geospatial north_pole_box =
2✔
1958
                        GeoPolygon{{{GeoPoint{-78.33951, lat}, GeoPoint{-90.33951, lat}, GeoPoint{90.33951, lat},
2✔
1959
                                     GeoPoint{78.33951, lat}, GeoPoint{-78.33951, lat}}}};
2✔
1960
                    constexpr size_t num_matching_points = 3;
2✔
1961
                    size_t local_matches = run_query_locally(north_pole_box);
2✔
1962
                    size_t server_matches =
2✔
1963
                        run_query_on_server(make_polygon_filter(north_pole_box.get<GeoPolygon>()));
2✔
1964
                    CHECK(local_matches == num_matching_points);
2!
1965
                    CHECK(server_matches == num_matching_points);
2!
1966
                }
2✔
1967
            }
2✔
1968
        });
2✔
1969
    }
2✔
1970

2✔
1971
    // Add new sections before this
2✔
1972
    SECTION("teardown") {
4✔
1973
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
2✔
1974
        harness.reset();
2✔
1975
    }
2✔
1976
}
4✔
1977
#endif // REALM_ENABLE_GEOSPATIAL
1978

1979
// Closes the sync session from a client event hook part-way through the
// bootstrap for subscription version 1, checks the subscription state left in
// the realm file, then reopens the realm and checks that the bootstrap
// completes with every expected object present.
TEST_CASE("flx: interrupted bootstrap restarts/recovers on reconnect", "[sync][flx][bootstrap][baas]") {
    FLXSyncTestHarness harness("flx_bootstrap_batching", {g_large_array_schema, {"queryable_int_field"}});

    std::vector<ObjectId> expected_ids = fill_large_array_schema(harness);
    SyncTestFile base_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
    base_config.cache = false;

    // Phase 1: open the realm with a hook that closes the session mid-bootstrap.
    {
        auto [promise_half, interrupted_future] = util::make_promise_future<void>();
        Realm::Config hooked_config = base_config;
        // Give the hooked config its own SyncConfig copy so the hook is not
        // installed on base_config's sync_config.
        hooked_config.sync_config = std::make_shared<SyncConfig>(*base_config.sync_config);
        auto interrupted_signal = std::make_shared<util::Promise<void>>(std::move(promise_half));
        hooked_config.sync_config->on_sync_client_event_hook =
            [signal = std::move(interrupted_signal), saw_version_one = false](
                std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& data) mutable {
                if (data.event != SyncClientHookEvent::DownloadMessageReceived) {
                    return SyncClientHookAction::NoAction;
                }

                auto strong_session = weak_session.lock();
                if (!strong_session) {
                    return SyncClientHookAction::NoAction;
                }

                // If we haven't seen at least one download message for query version 1, then do nothing yet.
                if (data.query_version == 0) {
                    return SyncClientHookAction::NoAction;
                }
                if (data.query_version == 1 && !std::exchange(saw_version_one, true)) {
                    return SyncClientHookAction::NoAction;
                }

                REQUIRE(data.query_version == 1);
                REQUIRE(data.batch_state == sync::DownloadBatchState::MoreToCome);
                auto newest_subs = strong_session->get_flx_subscription_store()->get_latest();
                REQUIRE(newest_subs.version() == 1);
                REQUIRE(newest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);

                strong_session->close();
                signal->emplace_value();

                return SyncClientHookAction::TriggerReconnect;
            };

        auto hooked_realm = Realm::get_shared_realm(hooked_config);
        {
            auto pending_subs = hooked_realm->get_latest_subscription_set().make_mutable_copy();
            auto top_level = hooked_realm->read_group().get_table("class_TopLevel");
            pending_subs.insert_or_assign(Query(top_level));
            pending_subs.commit();
        }

        interrupted_future.get();
        hooked_realm->sync_session()->shutdown_and_wait();
    }

    _impl::RealmCoordinator::assert_no_open_realms();

    // Phase 2: open the file directly and check the persisted subscription
    // state: version 0 is still active and version 1 is still Bootstrapping.
    {
        DBOptions db_options;
        db_options.encryption_key = test_util::crypt_key();
        auto db = DB::create(sync::make_client_replication(), base_config.path, db_options);
        auto store = sync::SubscriptionStore::create(db);
        auto versions = store->get_version_info();
        REQUIRE(versions.active == 0);
        REQUIRE(versions.latest == 1);
        auto persisted_subs = store->get_latest();
        REQUIRE(persisted_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
        REQUIRE(persisted_subs.size() == 1);
        REQUIRE(persisted_subs.at(0).object_class_name == "TopLevel");
    }

    // Phase 3: reopen without the hook and wait for the bootstrap to complete.
    auto realm = Realm::get_shared_realm(base_config);
    auto table = realm->read_group().get_table("class_TopLevel");
    realm->get_latest_subscription_set().get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
    wait_for_upload(*realm);
    wait_for_download(*realm);

    wait_for_advance(*realm);
    REQUIRE(table->size() == expected_ids.size());
    for (const ObjectId& expected_id : expected_ids) {
        REQUIRE(table->find_primary_key(Mixed{expected_id}));
    }

    auto active = realm->get_active_subscription_set();
    auto latest = realm->get_latest_subscription_set();
    REQUIRE(active.version() == latest.version());
    REQUIRE(active.version() == int64_t(1));
}
2✔
2066

2067
// Starts a dev-mode app with an empty server schema, uploads two TopLevel
// objects from a client using the default schema, then checks from a second
// realm that a query on queryable_int_field downloads both objects.
TEST_CASE("flx: dev mode uploads schema before query change", "[sync][flx][query][baas]") {
    // Server side: default queryable fields, dev mode enabled, empty schema.
    FLXSyncTestHarness::ServerSchema server_schema;
    auto default_schema = FLXSyncTestHarness::default_server_schema();
    server_schema.queryable_fields = default_schema.queryable_fields;
    server_schema.dev_mode_enabled = true;
    server_schema.schema = Schema{};

    FLXSyncTestHarness harness("flx_dev_mode", server_schema);
    auto foo_obj_id = ObjectId::gen();
    auto bar_obj_id = ObjectId::gen();

    // First client: subscribe to all of TopLevel and upload two objects.
    harness.do_with_new_realm(
        [&](SharedRealm realm) {
            auto top_level = realm->read_group().get_table("class_TopLevel");
            auto pending_subs = realm->get_latest_subscription_set().make_mutable_copy();
            pending_subs.insert_or_assign(Query(top_level));
            pending_subs.commit();

            CppContext ctx(realm);
            // Creates one TopLevel object with the given field values.
            auto create_top_level = [&](const ObjectId& id, std::string str_value, int64_t int_value,
                                        std::string non_queryable_value) {
                Object::create(ctx, realm, "TopLevel",
                               std::any(AnyDict{{"_id", id},
                                                {"queryable_str_field", std::move(str_value)},
                                                {"queryable_int_field", int_value},
                                                {"non_queryable_field", std::move(non_queryable_value)}}));
            };

            realm->begin_transaction();
            create_top_level(foo_obj_id, "foo", 5, "non queryable 1");
            create_top_level(bar_obj_id, "bar", 10, "non queryable 2");
            realm->commit_transaction();

            wait_for_upload(*realm);
        },
        default_schema.schema);

    // Second client: query on queryable_int_field >= 5 and expect both objects.
    harness.do_with_new_realm(
        [&](SharedRealm realm) {
            auto top_level = realm->read_group().get_table("class_TopLevel");
            auto int_col = top_level->get_column_key("queryable_int_field");
            auto pending_subs = realm->get_latest_subscription_set().make_mutable_copy();
            pending_subs.insert_or_assign(Query(top_level).greater_equal(int_col, int64_t(5)));
            auto committed_subs = pending_subs.commit();
            committed_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
            wait_for_download(*realm);
            Results all_objects(realm, top_level);

            realm->refresh();
            CHECK(all_objects.size() == 2);
            CHECK(top_level->get_object_with_primary_key({foo_obj_id}).is_valid());
            CHECK(top_level->get_object_with_primary_key({bar_obj_id}).is_valid());
        },
        default_schema.schema);
}
2✔
2122

2123
// This is a test case for the server's fix for RCORE-969
2124
TEST_CASE("flx: change-of-query history divergence", "[sync][flx][query][baas]") {
    using SubscriptionState = sync::SubscriptionSet::State;

    FLXSyncTestHarness harness("flx_coq_divergence");

    // Step 1: seed the server with a single object via the harness and remember its primary key so
    // that a conflicting copy can be created locally later on.
    auto foo_obj_id = ObjectId::gen();
    harness.load_initial_data([&](SharedRealm seed_realm) {
        CppContext seed_ctx(seed_realm);
        Object::create(seed_ctx, seed_realm, "TopLevel",
                       std::any(AnyDict{{"_id", foo_obj_id},
                                        {"queryable_str_field", "foo"s},
                                        {"queryable_int_field", static_cast<int64_t>(5)},
                                        {"non_queryable_field", "created as initial data seed"s}}));
    });

    // Step 2: open a second realm and wait until it is fully synchronized with bootstrap version zero,
    // i.e. its progress counters should be past the history entry containing the seeded object.
    auto test_file_config = harness.make_test_file();
    auto realm = Realm::get_shared_realm(test_file_config);
    auto table = realm->read_group().get_table("class_TopLevel");
    auto queryable_str_field = table->get_column_key("queryable_str_field");

    realm->get_latest_subscription_set()
        .get_state_change_notification(SubscriptionState::Complete)
        .get();
    wait_for_upload(*realm);
    wait_for_download(*realm);

    // Step 3: take the sync session offline.
    realm->sync_session()->pause();

    // Step 4: while offline, subscribe to the seeded "foo" object (moving it into view) ...
    auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
    mut_subs.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
    auto subs = mut_subs.commit();

    // ... and write a diverging local object that reuses the same primary key.
    realm->begin_transaction();
    CppContext local_ctx(realm);
    Object::create(local_ctx, realm, "TopLevel",
                   std::any(AnyDict{{"_id", foo_obj_id},
                                    {"queryable_str_field", "foo"s},
                                    {"queryable_int_field", static_cast<int64_t>(10)},
                                    {"non_queryable_field", "created locally"s}}));
    realm->commit_transaction();

    // Step 5: go back online and wait for the subscription that moved "foo" into view to be fully
    // synchronized.
    realm->sync_session()->resume();
    wait_for_upload(*realm);
    wait_for_download(*realm);
    subs.get_state_change_notification(SubscriptionState::Complete).get();

    wait_for_advance(*realm);

    // The bootstrap should have erased and re-created our object, so the values seen locally are
    // expected to be the ones the server was seeded with.
    auto obj = Object::get_for_primary_key(local_ctx, realm, "TopLevel", std::any{foo_obj_id});
    REQUIRE(obj.get_obj().get<int64_t>("queryable_int_field") == 5);
    REQUIRE(obj.get_obj().get<StringData>("non_queryable_field") == "created as initial data seed");

    // Likewise, a realm freshly provided by the harness should see the initial server version of the
    // object rather than the "created locally" one.
    harness.load_initial_data([&](SharedRealm fresh_realm) {
        CppContext fresh_ctx(fresh_realm);

        auto fresh_obj = Object::get_for_primary_key(fresh_ctx, fresh_realm, "TopLevel", std::any{foo_obj_id});
        REQUIRE(fresh_obj.get_obj().get<int64_t>("queryable_int_field") == 5);
        REQUIRE(fresh_obj.get_obj().get<StringData>("non_queryable_field") == "created as initial data seed");
    });
}
2✔
2190

2191
TEST_CASE("flx: writes work offline", "[sync][flx][baas]") {
    FLXSyncTestHarness harness("flx_offline_writes");

    harness.do_with_new_realm([&](SharedRealm realm) {
        auto sync_session = realm->sync_session();
        auto table = realm->read_group().get_table("class_TopLevel");
        auto queryable_str_field = table->get_column_key("queryable_str_field");
        auto queryable_int_field = table->get_column_key("queryable_int_field");

        // Start out subscribed to every object in the table.
        auto initial_subs = realm->get_latest_subscription_set().make_mutable_copy();
        initial_subs.insert_or_assign(Query(table));
        initial_subs.commit();

        auto foo_obj_id = ObjectId::gen();
        auto bar_obj_id = ObjectId::gen();

        // Replaces the whole subscription set with the single query passed in.
        auto resubscribe_to = [&](const Query& only_query) {
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
            mut_subs.clear();
            mut_subs.insert_or_assign(only_query);
            mut_subs.commit();
        };

        // Overwrites "queryable_int_field" of the "foo" object inside its own write transaction.
        auto set_foo_int_field = [&](int64_t new_value) {
            Results results(realm, table);
            realm->begin_transaction();
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
            foo_obj.set<int64_t>(queryable_int_field, new_value);
            realm->commit_transaction();
        };

        // Create the "foo" and "bar" objects while the session is still connected.
        CppContext ctx(realm);
        realm->begin_transaction();
        Object::create(ctx, realm, "TopLevel",
                       std::any(AnyDict{{"_id", foo_obj_id},
                                        {"queryable_str_field", "foo"s},
                                        {"queryable_int_field", static_cast<int64_t>(5)},
                                        {"non_queryable_field", "non queryable 1"s}}));
        Object::create(ctx, realm, "TopLevel",
                       std::any(AnyDict{{"_id", bar_obj_id},
                                        {"queryable_str_field", "bar"s},
                                        {"queryable_int_field", static_cast<int64_t>(10)},
                                        {"non_queryable_field", "non queryable 2"s}}));
        realm->commit_transaction();

        wait_for_upload(*realm);
        wait_for_download(*realm);
        sync_session->pause();

        // Everything from here until resume() happens with the session paused.

        // Make it so the subscriptions only match the "foo" object.
        resubscribe_to(Query(table).equal(queryable_str_field, "foo"));

        // Change foo so that it will match the next subscription update. This checks that several
        // subscription set updates can be made offline and that the last one eventually takes effect
        // once the session is back online and fully synchronized.
        set_foo_int_field(15);

        // Update the subscriptions so that both foo and bar are included.
        resubscribe_to(Query(table).greater_equal(queryable_int_field, static_cast<int64_t>(10)));

        // Move foo out of view for the current subscription.
        set_foo_int_field(0);

        sync_session->resume();
        wait_for_upload(*realm);
        wait_for_download(*realm);

        // Only "bar" is expected to remain once everything has been synchronized.
        realm->refresh();
        Results results(realm, table);
        CHECK(results.size() == 1);
        CHECK(table->get_object_with_primary_key({bar_obj_id}).is_valid());
    });
}
2✔
2270

2271
TEST_CASE("flx: writes work without waiting for sync", "[sync][flx][baas]") {
    FLXSyncTestHarness harness("flx_offline_writes");

    harness.do_with_new_realm([&](SharedRealm realm) {
        auto table = realm->read_group().get_table("class_TopLevel");
        auto queryable_str_field = table->get_column_key("queryable_str_field");
        auto queryable_int_field = table->get_column_key("queryable_int_field");

        // Start out subscribed to every object in the table.
        auto initial_subs = realm->get_latest_subscription_set().make_mutable_copy();
        initial_subs.insert_or_assign(Query(table));
        initial_subs.commit();

        auto foo_obj_id = ObjectId::gen();
        auto bar_obj_id = ObjectId::gen();

        // Replaces the whole subscription set with the single query passed in.
        auto resubscribe_to = [&](const Query& only_query) {
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
            mut_subs.clear();
            mut_subs.insert_or_assign(only_query);
            mut_subs.commit();
        };

        // Overwrites "queryable_int_field" of the "foo" object inside its own write transaction.
        auto set_foo_int_field = [&](int64_t new_value) {
            Results results(realm, table);
            realm->begin_transaction();
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
            foo_obj.set<int64_t>(queryable_int_field, new_value);
            realm->commit_transaction();
        };

        // Create the "foo" and "bar" objects and push them to the server.
        CppContext ctx(realm);
        realm->begin_transaction();
        Object::create(ctx, realm, "TopLevel",
                       std::any(AnyDict{{"_id", foo_obj_id},
                                        {"queryable_str_field", "foo"s},
                                        {"queryable_int_field", static_cast<int64_t>(5)},
                                        {"non_queryable_field", "non queryable 1"s}}));
        Object::create(ctx, realm, "TopLevel",
                       std::any(AnyDict{{"_id", bar_obj_id},
                                        {"queryable_str_field", "bar"s},
                                        {"queryable_int_field", static_cast<int64_t>(10)},
                                        {"non_queryable_field", "non queryable 2"s}}));
        realm->commit_transaction();

        wait_for_upload(*realm);

        // Make it so the subscriptions only match the "foo" object.
        resubscribe_to(Query(table).equal(queryable_str_field, "foo"));

        // Change foo so that it will match the next subscription update. This checks that several
        // subscription set updates can be made without waiting in between and that the last one
        // eventually takes effect once everything is fully synchronized.
        set_foo_int_field(15);

        // Update the subscriptions so that both foo and bar are included.
        resubscribe_to(Query(table).greater_equal(queryable_int_field, static_cast<int64_t>(10)));

        // Move foo out of view for the current subscription.
        set_foo_int_field(0);

        wait_for_upload(*realm);
        wait_for_download(*realm);

        // Only "bar" is expected to remain once everything has been synchronized.
        realm->refresh();
        Results results(realm, table);
        CHECK(results.size() == 1);
        CHECK(table->get_object_with_primary_key({bar_obj_id}).is_valid());
    });
}
2✔
2346

2347
TEST_CASE("flx: verify websocket protocol number and prefixes", "[sync][protocol]") {
    // Pinned expectation: update this value whenever the protocol version is updated. Keeping it
    // hard-coded ensures that the current protocol version does not change unexpectedly.
    const int expected_protocol_version = 10;

    // The prefixes were changed in Protocol V8 to use '#' instead of '/' to support the Web SDK.
    const std::string expected_pbs_prefix = "com.mongodb.realm-sync#";
    const std::string expected_flx_prefix = "com.mongodb.realm-query-sync#";

    REQUIRE(expected_protocol_version == sync::get_current_protocol_version());
    REQUIRE(expected_pbs_prefix == sync::get_pbs_websocket_protocol_prefix());
    REQUIRE(expected_flx_prefix == sync::get_flx_websocket_protocol_prefix());
}
2✔
2355

2356
// TODO: remote-baas: This test fails consistently with Windows remote baas server - to be fixed in RCORE-1674
2357
#ifndef _WIN32
2358
TEST_CASE("flx: subscriptions persist after closing/reopening", "[sync][flx][baas]") {
    FLXSyncTestHarness harness("flx_bad_query");
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});

    // First session: commit a subscription covering the whole TopLevel table, then close the realm.
    {
        auto orig_realm = Realm::get_shared_realm(config);
        auto pending_subs = orig_realm->get_latest_subscription_set().make_mutable_copy();
        pending_subs.insert_or_assign(Query(orig_realm->read_group().get_table("class_TopLevel")));
        pending_subs.commit();
        orig_realm->close();
    }

    // Second session: reopening with the same config must still show that one subscription, and it
    // must be able to reach the Complete state.
    {
        auto reopened_realm = Realm::get_shared_realm(config);
        auto persisted_subs = reopened_realm->get_latest_subscription_set();
        CHECK(persisted_subs.size() == 1);
        persisted_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
    }
}
2✔
2377
#endif
2378

2379
TEST_CASE("flx: no subscription store created for PBS app", "[sync][flx][baas]") {
    // Create a minimal server app and a realm config that is not flagged as flexible sync.
    const std::string base_url = get_base_url();
    auto server_app_config = minimal_app_config(base_url, "flx_connect_as_pbs", g_minimal_schema);
    TestAppSession session(create_app(server_app_config));
    SyncTestFile config(session.app(), bson::Bson{}, g_minimal_schema);

    auto realm = Realm::get_shared_realm(config);

    // Both waits are expected to yield a falsy result here; download is awaited before upload.
    const auto download_result = wait_for_download(*realm);
    CHECK(!download_result);
    const auto upload_result = wait_for_upload(*realm);
    CHECK(!upload_result);

    // The session must not expose a subscription store ...
    const auto subscription_store = realm->sync_session()->get_flx_subscription_store();
    CHECK(!subscription_store);

    // ... and asking the realm for subscription sets must be rejected.
    CHECK_THROWS_AS(realm->get_active_subscription_set(), IllegalOperation);
    CHECK_THROWS_AS(realm->get_latest_subscription_set(), IllegalOperation);
}
2✔
2394

2395
TEST_CASE("flx: connect to FLX as PBS returns an error", "[sync][flx][baas]") {
    FLXSyncTestHarness harness("connect_to_flx_as_pbs");
    SyncTestFile config(harness.app(), bson::Bson{}, harness.schema());

    // The most recent sync error reported to the handler; every access from the handler and from
    // the polling predicate below goes through the mutex.
    std::mutex sync_error_mutex;
    util::Optional<SyncError> sync_error;
    config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError reported) {
        std::lock_guard guard(sync_error_mutex);
        sync_error = std::move(reported);
    };

    auto realm = Realm::get_shared_realm(config);

    // Poll until the handler has stored an error.
    timed_wait_for([&] {
        std::lock_guard guard(sync_error_mutex);
        return static_cast<bool>(sync_error);
    });

    CHECK(sync_error->status == ErrorCodes::WrongSyncType);
    CHECK(sync_error->server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
}
2✔
2413

2414
TEST_CASE("flx: connect to FLX with partition value returns an error", "[sync][flx][protocol][baas]") {
    FLXSyncTestHarness harness("connect_to_flx_as_pbs");

    // Build a flexible-sync config and then add a partition value on top of it; the two settings
    // are expected to be rejected together when the realm is opened.
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
    auto& sync_config = *config.sync_config;
    sync_config.partition_value = "\"foobar\"";

    REQUIRE_EXCEPTION(Realm::get_shared_realm(config), IllegalCombination,
                      "Cannot specify a partition value when flexible sync is enabled");
}
2✔
2422

2423
TEST_CASE("flx: connect to PBS as FLX returns an error", "[sync][flx][protocol][baas]") {
    const std::string base_url = get_base_url();

    // Create a minimal server app, then build a client config for it that is flagged as flexible sync.
    auto server_app_config = minimal_app_config(base_url, "flx_connect_as_pbs", g_minimal_schema);
    TestAppSession session(create_app(server_app_config));
    auto app = session.app();
    auto user = app->current_user();

    SyncTestFile config(user, g_minimal_schema, SyncConfig::FLXSyncEnabled{});

    // The most recent sync error reported to the handler; every access from the handler and from
    // the polling predicate below goes through the mutex.
    std::mutex sync_error_mutex;
    util::Optional<SyncError> sync_error;
    config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError reported) {
        std::lock_guard<std::mutex> guard(sync_error_mutex);
        sync_error = std::move(reported);
    };

    auto realm = Realm::get_shared_realm(config);

    // Poll until the handler has stored an error.
    timed_wait_for([&] {
        std::lock_guard<std::mutex> guard(sync_error_mutex);
        return static_cast<bool>(sync_error);
    });

    CHECK(sync_error->status == ErrorCodes::WrongSyncType);
    CHECK(sync_error->server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
}
2✔
2448

2449
TEST_CASE("flx: commit subscription while refreshing the access token", "[sync][flx][token][baas]") {
    // Transport wrapper: if a hook is installed it is called with every outgoing request before the
    // request is forwarded unchanged to SynchronousTestTransport.
    class HookedTransport : public SynchronousTestTransport {
    public:
        util::UniqueFunction<void(const Request&)> request_hook;

        void send_request_to_server(const Request& request,
                                    util::UniqueFunction<void(const Response&)>&& completion) override
        {
            if (request_hook) {
                request_hook(request);
            }
            SynchronousTestTransport::send_request_to_server(request, std::move(completion));
        }
    };

    auto transport = std::make_shared<HookedTransport>();
    FLXSyncTestHarness harness("flx_wait_access_token2", FLXSyncTestHarness::default_server_schema(), transport);
    auto app = harness.app();
    std::shared_ptr<SyncUser> user = app->current_user();
    REQUIRE(user);
    REQUIRE(!user->access_token_refresh_required());

    // Set a bad access token whose expiry lies 30 seconds in the past. This will trigger a refresh
    // initiated by the client.
    const auto expired_at = std::chrono::system_clock::now() - std::chrono::seconds(30);
    auto expires = std::chrono::system_clock::to_time_t(expired_at);
    user->update_access_token(encode_fake_jwt("fake_access_token", expires));
    REQUIRE(user->access_token_refresh_required());

    bool seen_waiting_for_access_token = false;
    // Commit a subscription set while there is no sync session.
    // A session is created when the access token is refreshed.
    transport->request_hook = [&](const Request&) {
        auto current_user = app->current_user();
        REQUIRE(current_user);
        for (auto& session : current_user->all_sessions()) {
            if (session->state() != SyncSession::State::WaitingForAccessToken) {
                continue;
            }

            // A session waiting for its token must be observed at most once.
            REQUIRE(!seen_waiting_for_access_token);
            seen_waiting_for_access_token = true;

            auto store = session->get_flx_subscription_store();
            REQUIRE(store);
            auto mut_subs = store->get_latest().make_mutable_copy();
            mut_subs.commit();
        }
    };

    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
    // This triggers the token refresh.
    auto r = Realm::get_shared_realm(config);
    REQUIRE(seen_waiting_for_access_token);
}
2✔
2499

2500
TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][bootstrap][baas]") {
6✔
2501
    FLXSyncTestHarness harness("flx_bootstrap_batching", {g_large_array_schema, {"queryable_int_field"}});
6✔
2502

3✔
2503
    std::vector<ObjectId> obj_ids_at_end = fill_large_array_schema(harness);
6✔
2504
    SyncTestFile interrupted_realm_config(harness.app()->current_user(), harness.schema(),
6✔
2505
                                          SyncConfig::FLXSyncEnabled{});
6✔
2506
    interrupted_realm_config.cache = false;
6✔
2507

3✔
2508
    auto check_interrupted_state = [&](const DBRef& realm) {
6✔
2509
        auto tr = realm->start_read();
6✔
2510
        auto top_level = tr->get_table("class_TopLevel");
6✔
2511
        REQUIRE(top_level);
6!
2512
        REQUIRE(top_level->is_empty());
6!
2513

3✔
2514
        auto sub_store = sync::SubscriptionStore::create(realm);
6✔
2515
        auto version_info = sub_store->get_version_info();
6✔
2516
        REQUIRE(version_info.latest == 1);
6!
2517
        REQUIRE(version_info.active == 0);
6!
2518
        auto latest_subs = sub_store->get_latest();
6✔
2519
        REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
6!
2520
        REQUIRE(latest_subs.size() == 1);
6!
2521
        REQUIRE(latest_subs.at(0).object_class_name == "TopLevel");
6!
2522
    };
6✔
2523

3✔
2524
    auto mutate_realm = [&] {
5✔
2525
        harness.load_initial_data([&](SharedRealm realm) {
4✔
2526
            auto table = realm->read_group().get_table("class_TopLevel");
4✔
2527
            Results res(realm, Query(table).greater(table->get_column_key("queryable_int_field"), int64_t(10)));
4✔
2528
            REQUIRE(res.size() == 2);
4!
2529
            res.clear();
4✔
2530
        });
4✔
2531
    };
4✔
2532

3✔
2533
    SECTION("exception occurs during bootstrap application") {
6✔
2534
        {
2✔
2535
            auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
2536
            Realm::Config config = interrupted_realm_config;
2✔
2537
            config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
2538
            auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
2539
            config.sync_config->on_sync_client_event_hook =
2✔
2540
                [promise = std::move(shared_promise)](std::weak_ptr<SyncSession> weak_session,
2✔
2541
                                                      const SyncClientHookData& data) mutable {
36✔
2542
                    if (data.event != SyncClientHookEvent::BootstrapMessageProcessed) {
36✔
2543
                        return SyncClientHookAction::NoAction;
22✔
2544
                    }
22✔
2545
                    auto session = weak_session.lock();
14✔
2546
                    if (!session) {
14✔
2547
                        return SyncClientHookAction::NoAction;
×
2548
                    }
×
2549

7✔
2550
                    if (data.query_version == 1 && data.batch_state == sync::DownloadBatchState::LastInBatch) {
14✔
2551
                        session->close();
2✔
2552
                        promise->emplace_value();
2✔
2553
                        return SyncClientHookAction::EarlyReturn;
2✔
2554
                    }
2✔
2555
                    return SyncClientHookAction::NoAction;
12✔
2556
                };
12✔
2557
            auto realm = Realm::get_shared_realm(config);
2✔
2558
            {
2✔
2559
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2560
                auto table = realm->read_group().get_table("class_TopLevel");
2✔
2561
                mut_subs.insert_or_assign(Query(table));
2✔
2562
                mut_subs.commit();
2✔
2563
            }
2✔
2564

1✔
2565
            interrupted.get();
2✔
2566
            realm->sync_session()->shutdown_and_wait();
2✔
2567
            realm->close();
2✔
2568
        }
2✔
2569

1✔
2570
        _impl::RealmCoordinator::assert_no_open_realms();
2✔
2571

1✔
2572
        // Open up the realm without the sync client attached and verify that the realm got interrupted in the state
1✔
2573
        // we expected it to be in.
1✔
2574
        {
2✔
2575
            DBOptions options;
2✔
2576
            options.encryption_key = test_util::crypt_key();
2✔
2577
            auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
2578
            auto logger = util::Logger::get_default_logger();
2✔
2579
            sync::PendingBootstrapStore bootstrap_store(realm, *logger);
2✔
2580
            REQUIRE(bootstrap_store.has_pending());
2!
2581
            auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16);
2✔
2582
            REQUIRE(pending_batch.query_version == 1);
2!
2583
            REQUIRE(pending_batch.progress);
2!
2584

1✔
2585
            check_interrupted_state(realm);
2✔
2586
        }
2✔
2587

1✔
2588
        interrupted_realm_config.sync_config->simulate_integration_error = true;
2✔
2589
        auto error_pf = util::make_promise_future<SyncError>();
2✔
2590
        interrupted_realm_config.sync_config->error_handler =
2✔
2591
            [promise = std::make_shared<util::Promise<SyncError>>(std::move(error_pf.promise))](
2✔
2592
                std::shared_ptr<SyncSession>, SyncError error) {
2✔
2593
                promise->emplace_value(std::move(error));
2✔
2594
            };
2✔
2595

1✔
2596
        auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
2597
        const auto& error = error_pf.future.get();
2✔
2598
        REQUIRE(error.is_fatal);
2!
2599
        REQUIRE(error.status == ErrorCodes::BadChangeset);
2!
2600
    }
2✔
2601

3✔
2602
    SECTION("interrupted before final bootstrap message") {
6✔
2603
        {
2✔
2604
            auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
2605
            Realm::Config config = interrupted_realm_config;
2✔
2606
            config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
2607
            auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
2608
            config.sync_config->on_sync_client_event_hook =
2✔
2609
                [promise = std::move(shared_promise)](std::weak_ptr<SyncSession> weak_session,
2✔
2610
                                                      const SyncClientHookData& data) mutable {
16✔
2611
                    if (data.event != SyncClientHookEvent::BootstrapMessageProcessed) {
16✔
2612
                        return SyncClientHookAction::NoAction;
12✔
2613
                    }
12✔
2614
                    auto session = weak_session.lock();
4✔
2615
                    if (!session) {
4✔
2616
                        return SyncClientHookAction::NoAction;
×
2617
                    }
×
2618

2✔
2619
                    if (data.query_version == 1 && data.batch_state == sync::DownloadBatchState::MoreToCome) {
4✔
2620
                        session->force_close();
2✔
2621
                        promise->emplace_value();
2✔
2622
                        return SyncClientHookAction::TriggerReconnect;
2✔
2623
                    }
2✔
2624
                    return SyncClientHookAction::NoAction;
2✔
2625
                };
2✔
2626
            auto realm = Realm::get_shared_realm(config);
2✔
2627
            {
2✔
2628
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2629
                auto table = realm->read_group().get_table("class_TopLevel");
2✔
2630
                mut_subs.insert_or_assign(Query(table));
2✔
2631
                mut_subs.commit();
2✔
2632
            }
2✔
2633

1✔
2634
            interrupted.get();
2✔
2635
            realm->sync_session()->shutdown_and_wait();
2✔
2636
            realm->close();
2✔
2637
        }
2✔
2638

1✔
2639
        _impl::RealmCoordinator::assert_no_open_realms();
2✔
2640

1✔
2641
        // Open up the realm without the sync client attached and verify that the realm got interrupted in the state
1✔
2642
        // we expected it to be in.
1✔
2643
        {
2✔
2644
            DBOptions options;
2✔
2645
            options.encryption_key = test_util::crypt_key();
2✔
2646
            auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
2647
            auto logger = util::Logger::get_default_logger();
2✔
2648
            sync::PendingBootstrapStore bootstrap_store(realm, *logger);
2✔
2649
            REQUIRE(bootstrap_store.has_pending());
2!
2650
            auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16);
2✔
2651
            REQUIRE(pending_batch.query_version == 1);
2!
2652
            REQUIRE(!pending_batch.progress);
2!
2653
            REQUIRE(pending_batch.remaining_changesets == 0);
2!
2654
            REQUIRE(pending_batch.changesets.size() == 1);
2!
2655

1✔
2656
            check_interrupted_state(realm);
2✔
2657
        }
2✔
2658

1✔
2659
        // Now we'll open a different realm and make some changes that would leave orphan objects on the client
1✔
2660
        // if the bootstrap batches weren't being cached until lastInBatch were true.
1✔
2661
        mutate_realm();
2✔
2662

1✔
2663
        // Finally re-open the realm whose bootstrap we interrupted and just wait for it to finish downloading.
1✔
2664
        auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
2665
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
2666
        realm->get_latest_subscription_set()
2✔
2667
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
2668
            .get();
2✔
2669
        wait_for_upload(*realm);
2✔
2670
        wait_for_download(*realm);
2✔
2671

1✔
2672
        wait_for_advance(*realm);
2✔
2673
        auto expected_obj_ids = util::Span<ObjectId>(obj_ids_at_end).sub_span(0, 3);
2✔
2674

1✔
2675
        REQUIRE(table->size() == expected_obj_ids.size());
2!
2676
        for (auto& id : expected_obj_ids) {
6✔
2677
            REQUIRE(table->find_primary_key(Mixed{id}));
6!
2678
        }
6✔
2679
    }
2✔
2680

3✔
2681
    SECTION("interrupted after final bootstrap message before processing") {
6✔
2682
        {
2✔
2683
            auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
2684
            Realm::Config config = interrupted_realm_config;
2✔
2685
            config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
2686
            auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
2687
            config.sync_config->on_sync_client_event_hook =
2✔
2688
                [promise = std::move(shared_promise)](std::weak_ptr<SyncSession> weak_session,
2✔
2689
                                                      const SyncClientHookData& data) mutable {
36✔
2690
                    if (data.event != SyncClientHookEvent::BootstrapMessageProcessed) {
36✔
2691
                        return SyncClientHookAction::NoAction;
22✔
2692
                    }
22✔
2693
                    auto session = weak_session.lock();
14✔
2694
                    if (!session) {
14✔
2695
                        return SyncClientHookAction::NoAction;
×
2696
                    }
×
2697

7✔
2698
                    if (data.query_version == 1 && data.batch_state == sync::DownloadBatchState::LastInBatch) {
14✔
2699
                        session->force_close();
2✔
2700
                        promise->emplace_value();
2✔
2701
                        return SyncClientHookAction::TriggerReconnect;
2✔
2702
                    }
2✔
2703
                    return SyncClientHookAction::NoAction;
12✔
2704
                };
12✔
2705
            auto realm = Realm::get_shared_realm(config);
2✔
2706
            {
2✔
2707
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2708
                auto table = realm->read_group().get_table("class_TopLevel");
2✔
2709
                mut_subs.insert_or_assign(Query(table));
2✔
2710
                mut_subs.commit();
2✔
2711
            }
2✔
2712

1✔
2713
            interrupted.get();
2✔
2714
            realm->sync_session()->shutdown_and_wait();
2✔
2715
            realm->close();
2✔
2716
        }
2✔
2717

1✔
2718
        _impl::RealmCoordinator::assert_no_open_realms();
2✔
2719

1✔
2720
        // Open up the realm without the sync client attached and verify that the realm got interrupted in the state
1✔
2721
        // we expected it to be in.
1✔
2722
        {
2✔
2723
            DBOptions options;
2✔
2724
            options.encryption_key = test_util::crypt_key();
2✔
2725
            auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
2726
            auto logger = util::Logger::get_default_logger();
2✔
2727
            sync::PendingBootstrapStore bootstrap_store(realm, *logger);
2✔
2728
            REQUIRE(bootstrap_store.has_pending());
2!
2729
            auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16);
2✔
2730
            REQUIRE(pending_batch.query_version == 1);
2!
2731
            REQUIRE(static_cast<bool>(pending_batch.progress));
2!
2732
            REQUIRE(pending_batch.remaining_changesets == 0);
2!
2733
            REQUIRE(pending_batch.changesets.size() == 6);
2!
2734

1✔
2735
            check_interrupted_state(realm);
2✔
2736
        }
2✔
2737

1✔
2738
        // Now we'll open a different realm and make some changes that would leave orphan objects on the client
1✔
2739
        // if the bootstrap batches weren't being cached until lastInBatch were true.
1✔
2740
        mutate_realm();
2✔
2741

1✔
2742
        auto [saw_valid_state_promise, saw_valid_state_future] = util::make_promise_future<void>();
2✔
2743
        auto shared_saw_valid_state_promise =
2✔
2744
            std::make_shared<decltype(saw_valid_state_promise)>(std::move(saw_valid_state_promise));
2✔
2745
        // This hook will let us check what the state of the realm is before it's integrated any new download
1✔
2746
        // messages from the server. This should be the full 5 object bootstrap that was received before we
1✔
2747
        // called mutate_realm().
1✔
2748
        interrupted_realm_config.sync_config->on_sync_client_event_hook =
2✔
2749
            [&, promise = std::move(shared_saw_valid_state_promise)](std::weak_ptr<SyncSession> weak_session,
2✔
2750
                                                                     const SyncClientHookData& data) {
18✔
2751
                if (data.event != SyncClientHookEvent::DownloadMessageReceived) {
18✔
2752
                    return SyncClientHookAction::NoAction;
16✔
2753
                }
16✔
2754
                auto session = weak_session.lock();
2✔
2755
                if (!session) {
2✔
2756
                    return SyncClientHookAction::NoAction;
×
2757
                }
×
2758

1✔
2759
                if (data.query_version != 1 || data.batch_state == sync::DownloadBatchState::MoreToCome) {
2✔
2760
                    return SyncClientHookAction::NoAction;
×
2761
                }
×
2762

1✔
2763
                auto latest_sub_set = session->get_flx_subscription_store()->get_latest();
2✔
2764
                auto active_sub_set = session->get_flx_subscription_store()->get_active();
2✔
2765
                auto version_info = session->get_flx_subscription_store()->get_version_info();
2✔
2766
                REQUIRE(version_info.pending_mark == active_sub_set.version());
2!
2767
                REQUIRE(version_info.active == active_sub_set.version());
2!
2768
                REQUIRE(version_info.latest == latest_sub_set.version());
2!
2769
                REQUIRE(latest_sub_set.version() == active_sub_set.version());
2!
2770
                REQUIRE(active_sub_set.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
2771

1✔
2772
                auto db = SyncSession::OnlyForTesting::get_db(*session);
2✔
2773
                auto tr = db->start_read();
2✔
2774

1✔
2775
                auto table = tr->get_table("class_TopLevel");
2✔
2776
                REQUIRE(table->size() == obj_ids_at_end.size());
2!
2777
                for (auto& id : obj_ids_at_end) {
10✔
2778
                    REQUIRE(table->find_primary_key(Mixed{id}));
10!
2779
                }
10✔
2780

1✔
2781
                promise->emplace_value();
2✔
2782
                return SyncClientHookAction::NoAction;
2✔
2783
            };
2✔
2784

1✔
2785
        // Finally re-open the realm whose bootstrap we interrupted and just wait for it to finish downloading.
1✔
2786
        auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
2787
        saw_valid_state_future.get();
2✔
2788
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
2789
        realm->get_latest_subscription_set()
2✔
2790
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
2791
            .get();
2✔
2792
        wait_for_upload(*realm);
2✔
2793
        wait_for_download(*realm);
2✔
2794
        wait_for_advance(*realm);
2✔
2795

1✔
2796
        auto expected_obj_ids = util::Span<ObjectId>(obj_ids_at_end).sub_span(0, 3);
2✔
2797

1✔
2798
        // After we've downloaded all the mutations there should only be 3 objects left.
1✔
2799
        REQUIRE(table->size() == expected_obj_ids.size());
2!
2800
        for (auto& id : expected_obj_ids) {
6✔
2801
            REQUIRE(table->find_primary_key(Mixed{id}));
6!
2802
        }
6✔
2803
    }
2✔
2804
}
6✔
2805

2806
// Check that a document with the given id is present and has the expected fields.
//
// The first entry of `documents` whose "_id" equals `id` is looked up (every
// document visited by the search is required to contain an "_id" entry), and
// each (name, value) pair in `fields` must exist in that document and compare
// equal to the expected value. All checks use REQUIRE, so a mismatch fails the
// calling test.
2807
static void check_document(const std::vector<bson::BsonDocument>& documents, ObjectId id,
2808
                           std::initializer_list<std::pair<const char*, bson::Bson>> fields)
2809
{
428✔
2810
    auto it = std::find_if(documents.begin(), documents.end(), [&](auto&& doc) {
43,096✔
2811
        // A document without an "_id" entry fails the test right here, before
        // the id comparison is attempted.
        auto it = doc.entries().find("_id");
43,096✔
2812
        REQUIRE(it != doc.entries().end());
43,096!
2813
        return it->second == id;
43,096✔
2814
    });
43,096✔
2815
    REQUIRE(it != documents.end());
428!
2816
    auto& doc = it->entries();
428✔
2817
    for (auto& [name, expected_value] : fields) {
434✔
2818
        auto it = doc.find(name);
434✔
2819
        REQUIRE(it != doc.end());
434!
2820

217✔
2821
        // bson documents are ordered but Realm dictionaries aren't, so the
217✔
2822
        // document might validly be in a different order than we expected and
217✔
2823
        // we need to do a comparison that doesn't check order.
217✔
2824
        if (expected_value.type() == bson::Bson::Type::Document) {
434✔
2825
            REQUIRE(static_cast<const bson::BsonDocument&>(it->second).entries() ==
8!
2826
                    static_cast<const bson::BsonDocument&>(expected_value).entries());
8✔
2827
        }
8✔
2828
        else {
426✔
2829
            REQUIRE(it->second == expected_value);
426!
2830
        }
426✔
2831
    }
434✔
2832
}
428✔
2833

2834
TEST_CASE("flx: data ingest", "[sync][flx][data ingest][baas]") {
22✔
2835
    using namespace ::realm::bson;
22✔
2836

11✔
2837
    // Server schema used by the sections of this test case. It is built once by
    // an immediately-invoked lambda and kept in a function-local static.
    //  - "Asymmetric": asymmetric top-level table with an ObjectId primary key,
    //    a nullable string, and single/list/dictionary properties pointing at
    //    both "Embedded" and "TopLevel".
    //  - "Embedded": embedded table with one string property.
    //  - "TopLevel": regular table with an ObjectId primary key and an int.
    // "queryable_str_field" is the only queryable field that is declared.
    static auto server_schema = [] {
12✔
2838
        FLXSyncTestHarness::ServerSchema server_schema;
2✔
2839
        server_schema.queryable_fields = {"queryable_str_field"};
2✔
2840
        server_schema.schema = {
2✔
2841
            {"Asymmetric",
2✔
2842
             ObjectSchema::ObjectType::TopLevelAsymmetric,
2✔
2843
             {
2✔
2844
                 {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
2845
                 {"location", PropertyType::String | PropertyType::Nullable},
2✔
2846
                 {"embedded obj", PropertyType::Object | PropertyType::Nullable, "Embedded"},
2✔
2847
                 {"embedded list", PropertyType::Object | PropertyType::Array, "Embedded"},
2✔
2848
                 {"embedded dictionary", PropertyType::Object | PropertyType::Nullable | PropertyType::Dictionary,
2✔
2849
                  "Embedded"},
2✔
2850
                 {"link obj", PropertyType::Object | PropertyType::Nullable, "TopLevel"},
2✔
2851
                 {"link list", PropertyType::Object | PropertyType::Array, "TopLevel"},
2✔
2852
                 {"link dictionary", PropertyType::Object | PropertyType::Nullable | PropertyType::Dictionary,
2✔
2853
                  "TopLevel"},
2✔
2854
             }},
2✔
2855
            {"Embedded", ObjectSchema::ObjectType::Embedded, {{"value", PropertyType::String}}},
2✔
2856
            {"TopLevel",
2✔
2857
             {
2✔
2858
                 {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
2859
                 {"value", PropertyType::Int},
2✔
2860
             }},
2✔
2861
        };
2✔
2862
        return server_schema;
2✔
2863
    }();
2✔
2864
    // One harness is created on first entry and shared by the sections below;
    // it is held in a static unique_ptr so the "teardown" section can reset it.
    static auto harness = std::make_unique<FLXSyncTestHarness>("asymmetric_sync", server_schema);
22✔
2865

11✔
2866
    // We reuse a single app for each section, so tests will see the documents
11✔
2867
    // created by previous tests and we need to add those documents to the count
11✔
2868
    // we're waiting for
11✔
2869
    static std::unordered_map<std::string, size_t> previous_count;
22✔
2870
    // Fetches the documents of collection `name` for the app's current user,
    // asking for `expected_count` more than the number returned by the previous
    // call for the same name, and records the new total for the next call.
    auto get_documents = [&](const char* name, size_t expected_count) {
20✔
2871
        auto& count = previous_count[name];
18✔
2872
        auto documents =
18✔
2873
            harness->session().get_documents(*harness->app()->current_user(), name, count + expected_count);
18✔
2874
        count = documents.size();
18✔
2875
        return documents;
18✔
2876
    };
18✔
2877

11✔
2878
    // Creates two Asymmetric objects in one transaction and checks that both
    // are among the documents returned by get_documents(). A second, fresh realm
    // must then see an empty local Asymmetric table after downloading.
    SECTION("basic object construction") {
22✔
2879
        auto foo_obj_id = ObjectId::gen();
2✔
2880
        auto bar_obj_id = ObjectId::gen();
2✔
2881
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2882
            realm->begin_transaction();
2✔
2883
            CppContext c(realm);
2✔
2884
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "foo"s}}));
2✔
2885
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", bar_obj_id}, {"location", "bar"s}}));
2✔
2886
            realm->commit_transaction();
2✔
2887

1✔
2888
            auto documents = get_documents("Asymmetric", 2);
2✔
2889
            check_document(documents, foo_obj_id, {{"location", "foo"}});
2✔
2890
            check_document(documents, bar_obj_id, {{"location", "bar"}});
2✔
2891
        });
2✔
2892

1✔
2893
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2894
            wait_for_download(*realm);
2✔
2895

1✔
2896
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
2897
            REQUIRE(table->size() == 0);
2!
2898
            // Cannot query asymmetric tables.
1✔
2899
            CHECK_THROWS_AS(Query(table), LogicError);
2✔
2900
        });
2✔
2901
    }
2✔
2902

11✔
2903
    // A second Object::create with the same primary key inside the same write
    // transaction must throw with the message checked below. The transaction is
    // still committed afterwards and the first object ("foo") is the one
    // expected among the returned documents.
    SECTION("do not allow objects with same key within the same transaction") {
22✔
2904
        auto foo_obj_id = ObjectId::gen();
2✔
2905
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2906
            realm->begin_transaction();
2✔
2907
            CppContext c(realm);
2✔
2908
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "foo"s}}));
2✔
2909
            CHECK_THROWS_WITH(
2✔
2910
                Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "bar"s}})),
2✔
2911
                "Attempting to create an object of type 'Asymmetric' with an existing primary key value 'not "
2✔
2912
                "implemented'");
2✔
2913
            realm->commit_transaction();
2✔
2914

1✔
2915
            auto documents = get_documents("Asymmetric", 1);
2✔
2916
            check_document(documents, foo_obj_id, {{"location", "foo"}});
2✔
2917
        });
2✔
2918
    }
2✔
2919

11✔
2920
    // Creates 100 Asymmetric objects, each in its own write transaction. After
    // upload and download the local table must be empty, and every object must
    // be found among the returned documents with its "foo_<i>" location.
    SECTION("create multiple objects - separate commits") {
22✔
2921
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2922
            CppContext c(realm);
2✔
2923
            std::vector<ObjectId> obj_ids;
2✔
2924
            for (int i = 0; i < 100; ++i) {
202✔
2925
                realm->begin_transaction();
200✔
2926
                obj_ids.push_back(ObjectId::gen());
200✔
2927
                Object::create(c, realm, "Asymmetric",
200✔
2928
                               std::any(AnyDict{
200✔
2929
                                   {"_id", obj_ids.back()},
200✔
2930
                                   {"location", util::format("foo_%1", i)},
200✔
2931
                               }));
200✔
2932
                realm->commit_transaction();
200✔
2933
            }
200✔
2934

1✔
2935
            wait_for_upload(*realm);
2✔
2936
            wait_for_download(*realm);
2✔
2937

1✔
2938
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
2939
            REQUIRE(table->size() == 0);
2!
2940

1✔
2941
            auto documents = get_documents("Asymmetric", 100);
2✔
2942
            for (int i = 0; i < 100; ++i) {
202✔
2943
                check_document(documents, obj_ids[i], {{"location", util::format("foo_%1", i)}});
200✔
2944
            }
200✔
2945
        });
2✔
2946
    }
2✔
2947

11✔
2948
    // Same as the "separate commits" section, except that all 100 objects are
    // created inside one write transaction and use "bar_<i>" locations.
    SECTION("create multiple objects - same commit") {
22✔
2949
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2950
            CppContext c(realm);
2✔
2951
            realm->begin_transaction();
2✔
2952
            std::vector<ObjectId> obj_ids;
2✔
2953
            for (int i = 0; i < 100; ++i) {
202✔
2954
                obj_ids.push_back(ObjectId::gen());
200✔
2955
                Object::create(c, realm, "Asymmetric",
200✔
2956
                               std::any(AnyDict{
200✔
2957
                                   {"_id", obj_ids.back()},
200✔
2958
                                   {"location", util::format("bar_%1", i)},
200✔
2959
                               }));
200✔
2960
            }
200✔
2961
            realm->commit_transaction();
2✔
2962

1✔
2963
            wait_for_upload(*realm);
2✔
2964
            wait_for_download(*realm);
2✔
2965

1✔
2966
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
2967
            REQUIRE(table->size() == 0);
2!
2968

1✔
2969
            auto documents = get_documents("Asymmetric", 100);
2✔
2970
            for (int i = 0; i < 100; ++i) {
202✔
2971
                check_document(documents, obj_ids[i], {{"location", util::format("bar_%1", i)}});
200✔
2972
            }
200✔
2973
        });
2✔
2974
    }
2✔
2975

11✔
2976
    // The client opens the realm with "Asymmetric" declared as a regular
    // TopLevel table, whereas server_schema declares it TopLevelAsymmetric.
    // Two sync errors are then expected, in this order: BadChangeset followed
    // by SyncClientResetRequired.
    SECTION("open with schema mismatch on IsAsymmetric") {
22✔
2977
        auto schema = server_schema.schema;
2✔
2978
        schema.find("Asymmetric")->table_type = ObjectSchema::ObjectType::TopLevel;
2✔
2979

1✔
2980
        harness->do_with_new_user([&](std::shared_ptr<SyncUser> user) {
2✔
2981
            SyncTestFile config(user, schema, SyncConfig::FLXSyncEnabled{});
2✔
2982
            auto [error_promise, error_future] = util::make_promise_future<SyncError>();
2✔
2983
            auto error_count = 0;
2✔
2984
            // error_count is captured by reference so the final CHECK below can
            // observe how many times the handler ran.
            auto err_handler = [promise = util::CopyablePromiseHolder(std::move(error_promise)),
2✔
2985
                                &error_count](std::shared_ptr<SyncSession>, SyncError err) mutable {
4✔
2986
                ++error_count;
4✔
2987
                if (error_count == 1) {
4✔
2988
                    // Bad changeset detected by the client.
1✔
2989
                    CHECK(err.status == ErrorCodes::BadChangeset);
2!
2990
                }
2✔
2991
                else if (error_count == 2) {
2✔
2992
                    // Server asking for a client reset.
1✔
2993
                    CHECK(err.status == ErrorCodes::SyncClientResetRequired);
2!
2994
                    CHECK(err.is_client_reset_requested());
2!
2995
                    promise.get_promise().emplace_value(std::move(err));
2✔
2996
                }
2✔
2997
            };
4✔
2998

1✔
2999
            config.sync_config->error_handler = err_handler;
2✔
3000
            auto realm = Realm::get_shared_realm(config);
2✔
3001

1✔
3002
            // The promise is only fulfilled by the handler's second-error branch.
            auto err = error_future.get();
2✔
3003
            CHECK(error_count == 2);
2!
3004
        });
2✔
3005
    }
2✔
3006

11✔
3007
    // Creates an Asymmetric object with an embedded object and checks that the
    // returned document carries the embedded value as a nested document. A
    // second realm must again see an empty local Asymmetric table.
    SECTION("basic embedded object construction") {
22✔
3008
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3009
            auto obj_id = ObjectId::gen();
2✔
3010
            realm->begin_transaction();
2✔
3011
            CppContext c(realm);
2✔
3012
            Object::create(c, realm, "Asymmetric",
2✔
3013
                           std::any(AnyDict{
2✔
3014
                               {"_id", obj_id},
2✔
3015
                               {"embedded obj", AnyDict{{"value", "foo"s}}},
2✔
3016
                           }));
2✔
3017
            realm->commit_transaction();
2✔
3018
            wait_for_upload(*realm);
2✔
3019

1✔
3020
            auto documents = get_documents("Asymmetric", 1);
2✔
3021
            check_document(documents, obj_id, {{"embedded obj", BsonDocument{{"value", "foo"}}}});
2✔
3022
        });
2✔
3023

1✔
3024
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3025
            wait_for_download(*realm);
2✔
3026

1✔
3027
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
3028
            REQUIRE(table->size() == 0);
2!
3029
        });
2✔
3030
    }
2✔
3031

11✔
3032
    // Creates an object with an embedded value, then creates it again with the
    // same primary key and a null embedded value in a later transaction. The
    // returned document is expected to still hold the original embedded value.
    SECTION("replace embedded object") {
22✔
3033
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3034
            CppContext c(realm);
2✔
3035
            auto foo_obj_id = ObjectId::gen();
2✔
3036

1✔
3037
            realm->begin_transaction();
2✔
3038
            Object::create(c, realm, "Asymmetric",
2✔
3039
                           std::any(AnyDict{
2✔
3040
                               {"_id", foo_obj_id},
2✔
3041
                               {"embedded obj", AnyDict{{"value", "foo"s}}},
2✔
3042
                           }));
2✔
3043
            realm->commit_transaction();
2✔
3044

1✔
3045
            // Update embedded field to `null`. The server discards this write
1✔
3046
            // as asymmetric sync can only create new objects.
1✔
3047
            realm->begin_transaction();
2✔
3048
            Object::create(c, realm, "Asymmetric",
2✔
3049
                           std::any(AnyDict{
2✔
3050
                               {"_id", foo_obj_id},
2✔
3051
                               {"embedded obj", std::any()},
2✔
3052
                           }));
2✔
3053
            realm->commit_transaction();
2✔
3054

1✔
3055
            // create a second object so that we can know when the translator
1✔
3056
            // has processed everything
1✔
3057
            realm->begin_transaction();
2✔
3058
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", ObjectId::gen()}, {}}));
2✔
3059
            realm->commit_transaction();
2✔
3060

1✔
3061
            wait_for_upload(*realm);
2✔
3062
            wait_for_download(*realm);
2✔
3063

1✔
3064
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
3065
            REQUIRE(table->size() == 0);
2!
3066

1✔
3067
            // Two documents are awaited: foo_obj_id and the marker object above.
            auto documents = get_documents("Asymmetric", 2);
2✔
3068
            check_document(documents, foo_obj_id, {{"embedded obj", BsonDocument{{"value", "foo"}}}});
2✔
3069
        });
2✔
3070
    }
2✔
3071

11✔
3072
    // Creates an Asymmetric object holding a list and a dictionary of embedded
    // objects and checks that the returned document contains them as an array
    // of documents and a nested document keyed by "key1"/"key2".
    SECTION("embedded collections") {
22✔
3073
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3074
            CppContext c(realm);
2✔
3075
            auto obj_id = ObjectId::gen();
2✔
3076

1✔
3077
            realm->begin_transaction();
2✔
3078
            Object::create(c, realm, "Asymmetric",
2✔
3079
                           std::any(AnyDict{
2✔
3080
                               {"_id", obj_id},
2✔
3081
                               {"embedded list", AnyVector{AnyDict{{"value", "foo"s}}, AnyDict{{"value", "bar"s}}}},
2✔
3082
                               {"embedded dictionary",
2✔
3083
                                AnyDict{
2✔
3084
                                    {"key1", AnyDict{{"value", "foo"s}}},
2✔
3085
                                    {"key2", AnyDict{{"value", "bar"s}}},
2✔
3086
                                }},
2✔
3087
                           }));
2✔
3088
            realm->commit_transaction();
2✔
3089

1✔
3090
            auto documents = get_documents("Asymmetric", 1);
2✔
3091
            check_document(
2✔
3092
                documents, obj_id,
2✔
3093
                {
2✔
3094
                    {"embedded list", BsonArray{BsonDocument{{"value", "foo"}}, BsonDocument{{"value", "bar"}}}},
2✔
3095
                    {"embedded dictionary",
2✔
3096
                     BsonDocument{
2✔
3097
                         {"key1", BsonDocument{{"value", "foo"}}},
2✔
3098
                         {"key2", BsonDocument{{"value", "bar"}}},
2✔
3099
                     }},
2✔
3100
                });
2✔
3101
        });
2✔
3102
    }
2✔
3103

11✔
3104
    // Opening a realm whose schema contains an asymmetric table must throw
    // SchemaValidationFailed with the "not allowed in partition based sync"
    // message when the config is built as below.
    // NOTE(review): the Bson{} argument is presumably the partition value that
    // makes this a partition based sync config - confirm against SyncTestFile.
    SECTION("asymmetric table not allowed in PBS") {
22✔
3105
        Schema schema{
2✔
3106
            {"Asymmetric2",
2✔
3107
             ObjectSchema::ObjectType::TopLevelAsymmetric,
2✔
3108
             {
2✔
3109
                 {"_id", PropertyType::Int, Property::IsPrimary{true}},
2✔
3110
                 {"location", PropertyType::Int},
2✔
3111
                 {"reading", PropertyType::Int},
2✔
3112
             }},
2✔
3113
        };
2✔
3114

1✔
3115
        SyncTestFile config(harness->app(), Bson{}, schema);
2✔
3116
        REQUIRE_EXCEPTION(
2✔
3117
            Realm::get_shared_realm(config), SchemaValidationFailed,
2✔
3118
            Catch::Matchers::ContainsSubstring("Asymmetric table 'Asymmetric2' not allowed in partition based sync"));
2✔
3119
    }
2✔
3120

11✔
3121
    // Creates one Asymmetric object that links to five TopLevel objects, which
    // are created inline through a single link, a link list and a link
    // dictionary. The returned Asymmetric document is expected to hold the
    // targets' ids, and the five TopLevel documents the values 10..14.
    SECTION("links to top-level objects") {
22✔
3122
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3123
            subscribe_to_all_and_bootstrap(*realm);
2✔
3124

1✔
3125
            ObjectId obj_id = ObjectId::gen();
2✔
3126
            std::array<ObjectId, 5> target_obj_ids;
2✔
3127
            for (auto& id : target_obj_ids) {
10✔
3128
                id = ObjectId::gen();
10✔
3129
            }
10✔
3130

1✔
3131
            realm->begin_transaction();
2✔
3132
            CppContext c(realm);
2✔
3133
            Object::create(c, realm, "Asymmetric",
2✔
3134
                           std::any(AnyDict{
2✔
3135
                               {"_id", obj_id},
2✔
3136
                               {"link obj", AnyDict{{"_id", target_obj_ids[0]}, {"value", INT64_C(10)}}},
2✔
3137
                               {"link list",
2✔
3138
                                AnyVector{
2✔
3139
                                    AnyDict{{"_id", target_obj_ids[1]}, {"value", INT64_C(11)}},
2✔
3140
                                    AnyDict{{"_id", target_obj_ids[2]}, {"value", INT64_C(12)}},
2✔
3141
                                }},
2✔
3142
                               {"link dictionary",
2✔
3143
                                AnyDict{
2✔
3144
                                    {"key1", AnyDict{{"_id", target_obj_ids[3]}, {"value", INT64_C(13)}}},
2✔
3145
                                    {"key2", AnyDict{{"_id", target_obj_ids[4]}, {"value", INT64_C(14)}}},
2✔
3146
                                }},
2✔
3147
                           }));
2✔
3148
            realm->commit_transaction();
2✔
3149
            wait_for_upload(*realm);
2✔
3150

1✔
3151
            auto docs1 = get_documents("Asymmetric", 1);
2✔
3152
            check_document(docs1, obj_id,
2✔
3153
                           {{"link obj", target_obj_ids[0]},
2✔
3154
                            {"link list", BsonArray{{target_obj_ids[1], target_obj_ids[2]}}},
2✔
3155
                            {
2✔
3156
                                "link dictionary",
2✔
3157
                                BsonDocument{
2✔
3158
                                    {"key1", target_obj_ids[3]},
2✔
3159
                                    {"key2", target_obj_ids[4]},
2✔
3160
                                },
2✔
3161
                            }});
2✔
3162

1✔
3163
            // target_obj_ids[i] was created with value 10 + i above.
            auto docs2 = get_documents("TopLevel", 5);
2✔
3164
            for (int64_t i = 0; i < 5; ++i) {
12✔
3165
                check_document(docs2, target_obj_ids[i], {{"value", 10 + i}});
10✔
3166
            }
10✔
3167
        });
2✔
3168
    }
2✔
3169

11✔
3170
    // Add any new test sections above this point
11✔
3171

11✔
3172
    // Resets the static harness unique_ptr, destroying the FLXSyncTestHarness
    // shared by the sections above.
    // NOTE(review): presumably relies on this section running last - confirm.
    SECTION("teardown") {
22✔
3173
        harness.reset();
2✔
3174
    }
2✔
3175
}
22✔
3176

3177
// Data ingest with dev mode: the harness is created with an empty server schema
// and dev_mode_enabled set, and the client passes its own schema (Asymmetric +
// TopLevel) to do_with_new_realm. The two Asymmetric objects created locally
// must be among the documents returned by get_documents().
TEST_CASE("flx: data ingest - dev mode", "[sync][flx][data ingest][baas]") {
2✔
3178
    FLXSyncTestHarness::ServerSchema server_schema;
2✔
3179
    server_schema.dev_mode_enabled = true;
2✔
3180
    server_schema.schema = Schema{};
2✔
3181

1✔
3182
    auto schema = Schema{{"Asymmetric",
2✔
3183
                          ObjectSchema::ObjectType::TopLevelAsymmetric,
2✔
3184
                          {
2✔
3185
                              {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
3186
                              {"location", PropertyType::String | PropertyType::Nullable},
2✔
3187
                          }},
2✔
3188
                         {"TopLevel",
2✔
3189
                          {
2✔
3190
                              {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
3191
                              {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
3192
                          }}};
2✔
3193

1✔
3194
    FLXSyncTestHarness harness("asymmetric_sync", server_schema);
2✔
3195

1✔
3196
    auto foo_obj_id = ObjectId::gen();
2✔
3197
    auto bar_obj_id = ObjectId::gen();
2✔
3198

1✔
3199
    harness.do_with_new_realm(
2✔
3200
        [&](SharedRealm realm) {
2✔
3201
            CppContext c(realm);
2✔
3202
            realm->begin_transaction();
2✔
3203
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "foo"s}}));
2✔
3204
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", bar_obj_id}, {"location", "bar"s}}));
2✔
3205
            realm->commit_transaction();
2✔
3206

1✔
3207
            // The documents are fetched as the user this realm was opened with.
            auto docs = harness.session().get_documents(*realm->config().sync_config->user, "Asymmetric", 2);
2✔
3208
            check_document(docs, foo_obj_id, {{"location", "foo"}});
2✔
3209
            check_document(docs, bar_obj_id, {{"location", "bar"}});
2✔
3210
        },
2✔
3211
        schema);
2✔
3212
}
2✔
3213

3214
// Uploads an asymmetric object to an app whose only configured role has
// document_filters.write set to false, and waits until the sync client hook has
// seen a write_not_allowed error message that carries compensating-write
// details but no compensating write server version.
TEST_CASE("flx: data ingest - write not allowed", "[sync][flx][data ingest][baas]") {
2✔
3215
    AppCreateConfig::ServiceRole role;
2✔
3216
    role.name = "asymmetric_write_perms";
2✔
3217

1✔
3218
    AppCreateConfig::ServiceRoleDocumentFilters doc_filters;
2✔
3219
    doc_filters.read = true;
2✔
3220
    doc_filters.write = false;
2✔
3221
    role.document_filters = doc_filters;
2✔
3222

1✔
3223
    role.insert_filter = true;
2✔
3224
    role.delete_filter = true;
2✔
3225
    role.read = true;
2✔
3226
    role.write = true;
2✔
3227

1✔
3228
    Schema schema({
2✔
3229
        {"Asymmetric",
2✔
3230
         ObjectSchema::ObjectType::TopLevelAsymmetric,
2✔
3231
         {
2✔
3232
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
3233
             {"location", PropertyType::String | PropertyType::Nullable},
2✔
3234
             {"embedded_obj", PropertyType::Object | PropertyType::Nullable, "Embedded"},
2✔
3235
         }},
2✔
3236
        {"Embedded",
2✔
3237
         ObjectSchema::ObjectType::Embedded,
2✔
3238
         {
2✔
3239
             {"value", PropertyType::String | PropertyType::Nullable},
2✔
3240
         }},
2✔
3241
    });
2✔
3242
    FLXSyncTestHarness::ServerSchema server_schema{schema, {}, {role}};
2✔
3243
    FLXSyncTestHarness harness("asymmetric_sync", server_schema);
2✔
3244

1✔
3245
    auto error_received_pf = util::make_promise_future<void>();
2✔
3246
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3247
    config.sync_config->on_sync_client_event_hook =
2✔
3248
        [promise = util::CopyablePromiseHolder(std::move(error_received_pf.promise))](
2✔
3249
            std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& data) mutable {
16✔
3250
            // Only error messages are of interest to this hook.
            if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
16✔
3251
                return SyncClientHookAction::NoAction;
12✔
3252
            }
12✔
3253
            auto session = weak_session.lock();
4✔
3254
            REQUIRE(session);
4!
3255

2✔
3256
            auto error_code = sync::ProtocolError(data.error_info->raw_error_code);
4✔
3257

2✔
3258
            // An initial_sync_not_completed error is tolerated and ignored.
            if (error_code == sync::ProtocolError::initial_sync_not_completed) {
4✔
3259
                return SyncClientHookAction::NoAction;
2✔
3260
            }
2✔
3261

1✔
3262
            REQUIRE(error_code == sync::ProtocolError::write_not_allowed);
2!
3263
            REQUIRE_FALSE(data.error_info->compensating_write_server_version.has_value());
2!
3264
            REQUIRE_FALSE(data.error_info->compensating_writes.empty());
2!
3265
            // Unblocks the error_received_pf.future.get() call at the end of the test.
            promise.get_promise().emplace_value();
2✔
3266

1✔
3267
            return SyncClientHookAction::EarlyReturn;
2✔
3268
        };
2✔
3269

1✔
3270
    auto realm = Realm::get_shared_realm(config);
2✔
3271

1✔
3272
    // Create an asymmetric object and upload it to the server.
1✔
3273
    {
2✔
3274
        realm->begin_transaction();
2✔
3275
        CppContext c(realm);
2✔
3276
        Object::create(c, realm, "Asymmetric",
2✔
3277
                       std::any(AnyDict{{"_id", ObjectId::gen()}, {"embedded_obj", AnyDict{{"value", "foo"s}}}}));
2✔
3278
        realm->commit_transaction();
2✔
3279
        wait_for_upload(*realm);
2✔
3280
    }
2✔
3281

1✔
3282
    error_received_pf.future.get();
2✔
3283
    realm->close();
2✔
3284
}
2✔
3285

3286
// With simulate_integration_error set on the sync config, committing a
// subscription is expected to produce exactly two errors through the error
// handler: BadChangeset first, then SyncClientResetRequired.
TEST_CASE("flx: send client error", "[sync][flx][baas]") {
2✔
3287
    FLXSyncTestHarness harness("flx_client_error");
2✔
3288

1✔
3289
    // An integration error is simulated while bootstrapping.
1✔
3290
    // This results in the client sending an error message to the server.
1✔
3291
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3292
    config.sync_config->simulate_integration_error = true;
2✔
3293

1✔
3294
    auto [error_promise, error_future] = util::make_promise_future<SyncError>();
2✔
3295
    auto error_count = 0;
2✔
3296
    // error_count is captured by reference so the final CHECK below can observe
    // how many times the handler ran.
    auto err_handler = [promise = util::CopyablePromiseHolder(std::move(error_promise)),
2✔
3297
                        &error_count](std::shared_ptr<SyncSession>, SyncError err) mutable {
4✔
3298
        ++error_count;
4✔
3299
        if (error_count == 1) {
4✔
3300
            // Bad changeset detected by the client.
1✔
3301
            CHECK(err.status == ErrorCodes::BadChangeset);
2!
3302
        }
2✔
3303
        else if (error_count == 2) {
2✔
3304
            // Server asking for a client reset.
1✔
3305
            CHECK(err.status == ErrorCodes::SyncClientResetRequired);
2!
3306
            CHECK(err.is_client_reset_requested());
2!
3307
            promise.get_promise().emplace_value(std::move(err));
2✔
3308
        }
2✔
3309
    };
4✔
3310

1✔
3311
    config.sync_config->error_handler = err_handler;
2✔
3312
    auto realm = Realm::get_shared_realm(config);
2✔
3313
    auto table = realm->read_group().get_table("class_TopLevel");
2✔
3314
    auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3315
    new_query.insert_or_assign(Query(table));
2✔
3316
    new_query.commit();
2✔
3317

1✔
3318
    // The promise is only fulfilled by the handler's second-error branch.
    auto err = error_future.get();
2✔
3319
    CHECK(error_count == 2);
2!
3320
}
2✔
3321

3322
TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][bootstrap][baas]") {
6✔
3323
    FLXSyncTestHarness harness("bootstrap_full_sync");
6✔
3324

3✔
3325
    auto setup_subs = [](SharedRealm& realm) {
12✔
3326
        auto table = realm->read_group().get_table("class_TopLevel");
12✔
3327
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
12✔
3328
        new_query.clear();
12✔
3329
        auto col = table->get_column_key("queryable_str_field");
12✔
3330
        new_query.insert_or_assign(Query(table).equal(col, StringData("bar")).Or().equal(col, StringData("bizz")));
12✔
3331
        return new_query.commit();
12✔
3332
    };
12✔
3333

3✔
3334
    auto bar_obj_id = ObjectId::gen();
6✔
3335
    auto bizz_obj_id = ObjectId::gen();
6✔
3336
    auto setup_and_poison_cache = [&] {
6✔
3337
        harness.load_initial_data([&](SharedRealm realm) {
6✔
3338
            CppContext c(realm);
6✔
3339
            Object::create(c, realm, "TopLevel",
6✔
3340
                           std::any(AnyDict{{"_id", bar_obj_id},
6✔
3341
                                            {"queryable_str_field", std::string{"bar"}},
6✔
3342
                                            {"queryable_int_field", static_cast<int64_t>(10)},
6✔
3343
                                            {"non_queryable_field", std::string{"non queryable 2"}}}));
6✔
3344
        });
6✔
3345

3✔
3346
        harness.do_with_new_realm([&](SharedRealm realm) {
6✔
3347
            // first set a subscription to force the creation/caching of a broker snapshot on the server.
3✔
3348
            setup_subs(realm).get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
6✔
3349
            wait_for_advance(*realm);
6✔
3350
            auto table = realm->read_group().get_table("class_TopLevel");
6✔
3351
            REQUIRE(table->find_primary_key(bar_obj_id));
6!
3352

3✔
3353
            // Then create an object that won't be in the cached snapshot - this is the object that if we didn't
3✔
3354
            // wait for a MARK message to come back, we'd miss it in our results.
3✔
3355
            CppContext c(realm);
6✔
3356
            realm->begin_transaction();
6✔
3357
            Object::create(c, realm, "TopLevel",
6✔
3358
                           std::any(AnyDict{{"_id", bizz_obj_id},
6✔
3359
                                            {"queryable_str_field", std::string{"bizz"}},
6✔
3360
                                            {"queryable_int_field", static_cast<int64_t>(15)},
6✔
3361
                                            {"non_queryable_field", std::string{"non queryable 3"}}}));
6✔
3362
            realm->commit_transaction();
6✔
3363
            wait_for_upload(*realm);
6✔
3364
        });
6✔
3365
    };
6✔
3366

3✔
3367
    SECTION("regular subscription change") {
6✔
3368
        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3369
        std::atomic<bool> saw_truncated_bootstrap{false};
2✔
3370
        triggered_config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_sess,
2✔
3371
                                                                      const SyncClientHookData& data) {
27✔
3372
            auto sess = weak_sess.lock();
27✔
3373
            if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
27✔
3374
                return SyncClientHookAction::NoAction;
25✔
3375
            }
25✔
3376

1✔
3377
            auto latest_subs = sess->get_flx_subscription_store()->get_latest();
2✔
3378
            REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3379
            REQUIRE(data.num_changesets == 1);
2!
3380
            auto db = SyncSession::OnlyForTesting::get_db(*sess);
2✔
3381
            auto read_tr = db->start_read();
2✔
3382
            auto table = read_tr->get_table("class_TopLevel");
2✔
3383
            REQUIRE(table->find_primary_key(bar_obj_id));
2!
3384
            REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
2!
3385
            saw_truncated_bootstrap.store(true);
2✔
3386

1✔
3387
            return SyncClientHookAction::NoAction;
2✔
3388
        };
2✔
3389
        auto problem_realm = Realm::get_shared_realm(triggered_config);
2✔
3390

1✔
3391
        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
1✔
3392
        // on the server should have no new history entries, and then pause the router so it doesn't get any of
1✔
3393
        // the changes we're about to create.
1✔
3394
        wait_for_upload(*problem_realm);
2✔
3395
        wait_for_download(*problem_realm);
2✔
3396

1✔
3397
        nlohmann::json command_request = {
2✔
3398
            {"command", "PAUSE_ROUTER_SESSION"},
2✔
3399
        };
2✔
3400
        auto resp_body =
2✔
3401
            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
2✔
3402
                .get();
2✔
3403
        REQUIRE(resp_body == "{}");
2!
3404

1✔
3405
        // Put some data into the server, this will be the data that will be in the broker cache.
1✔
3406
        setup_and_poison_cache();
2✔
3407

1✔
3408
        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
1✔
3409
        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
1✔
3410
        // processed.
1✔
3411
        setup_subs(problem_realm).get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
3412
        wait_for_advance(*problem_realm);
2✔
3413

1✔
3414
        REQUIRE(saw_truncated_bootstrap.load());
2!
3415
        auto table = problem_realm->read_group().get_table("class_TopLevel");
2✔
3416
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3417
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3418
    }
2✔
3419

3✔
3420
// TODO: remote-baas: This test fails intermittently with Windows remote baas server - to be fixed in RCORE-1674
3✔
3421
#ifndef _WIN32
6✔
3422
    SECTION("disconnect between bootstrap and mark") {
6✔
3423
        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3424
        auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
3425
        triggered_config.sync_config->on_sync_client_event_hook =
2✔
3426
            [promise = util::CopyablePromiseHolder(std::move(interrupted_promise)), &bizz_obj_id,
2✔
3427
             &bar_obj_id](std::weak_ptr<SyncSession> weak_sess, const SyncClientHookData& data) mutable {
26✔
3428
                auto sess = weak_sess.lock();
26✔
3429
                if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
26✔
3430
                    return SyncClientHookAction::NoAction;
24✔
3431
                }
24✔
3432

1✔
3433
                auto latest_subs = sess->get_flx_subscription_store()->get_latest();
2✔
3434
                REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3435
                REQUIRE(data.num_changesets == 1);
2!
3436
                auto db = SyncSession::OnlyForTesting::get_db(*sess);
2✔
3437
                auto read_tr = db->start_read();
2✔
3438
                auto table = read_tr->get_table("class_TopLevel");
2✔
3439
                REQUIRE(table->find_primary_key(bar_obj_id));
2!
3440
                REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
2!
3441

1✔
3442
                sess->pause();
2✔
3443
                promise.get_promise().emplace_value();
2✔
3444
                return SyncClientHookAction::NoAction;
2✔
3445
            };
2✔
3446
        auto problem_realm = Realm::get_shared_realm(triggered_config);
2✔
3447

1✔
3448
        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
1✔
3449
        // on the server should have no new history entries, and then pause the router so it doesn't get any of
1✔
3450
        // the changes we're about to create.
1✔
3451
        wait_for_upload(*problem_realm);
2✔
3452
        wait_for_download(*problem_realm);
2✔
3453

1✔
3454
        nlohmann::json command_request = {
2✔
3455
            {"command", "PAUSE_ROUTER_SESSION"},
2✔
3456
        };
2✔
3457
        auto resp_body =
2✔
3458
            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
2✔
3459
                .get();
2✔
3460
        REQUIRE(resp_body == "{}");
2!
3461

1✔
3462
        // Put some data into the server, this will be the data that will be in the broker cache.
1✔
3463
        setup_and_poison_cache();
2✔
3464

1✔
3465
        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
1✔
3466
        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
1✔
3467
        // processed.
1✔
3468
        auto sub_set = setup_subs(problem_realm);
2✔
3469
        auto sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
2✔
3470

1✔
3471
        interrupted.get();
2✔
3472
        problem_realm->sync_session()->shutdown_and_wait();
2✔
3473
        REQUIRE(!sub_complete_future.is_ready());
2!
3474
        sub_set.refresh();
2✔
3475
        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3476

1✔
3477
        problem_realm->sync_session()->resume();
2✔
3478
        sub_complete_future.get();
2✔
3479
        wait_for_advance(*problem_realm);
2✔
3480

1✔
3481
        sub_set.refresh();
2✔
3482
        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::Complete);
2!
3483
        auto table = problem_realm->read_group().get_table("class_TopLevel");
2✔
3484
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3485
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3486
    }
2✔
3487
#endif
6✔
3488
    SECTION("error/suspend between bootstrap and mark") {
6✔
3489
        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3490
        triggered_config.sync_config->on_sync_client_event_hook =
2✔
3491
            [&bizz_obj_id, &bar_obj_id](std::weak_ptr<SyncSession> weak_sess, const SyncClientHookData& data) {
27✔
3492
                auto sess = weak_sess.lock();
27✔
3493
                if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
27✔
3494
                    return SyncClientHookAction::NoAction;
25✔
3495
                }
25✔
3496

1✔
3497
                auto latest_subs = sess->get_flx_subscription_store()->get_latest();
2✔
3498
                REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3499
                REQUIRE(data.num_changesets == 1);
2!
3500
                auto db = SyncSession::OnlyForTesting::get_db(*sess);
2✔
3501
                auto read_tr = db->start_read();
2✔
3502
                auto table = read_tr->get_table("class_TopLevel");
2✔
3503
                REQUIRE(table->find_primary_key(bar_obj_id));
2!
3504
                REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
2!
3505

1✔
3506
                return SyncClientHookAction::TriggerReconnect;
2✔
3507
            };
2✔
3508
        auto problem_realm = Realm::get_shared_realm(triggered_config);
2✔
3509

1✔
3510
        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
1✔
3511
        // on the server should have no new history entries, and then pause the router so it doesn't get any of
1✔
3512
        // the changes we're about to create.
1✔
3513
        wait_for_upload(*problem_realm);
2✔
3514
        wait_for_download(*problem_realm);
2✔
3515

1✔
3516
        nlohmann::json command_request = {
2✔
3517
            {"command", "PAUSE_ROUTER_SESSION"},
2✔
3518
        };
2✔
3519
        auto resp_body =
2✔
3520
            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
2✔
3521
                .get();
2✔
3522
        REQUIRE(resp_body == "{}");
2!
3523

1✔
3524
        // Put some data into the server, this will be the data that will be in the broker cache.
1✔
3525
        setup_and_poison_cache();
2✔
3526

1✔
3527
        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
1✔
3528
        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
1✔
3529
        // processed.
1✔
3530
        auto sub_set = setup_subs(problem_realm);
2✔
3531
        auto sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
2✔
3532

1✔
3533
        sub_complete_future.get();
2✔
3534
        wait_for_advance(*problem_realm);
2✔
3535

1✔
3536
        sub_set.refresh();
2✔
3537
        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::Complete);
2!
3538
        auto table = problem_realm->read_group().get_table("class_TopLevel");
2✔
3539
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3540
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3541
    }
2✔
3542
}
6✔
3543

3544
TEST_CASE("flx: convert flx sync realm to bundled realm", "[app][flx][baas]") {
12✔
3545
    static auto foo_obj_id = ObjectId::gen();
12✔
3546
    static auto bar_obj_id = ObjectId::gen();
12✔
3547
    static auto bizz_obj_id = ObjectId::gen();
12✔
3548
    static std::optional<FLXSyncTestHarness> harness;
12✔
3549
    if (!harness) {
12✔
3550
        harness.emplace("bundled_flx_realms");
2✔
3551
        harness->load_initial_data([&](SharedRealm realm) {
2✔
3552
            CppContext c(realm);
2✔
3553
            Object::create(c, realm, "TopLevel",
2✔
3554
                           std::any(AnyDict{{"_id", foo_obj_id},
2✔
3555
                                            {"queryable_str_field", "foo"s},
2✔
3556
                                            {"queryable_int_field", static_cast<int64_t>(5)},
2✔
3557
                                            {"non_queryable_field", "non queryable 1"s}}));
2✔
3558
            Object::create(c, realm, "TopLevel",
2✔
3559
                           std::any(AnyDict{{"_id", bar_obj_id},
2✔
3560
                                            {"queryable_str_field", "bar"s},
2✔
3561
                                            {"queryable_int_field", static_cast<int64_t>(10)},
2✔
3562
                                            {"non_queryable_field", "non queryable 2"s}}));
2✔
3563
        });
2✔
3564
    }
2✔
3565

6✔
3566
    SECTION("flx to flx (should succeed)") {
12✔
3567
        create_user_and_log_in(harness->app());
2✔
3568
        SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
2✔
3569
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3570
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
3571
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3572
            mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
2✔
3573
            auto subs = std::move(mut_subs).commit();
2✔
3574

1✔
3575
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
3576
            wait_for_advance(*realm);
2✔
3577

1✔
3578
            realm->convert(target_config);
2✔
3579
        });
2✔
3580

1✔
3581
        auto target_realm = Realm::get_shared_realm(target_config);
2✔
3582

1✔
3583
        target_realm->begin_transaction();
2✔
3584
        CppContext c(target_realm);
2✔
3585
        Object::create(c, target_realm, "TopLevel",
2✔
3586
                       std::any(AnyDict{{"_id", bizz_obj_id},
2✔
3587
                                        {"queryable_str_field", "bizz"s},
2✔
3588
                                        {"queryable_int_field", static_cast<int64_t>(15)},
2✔
3589
                                        {"non_queryable_field", "non queryable 3"s}}));
2✔
3590
        target_realm->commit_transaction();
2✔
3591

1✔
3592
        wait_for_upload(*target_realm);
2✔
3593
        wait_for_download(*target_realm);
2✔
3594

1✔
3595
        auto latest_subs = target_realm->get_active_subscription_set();
2✔
3596
        auto table = target_realm->read_group().get_table("class_TopLevel");
2✔
3597
        REQUIRE(latest_subs.size() == 1);
2!
3598
        REQUIRE(latest_subs.at(0).object_class_name == "TopLevel");
2!
3599
        REQUIRE(latest_subs.at(0).query_string ==
2!
3600
                Query(table).greater(table->get_column_key("queryable_int_field"), 5).get_description());
2✔
3601

1✔
3602
        REQUIRE(table->size() == 2);
2!
3603
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3604
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3605
        REQUIRE_FALSE(table->find_primary_key(foo_obj_id));
2!
3606
    }
2✔
3607

6✔
3608
    SECTION("flx to local (should succeed)") {
12✔
3609
        TestFile target_config;
2✔
3610

1✔
3611
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3612
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
3613
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3614
            mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
2✔
3615
            auto subs = std::move(mut_subs).commit();
2✔
3616

1✔
3617
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
3618
            wait_for_advance(*realm);
2✔
3619

1✔
3620
            target_config.schema = realm->schema();
2✔
3621
            target_config.schema_version = realm->schema_version();
2✔
3622
            realm->convert(target_config);
2✔
3623
        });
2✔
3624

1✔
3625
        auto target_realm = Realm::get_shared_realm(target_config);
2✔
3626
        REQUIRE_THROWS(target_realm->get_active_subscription_set());
2✔
3627

1✔
3628
        auto table = target_realm->read_group().get_table("class_TopLevel");
2✔
3629
        REQUIRE(table->size() == 2);
2!
3630
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3631
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3632
        REQUIRE_FALSE(table->find_primary_key(foo_obj_id));
2!
3633
    }
2✔
3634

6✔
3635
    SECTION("flx to pbs (should fail to convert)") {
12✔
3636
        create_user_and_log_in(harness->app());
2✔
3637
        SyncTestFile target_config(harness->app()->current_user(), "12345"s, harness->schema());
2✔
3638
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3639
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
3640
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3641
            mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
2✔
3642
            auto subs = std::move(mut_subs).commit();
2✔
3643

1✔
3644
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
3645
            wait_for_advance(*realm);
2✔
3646

1✔
3647
            REQUIRE_THROWS(realm->convert(target_config));
2✔
3648
        });
2✔
3649
    }
2✔
3650

6✔
3651
    SECTION("pbs to flx (should fail to convert)") {
12✔
3652
        create_user_and_log_in(harness->app());
2✔
3653
        SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
2✔
3654

1✔
3655
        auto pbs_app_config = minimal_app_config(harness->app()->base_url(), "pbs_to_flx_convert", harness->schema());
2✔
3656

1✔
3657
        TestAppSession pbs_app_session(create_app(pbs_app_config));
2✔
3658
        SyncTestFile source_config(pbs_app_session.app()->current_user(), "54321"s, pbs_app_config.schema);
2✔
3659
        auto realm = Realm::get_shared_realm(source_config);
2✔
3660

1✔
3661
        realm->begin_transaction();
2✔
3662
        CppContext c(realm);
2✔
3663
        Object::create(c, realm, "TopLevel",
2✔
3664
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
3665
                                        {"queryable_str_field", "foo"s},
2✔
3666
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
3667
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
3668
        realm->commit_transaction();
2✔
3669

1✔
3670
        REQUIRE_THROWS(realm->convert(target_config));
2✔
3671
    }
2✔
3672

6✔
3673
    SECTION("local to flx (should fail to convert)") {
12✔
3674
        TestFile source_config;
2✔
3675
        source_config.schema = harness->schema();
2✔
3676
        source_config.schema_version = 1;
2✔
3677

1✔
3678
        auto realm = Realm::get_shared_realm(source_config);
2✔
3679
        auto foo_obj_id = ObjectId::gen();
2✔
3680

1✔
3681
        realm->begin_transaction();
2✔
3682
        CppContext c(realm);
2✔
3683
        Object::create(c, realm, "TopLevel",
2✔
3684
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
3685
                                        {"queryable_str_field", "foo"s},
2✔
3686
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
3687
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
3688
        realm->commit_transaction();
2✔
3689

1✔
3690
        create_user_and_log_in(harness->app());
2✔
3691
        SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
2✔
3692

1✔
3693
        REQUIRE_THROWS(realm->convert(target_config));
2✔
3694
    }
2✔
3695

6✔
3696
    // Add new sections before this
6✔
3697
    SECTION("teardown") {
12✔
3698
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
2✔
3699
        harness.reset();
2✔
3700
    }
2✔
3701
}
12✔
3702

3703
TEST_CASE("flx: compensating write errors get re-sent across sessions", "[sync][flx][compensating write][baas]") {
2✔
3704
    AppCreateConfig::ServiceRole role;
2✔
3705
    role.name = "compensating_write_perms";
2✔
3706

1✔
3707
    AppCreateConfig::ServiceRoleDocumentFilters doc_filters;
2✔
3708
    doc_filters.read = true;
2✔
3709
    doc_filters.write =
2✔
3710
        nlohmann::json{{"queryable_str_field", nlohmann::json{{"$in", nlohmann::json::array({"foo", "bar"})}}}};
2✔
3711
    role.document_filters = doc_filters;
2✔
3712

1✔
3713
    role.insert_filter = true;
2✔
3714
    role.delete_filter = true;
2✔
3715
    role.read = true;
2✔
3716
    role.write = true;
2✔
3717
    FLXSyncTestHarness::ServerSchema server_schema{
2✔
3718
        g_simple_embedded_obj_schema, {"queryable_str_field", "queryable_int_field"}, {role}};
2✔
3719
    FLXSyncTestHarness::Config harness_config("flx_bad_query", server_schema);
2✔
3720
    harness_config.reconnect_mode = ReconnectMode::testing;
2✔
3721
    FLXSyncTestHarness harness(std::move(harness_config));
2✔
3722

1✔
3723
    auto test_obj_id_1 = ObjectId::gen();
2✔
3724
    auto test_obj_id_2 = ObjectId::gen();
2✔
3725

1✔
3726
    create_user_and_log_in(harness.app());
2✔
3727
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3728
    config.cache = false;
2✔
3729

1✔
3730
    {
2✔
3731
        auto error_received_pf = util::make_promise_future<void>();
2✔
3732
        config.sync_config->on_sync_client_event_hook =
2✔
3733
            [promise = util::CopyablePromiseHolder(std::move(error_received_pf.promise))](
2✔
3734
                std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& data) mutable {
25✔
3735
                if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
25✔
3736
                    return SyncClientHookAction::NoAction;
20✔
3737
                }
20✔
3738
                auto session = weak_session.lock();
5✔
3739
                REQUIRE(session);
5!
3740

3✔
3741
                auto error_code = sync::ProtocolError(data.error_info->raw_error_code);
5✔
3742

3✔
3743
                if (error_code == sync::ProtocolError::initial_sync_not_completed) {
5✔
3744
                    return SyncClientHookAction::NoAction;
3✔
3745
                }
3✔
3746

1✔
3747
                REQUIRE(error_code == sync::ProtocolError::compensating_write);
2!
3748
                REQUIRE_FALSE(data.error_info->compensating_writes.empty());
2!
3749
                promise.get_promise().emplace_value();
2✔
3750

1✔
3751
                return SyncClientHookAction::TriggerReconnect;
2✔
3752
            };
2✔
3753

1✔
3754
        auto realm = Realm::get_shared_realm(config);
2✔
3755
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
3756
        auto queryable_str_field = table->get_column_key("queryable_str_field");
2✔
3757
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3758
        new_query.insert_or_assign(Query(table).equal(queryable_str_field, "bizz"));
2✔
3759
        std::move(new_query).commit();
2✔
3760

1✔
3761
        wait_for_upload(*realm);
2✔
3762
        wait_for_download(*realm);
2✔
3763

1✔
3764
        CppContext c(realm);
2✔
3765
        realm->begin_transaction();
2✔
3766
        Object::create(c, realm, "TopLevel",
2✔
3767
                       util::Any(AnyDict{
2✔
3768
                           {"_id", test_obj_id_1},
2✔
3769
                           {"queryable_str_field", std::string{"foo"}},
2✔
3770
                       }));
2✔
3771
        realm->commit_transaction();
2✔
3772

1✔
3773
        realm->begin_transaction();
2✔
3774
        Object::create(c, realm, "TopLevel",
2✔
3775
                       util::Any(AnyDict{
2✔
3776
                           {"_id", test_obj_id_2},
2✔
3777
                           {"queryable_str_field", std::string{"baz"}},
2✔
3778
                       }));
2✔
3779
        realm->commit_transaction();
2✔
3780

1✔
3781
        error_received_pf.future.get();
2✔
3782
        realm->sync_session()->shutdown_and_wait();
2✔
3783
        config.sync_config->on_sync_client_event_hook = {};
2✔
3784
    }
2✔
3785

1✔
3786
    _impl::RealmCoordinator::clear_all_caches();
2✔
3787

1✔
3788
    std::mutex errors_mutex;
2✔
3789
    std::condition_variable new_compensating_write;
2✔
3790
    std::vector<std::pair<ObjectId, sync::version_type>> error_to_download_version;
2✔
3791
    std::vector<sync::CompensatingWriteErrorInfo> compensating_writes;
2✔
3792
    sync::version_type download_version;
2✔
3793

1✔
3794
    config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_session,
2✔
3795
                                                        const SyncClientHookData& data) mutable {
18✔
3796
        auto session = weak_session.lock();
18✔
3797
        if (!session) {
18✔
3798
            return SyncClientHookAction::NoAction;
×
3799
        }
×
3800

10✔
3801
        if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
18✔
3802
            if (data.event == SyncClientHookEvent::DownloadMessageReceived) {
14✔
3803
                download_version = data.progress.download.server_version;
7✔
3804
            }
7✔
3805

8✔
3806
            return SyncClientHookAction::NoAction;
14✔
3807
        }
14✔
3808

2✔
3809
        auto error_code = sync::ProtocolError(data.error_info->raw_error_code);
4✔
3810
        REQUIRE(error_code == sync::ProtocolError::compensating_write);
4!
3811
        REQUIRE(!data.error_info->compensating_writes.empty());
4!
3812
        std::lock_guard<std::mutex> lk(errors_mutex);
4✔
3813
        for (const auto& compensating_write : data.error_info->compensating_writes) {
4✔
3814
            error_to_download_version.emplace_back(compensating_write.primary_key.get_object_id(),
4✔
3815
                                                   *data.error_info->compensating_write_server_version);
4✔
3816
        }
4✔
3817

2✔
3818
        return SyncClientHookAction::NoAction;
4✔
3819
    };
4✔
3820

1✔
3821
    config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError error) {
4✔
3822
        std::unique_lock<std::mutex> lk(errors_mutex);
4✔
3823
        REQUIRE(error.status == ErrorCodes::SyncCompensatingWrite);
4!
3824
        for (const auto& compensating_write : error.compensating_writes_info) {
4✔
3825
            auto tracked_error = std::find_if(error_to_download_version.begin(), error_to_download_version.end(),
4✔
3826
                                              [&](const auto& pair) {
6✔
3827
                                                  return pair.first == compensating_write.primary_key.get_object_id();
6✔
3828
                                              });
6✔
3829
            REQUIRE(tracked_error != error_to_download_version.end());
4!
3830
            CHECK(tracked_error->second <= download_version);
4!
3831
            compensating_writes.push_back(compensating_write);
4✔
3832
        }
4✔
3833
        new_compensating_write.notify_one();
4✔
3834
    };
4✔
3835

1✔
3836
    auto realm = Realm::get_shared_realm(config);
2✔
3837

1✔
3838
    wait_for_upload(*realm);
2✔
3839
    wait_for_download(*realm);
2✔
3840

1✔
3841
    std::unique_lock<std::mutex> lk(errors_mutex);
2✔
3842
    new_compensating_write.wait_for(lk, std::chrono::seconds(30), [&] {
2✔
3843
        return compensating_writes.size() == 2;
2✔
3844
    });
2✔
3845

1✔
3846
    REQUIRE(compensating_writes.size() == 2);
2!
3847
    auto& write_info = compensating_writes[0];
2✔
3848
    CHECK(write_info.primary_key.is_type(type_ObjectId));
2!
3849
    CHECK(write_info.primary_key.get_object_id() == test_obj_id_1);
2!
3850
    CHECK(write_info.object_name == "TopLevel");
2!
3851
    CHECK_THAT(write_info.reason, Catch::Matchers::ContainsSubstring("object is outside of the current query view"));
2✔
3852

1✔
3853
    write_info = compensating_writes[1];
2✔
3854
    REQUIRE(write_info.primary_key.is_type(type_ObjectId));
2!
3855
    REQUIRE(write_info.primary_key.get_object_id() == test_obj_id_2);
2!
3856
    REQUIRE(write_info.object_name == "TopLevel");
2!
3857
    REQUIRE(write_info.reason == util::format("write to \"%1\" in table \"TopLevel\" not allowed", test_obj_id_2));
2!
3858
    auto top_level_table = realm->read_group().get_table("class_TopLevel");
2✔
3859
    REQUIRE(top_level_table->is_empty());
2!
3860
}
2✔
3861

3862
// Checks how a local write interleaves with a multi-batch bootstrap.
//
// A sync client event hook watches integrated download messages for query versions other than 0:
//   * on the first batch reported as `MoreToCome` it starts a writer thread that opens a write
//     transaction, creates an object and then blocks until it is allowed to commit;
//   * on the first batch that is not `MoreToCome` it records the current read version as
//     `bootstrap_version`, releases the writer, force-closes the session and fulfils `interrupted`.
// The test then asserts that the writer's commit landed exactly one version after the bootstrap.
TEST_CASE("flx: bootstrap changesets are applied continuously", "[sync][flx][bootstrap][baas]") {
    FLXSyncTestHarness harness("flx_bootstrap_batching", {g_large_array_schema, {"queryable_int_field"}});
    fill_large_array_schema(harness);

    // Writer thread; created by the hook, joined by the test body.
    std::unique_ptr<std::thread> th;
    // Both versions start at UINT_FAST64_MAX and are overwritten by the writer thread / the hook.
    sync::version_type user_commit_version = UINT_FAST64_MAX;
    sync::version_type bootstrap_version = UINT_FAST64_MAX;
    SharedRealm realm;
    // `allow_to_commit` is guarded by `mutex` and signalled through `cv`.
    std::condition_variable cv;
    std::mutex mutex;
    bool allow_to_commit = false;

    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
    auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
    // The promise is held through a shared_ptr so that the hook lambda capturing it stays copyable.
    auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
    config.sync_config->on_sync_client_event_hook =
        [promise = std::move(shared_promise), &th, &realm, &user_commit_version, &bootstrap_version, &cv, &mutex,
         &allow_to_commit](std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& data) {
            // Ignore everything that belongs to query version 0.
            if (data.query_version == 0) {
                return SyncClientHookAction::NoAction;
            }
            // Only react once a download message has been integrated.
            if (data.event != SyncClientHookEvent::DownloadMessageIntegrated) {
                return SyncClientHookAction::NoAction;
            }
            auto session = weak_session.lock();
            if (!session) {
                return SyncClientHookAction::NoAction;
            }
            if (data.batch_state != sync::DownloadBatchState::MoreToCome) {
                // Read version after bootstrap is done.
                auto db = TestHelper::get_db(realm);
                ReadTransaction rt(db);
                bootstrap_version = rt.get_version();
                {
                    std::lock_guard<std::mutex> lock(mutex);
                    allow_to_commit = true;
                }
                // Release the writer thread, then stop the session and wake up the test body.
                cv.notify_one();
                session->force_close();
                promise->emplace_value();
                return SyncClientHookAction::NoAction;
            }

            // The writer thread is started once, on the first `MoreToCome` batch.
            if (th) {
                return SyncClientHookAction::NoAction;
            }

            auto func = [&] {
                // Attempt to commit a local change after the first bootstrap batch was committed.
                auto db = TestHelper::get_db(realm);
                WriteTransaction wt(db);
                TableRef table = wt.get_table("class_TopLevel");
                table->create_object_with_primary_key(ObjectId::gen());
                {
                    std::unique_lock<std::mutex> lock(mutex);
                    // Wait to commit until we read the final bootstrap version.
                    cv.wait(lock, [&] {
                        return allow_to_commit;
                    });
                }
                user_commit_version = wt.commit();
            };
            th = std::make_unique<std::thread>(std::move(func));

            return SyncClientHookAction::NoAction;
        };

    realm = Realm::get_shared_realm(config);
    auto table = realm->read_group().get_table("class_TopLevel");
    Query query(table);
    {
        // Subscribe to every TopLevel object; committing the new subscription set starts the bootstrap.
        auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
        new_subs.insert_or_assign(query);
        new_subs.commit();
    }
    interrupted.get();
    // The writer thread only exists if at least one `MoreToCome` batch was integrated. Fail the test
    // cleanly instead of dereferencing a null pointer when the bootstrap arrived as a single batch.
    REQUIRE(th);
    th->join();

    // The user commit is the last one.
    CHECK(user_commit_version == bootstrap_version + 1);
}
2✔
3943

3944
TEST_CASE("flx: open realm + register subscription callback while bootstrapping",
3945
          "[sync][flx][bootstrap][async open][baas]") {
12✔
3946
    FLXSyncTestHarness harness("flx_bootstrap_batching");
12✔
3947
    auto foo_obj_id = ObjectId::gen();
12✔
3948
    harness.load_initial_data([&](SharedRealm realm) {
12✔
3949
        CppContext c(realm);
12✔
3950
        Object::create(c, realm, "TopLevel",
12✔
3951
                       std::any(AnyDict{{"_id", foo_obj_id},
12✔
3952
                                        {"queryable_str_field", "foo"s},
12✔
3953
                                        {"queryable_int_field", static_cast<int64_t>(5)},
12✔
3954
                                        {"non_queryable_field", "created as initial data seed"s}}));
12✔
3955
    });
12✔
3956
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
12✔
3957

6✔
3958
    std::atomic<bool> subscription_invoked = false;
12✔
3959
    auto subscription_pf = util::make_promise_future<bool>();
12✔
3960
    // create a subscription to commit when realm is open for the first time or asked to rerun on open
6✔
3961
    auto init_subscription_callback_with_promise =
12✔
3962
        [&, promise_holder = util::CopyablePromiseHolder(std::move(subscription_pf.promise))](
12✔
3963
            std::shared_ptr<Realm> realm) mutable {
10✔
3964
            REQUIRE(realm);
8!
3965
            auto table = realm->read_group().get_table("class_TopLevel");
8✔
3966
            Query query(table);
8✔
3967
            auto subscription = realm->get_latest_subscription_set();
8✔
3968
            auto mutable_subscription = subscription.make_mutable_copy();
8✔
3969
            mutable_subscription.insert_or_assign(query);
8✔
3970
            auto promise = promise_holder.get_promise();
8✔
3971
            mutable_subscription.commit();
8✔
3972
            subscription_invoked = true;
8✔
3973
            promise.emplace_value(true);
8✔
3974
        };
8✔
3975
    // verify that the subscription has changed the database
6✔
3976
    auto verify_subscription = [](SharedRealm realm) {
12✔
3977
        REQUIRE(realm);
12!
3978
        auto table_ref = realm->read_group().get_table("class_TopLevel");
12✔
3979
        REQUIRE(table_ref);
12!
3980
        REQUIRE(table_ref->get_column_count() == 4);
12!
3981
        REQUIRE(table_ref->get_column_key("_id"));
12!
3982
        REQUIRE(table_ref->get_column_key("queryable_str_field"));
12!
3983
        REQUIRE(table_ref->get_column_key("queryable_int_field"));
12!
3984
        REQUIRE(table_ref->get_column_key("non_queryable_field"));
12!
3985
        REQUIRE(table_ref->size() == 1);
12!
3986
        auto str_col = table_ref->get_column_key("queryable_str_field");
12✔
3987
        REQUIRE(table_ref->get_object(0).get<String>(str_col) == "foo");
12!
3988
        return true;
12✔
3989
    };
12✔
3990

6✔
3991
    SECTION("Sync open") {
12✔
3992
        // sync open with subscription callback. Subscription will be run, since this is the first time that realm is
1✔
3993
        // opened
1✔
3994
        subscription_invoked = false;
2✔
3995
        config.sync_config->subscription_initializer = init_subscription_callback_with_promise;
2✔
3996
        auto realm = Realm::get_shared_realm(config);
2✔
3997
        REQUIRE(subscription_pf.future.get());
2!
3998
        auto sb = realm->get_latest_subscription_set();
2✔
3999
        auto future = sb.get_state_change_notification(realm::sync::SubscriptionSet::State::Complete);
2✔
4000
        auto state = future.get();
2✔
4001
        REQUIRE(state == realm::sync::SubscriptionSet::State::Complete);
2!
4002
        realm->refresh(); // refresh is needed otherwise table_ref->size() would be 0
2✔
4003
        REQUIRE(verify_subscription(realm));
2!
4004
    }
2✔
4005

6✔
4006
    SECTION("Sync Open + Async Open") {
12✔
4007
        {
2✔
4008
            subscription_invoked = false;
2✔
4009
            config.sync_config->subscription_initializer = init_subscription_callback_with_promise;
2✔
4010
            auto realm = Realm::get_shared_realm(config);
2✔
4011
            REQUIRE(subscription_pf.future.get());
2!
4012
            auto sb = realm->get_latest_subscription_set();
2✔
4013
            auto future = sb.get_state_change_notification(realm::sync::SubscriptionSet::State::Complete);
2✔
4014
            auto state = future.get();
2✔
4015
            REQUIRE(state == realm::sync::SubscriptionSet::State::Complete);
2!
4016
            realm->refresh(); // refresh is needed otherwise table_ref->size() would be 0
2✔
4017
            REQUIRE(verify_subscription(realm));
2!
4018
        }
2✔
4019
        {
2✔
4020
            auto subscription_pf_async = util::make_promise_future<bool>();
2✔
4021
            auto init_subscription_asyc_callback =
2✔
4022
                [promise_holder_async = util::CopyablePromiseHolder(std::move(subscription_pf_async.promise))](
2✔
4023
                    std::shared_ptr<Realm> realm) mutable {
2✔
4024
                    REQUIRE(realm);
2!
4025
                    auto table = realm->read_group().get_table("class_TopLevel");
2✔
4026
                    Query query(table);
2✔
4027
                    auto subscription = realm->get_latest_subscription_set();
2✔
4028
                    auto mutable_subscription = subscription.make_mutable_copy();
2✔
4029
                    mutable_subscription.insert_or_assign(query);
2✔
4030
                    auto promise = promise_holder_async.get_promise();
2✔
4031
                    mutable_subscription.commit();
2✔
4032
                    promise.emplace_value(true);
2✔
4033
                };
2✔
4034
            auto open_realm_pf = util::make_promise_future<bool>();
2✔
4035
            auto open_realm_completed_callback =
2✔
4036
                [&, promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
2✔
4037
                    ThreadSafeReference ref, std::exception_ptr err) mutable {
2✔
4038
                    auto promise = promise_holder.get_promise();
2✔
4039
                    if (err)
2✔
4040
                        promise.emplace_value(false);
×
4041
                    else
2✔
4042
                        promise.emplace_value(verify_subscription(Realm::get_shared_realm(std::move(ref))));
2✔
4043
                };
2✔
4044

1✔
4045
            config.sync_config->subscription_initializer = init_subscription_asyc_callback;
2✔
4046
            config.sync_config->rerun_init_subscription_on_open = true;
2✔
4047
            auto async_open = Realm::get_synchronized_realm(config);
2✔
4048
            async_open->start(open_realm_completed_callback);
2✔
4049
            REQUIRE(open_realm_pf.future.get());
2!
4050
            REQUIRE(subscription_pf_async.future.get());
2!
4051
            config.sync_config->rerun_init_subscription_on_open = false;
2✔
4052
            auto realm = Realm::get_shared_realm(config);
2✔
4053
            REQUIRE(realm->get_latest_subscription_set().version() == 2);
2!
4054
            REQUIRE(realm->get_active_subscription_set().version() == 2);
2!
4055
        }
2✔
4056
    }
2✔
4057

6✔
4058
    SECTION("Async open") {
12✔
4059
        SECTION("Initial async open with no rerun on open set") {
8✔
4060
            // subscription will be run since this is the first time we are opening the realm file.
2✔
4061
            auto open_realm_pf = util::make_promise_future<bool>();
4✔
4062
            auto open_realm_completed_callback =
4✔
4063
                [&, promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
4✔
4064
                    ThreadSafeReference ref, std::exception_ptr err) mutable {
4✔
4065
                    auto promise = promise_holder.get_promise();
4✔
4066
                    if (err)
4✔
4067
                        promise.emplace_value(false);
×
4068
                    else
4✔
4069
                        promise.emplace_value(verify_subscription(Realm::get_shared_realm(std::move(ref))));
4✔
4070
                };
4✔
4071

2✔
4072
            config.sync_config->subscription_initializer = init_subscription_callback_with_promise;
4✔
4073
            auto async_open = Realm::get_synchronized_realm(config);
4✔
4074
            async_open->start(open_realm_completed_callback);
4✔
4075
            REQUIRE(open_realm_pf.future.get());
4!
4076
            REQUIRE(subscription_pf.future.get());
4!
4077

2✔
4078
            SECTION("rerun on open = false. Subscription not run") {
4✔
4079
                subscription_invoked = false;
2✔
4080
                auto async_open = Realm::get_synchronized_realm(config);
2✔
4081
                auto open_realm_pf = util::make_promise_future<bool>();
2✔
4082
                auto open_realm_completed_callback =
2✔
4083
                    [promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
2✔
4084
                        ThreadSafeReference, std::exception_ptr) mutable {
2✔
4085
                        // no need to verify if the subscription has changed the db, since it has not run as we test
1✔
4086
                        // below
1✔
4087
                        promise_holder.get_promise().emplace_value(true);
2✔
4088
                    };
2✔
4089
                async_open->start(open_realm_completed_callback);
2✔
4090
                REQUIRE(open_realm_pf.future.get());
2!
4091
                REQUIRE_FALSE(subscription_invoked.load());
2!
4092
            }
2✔
4093

2✔
4094
            SECTION("rerun on open = true. Subscription not run cause realm already opened once") {
4✔
4095
                subscription_invoked = false;
2✔
4096
                auto realm = Realm::get_shared_realm(config);
2✔
4097
                auto init_subscription = [&subscription_invoked](std::shared_ptr<Realm> realm) mutable {
1✔
4098
                    REQUIRE(realm);
×
4099
                    auto table = realm->read_group().get_table("class_TopLevel");
×
4100
                    Query query(table);
×
4101
                    auto subscription = realm->get_latest_subscription_set();
×
4102
                    auto mutable_subscription = subscription.make_mutable_copy();
×
4103
                    mutable_subscription.insert_or_assign(query);
×
4104
                    mutable_subscription.commit();
×
4105
                    subscription_invoked.store(true);
×
4106
                };
×
4107
                config.sync_config->rerun_init_subscription_on_open = true;
2✔
4108
                config.sync_config->subscription_initializer = init_subscription;
2✔
4109
                auto async_open = Realm::get_synchronized_realm(config);
2✔
4110
                auto open_realm_pf = util::make_promise_future<bool>();
2✔
4111
                auto open_realm_completed_callback =
2✔
4112
                    [promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
2✔
4113
                        ThreadSafeReference, std::exception_ptr) mutable {
2✔
4114
                        // no need to verify if the subscription has changed the db, since it has not run as we test
1✔
4115
                        // below
1✔
4116
                        promise_holder.get_promise().emplace_value(true);
2✔
4117
                    };
2✔
4118
                async_open->start(open_realm_completed_callback);
2✔
4119
                REQUIRE(open_realm_pf.future.get());
2!
4120
                REQUIRE_FALSE(subscription_invoked.load());
2!
4121
                REQUIRE(realm->get_latest_subscription_set().version() == 1);
2!
4122
                REQUIRE(realm->get_active_subscription_set().version() == 1);
2!
4123
            }
2✔
4124
        }
4✔
4125

4✔
4126
        SECTION("rerun on open set for multiple async open tasks (subscription runs only once)") {
8✔
4127
            auto init_subscription = [](std::shared_ptr<Realm> realm) mutable {
8✔
4128
                REQUIRE(realm);
8!
4129
                auto table = realm->read_group().get_table("class_TopLevel");
8✔
4130
                Query query(table);
8✔
4131
                auto subscription = realm->get_latest_subscription_set();
8✔
4132
                auto mutable_subscription = subscription.make_mutable_copy();
8✔
4133
                mutable_subscription.insert_or_assign(query);
8✔
4134
                mutable_subscription.commit();
8✔
4135
            };
8✔
4136

2✔
4137
            auto open_task1_pf = util::make_promise_future<SharedRealm>();
4✔
4138
            auto open_task2_pf = util::make_promise_future<SharedRealm>();
4✔
4139
            auto open_callback1 = [promise_holder = util::CopyablePromiseHolder(std::move(open_task1_pf.promise))](
4✔
4140
                                      ThreadSafeReference ref, std::exception_ptr err) mutable {
4✔
4141
                REQUIRE_FALSE(err);
4!
4142
                auto realm = Realm::get_shared_realm(std::move(ref));
4✔
4143
                REQUIRE(realm);
4!
4144
                promise_holder.get_promise().emplace_value(realm);
4✔
4145
            };
4✔
4146
            auto open_callback2 = [promise_holder = util::CopyablePromiseHolder(std::move(open_task2_pf.promise))](
4✔
4147
                                      ThreadSafeReference ref, std::exception_ptr err) mutable {
4✔
4148
                REQUIRE_FALSE(err);
4!
4149
                auto realm = Realm::get_shared_realm(std::move(ref));
4✔
4150
                REQUIRE(realm);
4!
4151
                promise_holder.get_promise().emplace_value(realm);
4✔
4152
            };
4✔
4153

2✔
4154
            config.sync_config->rerun_init_subscription_on_open = true;
4✔
4155
            config.sync_config->subscription_initializer = init_subscription;
4✔
4156

2✔
4157
            SECTION("Realm was already created, but we want to rerun on first open using multiple tasks") {
4✔
4158
                {
2✔
4159
                    subscription_invoked = false;
2✔
4160
                    auto realm = Realm::get_shared_realm(config);
2✔
4161
                    auto sb = realm->get_latest_subscription_set();
2✔
4162
                    auto future = sb.get_state_change_notification(realm::sync::SubscriptionSet::State::Complete);
2✔
4163
                    auto state = future.get();
2✔
4164
                    REQUIRE(state == realm::sync::SubscriptionSet::State::Complete);
2!
4165
                    realm->refresh(); // refresh is needed otherwise table_ref->size() would be 0
2✔
4166
                    REQUIRE(verify_subscription(realm));
2!
4167
                    REQUIRE(realm->get_latest_subscription_set().version() == 1);
2!
4168
                    REQUIRE(realm->get_active_subscription_set().version() == 1);
2!
4169
                }
2✔
4170

1✔
4171
                auto async_open_task1 = Realm::get_synchronized_realm(config);
2✔
4172
                auto async_open_task2 = Realm::get_synchronized_realm(config);
2✔
4173
                async_open_task1->start(open_callback1);
2✔
4174
                async_open_task2->start(open_callback2);
2✔
4175

1✔
4176
                auto realm1 = open_task1_pf.future.get();
2✔
4177
                auto realm2 = open_task2_pf.future.get();
2✔
4178

1✔
4179
                const auto version_expected = 2;
2✔
4180
                auto r1_latest = realm1->get_latest_subscription_set().version();
2✔
4181
                auto r1_active = realm1->get_active_subscription_set().version();
2✔
4182
                REQUIRE(realm2->get_latest_subscription_set().version() == r1_latest);
2!
4183
                REQUIRE(realm2->get_active_subscription_set().version() == r1_active);
2!
4184
                REQUIRE(r1_latest == version_expected);
2!
4185
                REQUIRE(r1_active == version_expected);
2!
4186
            }
2✔
4187
            SECTION("First time realm is created but opened via open async. Both tasks could run the subscription") {
4✔
4188
                auto async_open_task1 = Realm::get_synchronized_realm(config);
2✔
4189
                auto async_open_task2 = Realm::get_synchronized_realm(config);
2✔
4190
                async_open_task1->start(open_callback1);
2✔
4191
                async_open_task2->start(open_callback2);
2✔
4192
                auto realm1 = open_task1_pf.future.get();
2✔
4193
                auto realm2 = open_task2_pf.future.get();
2✔
4194

1✔
4195
                auto r1_latest = realm1->get_latest_subscription_set().version();
2✔
4196
                auto r1_active = realm1->get_active_subscription_set().version();
2✔
4197
                REQUIRE(realm2->get_latest_subscription_set().version() == r1_latest);
2!
4198
                REQUIRE(realm2->get_active_subscription_set().version() == r1_active);
2!
4199
                // the callback may be run twice, if task1 is the first task to open realm
1✔
4200
                // but it is scheduled after tasks2, which have opened realm later but
1✔
4201
                // by the time it runs, subscription version is equal to 0 (realm creation).
1✔
4202
                // This can only happen the first time that realm is created. All the other times
1✔
4203
                // the init_sb callback is guaranteed to run once.
1✔
4204
                REQUIRE(r1_latest >= 1);
2!
4205
                REQUIRE(r1_latest <= 2);
2!
4206
                REQUIRE(r1_active >= 1);
2!
4207
                REQUIRE(r1_active <= 2);
2!
4208
            }
2✔
4209
        }
4✔
4210
    }
8✔
4211
}
12✔
4212
TEST_CASE("flx sync: Client reset during async open", "[sync][flx][client reset][async open][baas]") {
2✔
4213
    FLXSyncTestHarness harness("flx_bootstrap_batching");
2✔
4214
    auto foo_obj_id = ObjectId::gen();
2✔
4215
    std::atomic<bool> subscription_invoked = false;
2✔
4216
    harness.load_initial_data([&](SharedRealm realm) {
2✔
4217
        CppContext c(realm);
2✔
4218
        Object::create(c, realm, "TopLevel",
2✔
4219
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
4220
                                        {"queryable_str_field", "foo"s},
2✔
4221
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
4222
                                        {"non_queryable_field", "created as initial data seed"s}}));
2✔
4223
    });
2✔
4224
    SyncTestFile realm_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
4225

1✔
4226
    auto subscription_callback = [&](std::shared_ptr<Realm> realm) {
2✔
4227
        REQUIRE(realm);
2!
4228
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
4229
        Query query(table);
2✔
4230
        auto subscription = realm->get_latest_subscription_set();
2✔
4231
        auto mutable_subscription = subscription.make_mutable_copy();
2✔
4232
        mutable_subscription.insert_or_assign(query);
2✔
4233
        subscription_invoked = true;
2✔
4234
        mutable_subscription.commit();
2✔
4235
    };
2✔
4236

1✔
4237
    auto before_callback_called = util::make_promise_future<void>();
2✔
4238
    auto after_callback_called = util::make_promise_future<void>();
2✔
4239
    realm_config.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
4240
    realm_config.sync_config->subscription_initializer = subscription_callback;
2✔
4241

1✔
4242
    realm_config.sync_config->on_sync_client_event_hook =
2✔
4243
        [&, client_reset_triggered = false](std::weak_ptr<SyncSession> weak_sess,
2✔
4244
                                            const SyncClientHookData& event_data) mutable {
23✔
4245
            auto sess = weak_sess.lock();
23✔
4246
            if (!sess) {
23✔
4247
                return SyncClientHookAction::NoAction;
×
4248
            }
×
4249
            if (sess->path() != realm_config.path) {
23✔
4250
                return SyncClientHookAction::NoAction;
16✔
4251
            }
16✔
4252

4✔
4253
            if (event_data.event != SyncClientHookEvent::DownloadMessageReceived) {
7✔
4254
                return SyncClientHookAction::NoAction;
5✔
4255
            }
5✔
4256

1✔
4257
            if (client_reset_triggered) {
2✔
4258
                return SyncClientHookAction::NoAction;
×
4259
            }
×
4260
            client_reset_triggered = true;
2✔
4261
            reset_utils::trigger_client_reset(harness.session().app_session());
2✔
4262
            return SyncClientHookAction::EarlyReturn;
2✔
4263
        };
2✔
4264

1✔
4265
    realm_config.sync_config->notify_before_client_reset =
2✔
4266
        [promise = util::CopyablePromiseHolder(std::move(before_callback_called.promise))](
2✔
4267
            std::shared_ptr<Realm> realm) mutable {
2✔
4268
            CHECK(realm->schema_version() == 1);
2!
4269
            promise.get_promise().emplace_value();
2✔
4270
        };
2✔
4271

1✔
4272
    realm_config.sync_config->notify_after_client_reset =
2✔
4273
        [promise = util::CopyablePromiseHolder(std::move(after_callback_called.promise))](
2✔
4274
            std::shared_ptr<Realm> realm, ThreadSafeReference, bool) mutable {
2✔
4275
            CHECK(realm->schema_version() == 1);
2!
4276
            promise.get_promise().emplace_value();
2✔
4277
        };
2✔
4278

1✔
4279
    auto realm_task = Realm::get_synchronized_realm(realm_config);
2✔
4280
    auto realm_pf = util::make_promise_future<SharedRealm>();
2✔
4281
    realm_task->start([promise_holder = util::CopyablePromiseHolder(std::move(realm_pf.promise))](
2✔
4282
                          ThreadSafeReference ref, std::exception_ptr ex) mutable {
2✔
4283
        auto promise = promise_holder.get_promise();
2✔
4284
        if (ex) {
2✔
4285
            try {
×
4286
                std::rethrow_exception(ex);
×
4287
            }
×
4288
            catch (...) {
×
4289
                promise.set_error(exception_to_status());
×
4290
            }
×
4291
            return;
×
4292
        }
2✔
4293
        auto realm = Realm::get_shared_realm(std::move(ref));
2✔
4294
        if (!realm) {
2✔
4295
            promise.set_error({ErrorCodes::RuntimeError, "could not get realm from threadsaferef"});
×
4296
        }
×
4297
        promise.emplace_value(std::move(realm));
2✔
4298
    });
2✔
4299
    auto realm = realm_pf.future.get();
2✔
4300
    before_callback_called.future.get();
2✔
4301
    after_callback_called.future.get();
2✔
4302
    REQUIRE(subscription_invoked.load());
2!
4303
}
2✔
4304

4305
// Test that resending pending subscription sets does not cause any inconsistencies in the progress cursors.
4306
TEST_CASE("flx sync: resend pending subscriptions when reconnecting", "[sync][flx][baas]") {
2✔
4307
    FLXSyncTestHarness harness("flx_pending_subscriptions", {g_large_array_schema, {"queryable_int_field"}});
2✔
4308

1✔
4309
    std::vector<ObjectId> obj_ids_at_end = fill_large_array_schema(harness);
2✔
4310
    SyncTestFile interrupted_realm_config(harness.app()->current_user(), harness.schema(),
2✔
4311
                                          SyncConfig::FLXSyncEnabled{});
2✔
4312
    interrupted_realm_config.cache = false;
2✔
4313

1✔
4314
    {
2✔
4315
        auto pf = util::make_promise_future<void>();
2✔
4316
        Realm::Config config = interrupted_realm_config;
2✔
4317
        config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
4318
        config.sync_config->on_sync_client_event_hook =
2✔
4319
            [promise = util::CopyablePromiseHolder(std::move(pf.promise))](std::weak_ptr<SyncSession> weak_session,
2✔
4320
                                                                           const SyncClientHookData& data) mutable {
50✔
4321
                if (data.event != SyncClientHookEvent::BootstrapMessageProcessed &&
50✔
4322
                    data.event != SyncClientHookEvent::BootstrapProcessed) {
43✔
4323
                    return SyncClientHookAction::NoAction;
32✔
4324
                }
32✔
4325
                auto session = weak_session.lock();
18✔
4326
                if (!session) {
18✔
4327
                    return SyncClientHookAction::NoAction;
×
4328
                }
×
4329
                if (data.query_version != 1) {
18✔
4330
                    return SyncClientHookAction::NoAction;
4✔
4331
                }
4✔
4332

7✔
4333
                // Commit a subscriptions set whenever a bootstrap message is received for query version 1.
7✔
4334
                if (data.event == SyncClientHookEvent::BootstrapMessageProcessed) {
14✔
4335
                    auto latest_subs = session->get_flx_subscription_store()->get_latest().make_mutable_copy();
12✔
4336
                    latest_subs.commit();
12✔
4337
                    return SyncClientHookAction::NoAction;
12✔
4338
                }
12✔
4339
                // At least one subscription set was created.
1✔
4340
                CHECK(session->get_flx_subscription_store()->get_latest().version() > 1);
2!
4341
                promise.get_promise().emplace_value();
2✔
4342
                // Reconnect once query version 1 is bootstrapped.
1✔
4343
                return SyncClientHookAction::TriggerReconnect;
2✔
4344
            };
2✔
4345

1✔
4346
        auto realm = Realm::get_shared_realm(config);
2✔
4347
        {
2✔
4348
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
4349
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
4350
            mut_subs.insert_or_assign(Query(table));
2✔
4351
            mut_subs.commit();
2✔
4352
        }
2✔
4353
        pf.future.get();
2✔
4354
        realm->sync_session()->shutdown_and_wait();
2✔
4355
        realm->close();
2✔
4356
    }
2✔
4357

1✔
4358
    _impl::RealmCoordinator::assert_no_open_realms();
2✔
4359

1✔
4360
    // Check at least one subscription set needs to be resent.
1✔
4361
    {
2✔
4362
        DBOptions options;
2✔
4363
        options.encryption_key = test_util::crypt_key();
2✔
4364
        auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
4365
        auto sub_store = sync::SubscriptionStore::create(realm);
2✔
4366
        auto version_info = sub_store->get_version_info();
2✔
4367
        REQUIRE(version_info.latest > version_info.active);
2!
4368
    }
2✔
4369

1✔
4370
    // Resend the pending subscriptions.
1✔
4371
    auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
4372
    wait_for_upload(*realm);
2✔
4373
    wait_for_download(*realm);
2✔
4374
}
2✔
4375

4376
} // namespace realm::app
4377

4378
#endif // REALM_ENABLE_AUTH_TESTS
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc