• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1672

15 Sep 2023 05:11PM UTC coverage: 91.217% (+0.02%) from 91.193%
1672

push

Evergreen

web-flow
Fix open async when rerun on open is set. (#6973)

96008 of 175924 branches covered (0.0%)

160 of 170 new or added lines in 3 files covered. (94.12%)

48 existing lines in 15 files now uncovered.

233765 of 256274 relevant lines covered (91.22%)

7112563.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.28
/test/object-store/sync/flx_sync.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2021 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#if REALM_ENABLE_AUTH_TESTS
20

21
#include <catch2/catch_all.hpp>
22

23
#include <util/test_file.hpp>
24
#include <util/crypt_key.hpp>
25
#include <util/sync/flx_sync_harness.hpp>
26
#include <util/sync/sync_test_utils.hpp>
27

28
#include <realm/object_id.hpp>
29
#include <realm/query_expression.hpp>
30

31
#include <realm/object-store/binding_context.hpp>
32
#include <realm/object-store/impl/object_accessor_impl.hpp>
33
#include <realm/object-store/impl/realm_coordinator.hpp>
34
#include <realm/object-store/schema.hpp>
35
#include <realm/object-store/sync/generic_network_transport.hpp>
36
#include <realm/object-store/sync/mongo_client.hpp>
37
#include <realm/object-store/sync/mongo_database.hpp>
38
#include <realm/object-store/sync/mongo_collection.hpp>
39
#include <realm/object-store/sync/async_open_task.hpp>
40
#include <realm/object-store/util/bson/bson.hpp>
41
#include <realm/object-store/sync/sync_session.hpp>
42

43
#include <realm/sync/client_base.hpp>
44
#include <realm/sync/config.hpp>
45
#include <realm/sync/noinst/client_history_impl.hpp>
46
#include <realm/sync/noinst/client_reset.hpp>
47
#include <realm/sync/noinst/client_reset_operation.hpp>
48
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
49
#include <realm/sync/noinst/server/access_token.hpp>
50
#include <realm/sync/protocol.hpp>
51
#include <realm/sync/subscriptions.hpp>
52

53
#include <realm/util/future.hpp>
54
#include <realm/util/logger.hpp>
55

56
#include <catch2/catch_all.hpp>
57

58
#include <filesystem>
59
#include <iostream>
60
#include <stdexcept>
61

62
using namespace std::string_literals;
63

64
namespace realm {
65

66
class TestHelper {
67
public:
68
    static DBRef& get_db(SharedRealm const& shared_realm)
69
    {
70
        return Realm::Internal::get_db(*shared_realm);
71
    }
72
};
73

74
} // namespace realm
75

76
namespace realm::app {
77

78
namespace {
79
const Schema g_minimal_schema{
80
    {"TopLevel",
81
     {
82
         {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
83
     }},
84
};
85

86
const Schema g_large_array_schema{
87
    ObjectSchema("TopLevel",
88
                 {
89
                     {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
90
                     {"queryable_int_field", PropertyType::Int | PropertyType::Nullable},
91
                     {"list_of_strings", PropertyType::Array | PropertyType::String},
92
                 }),
93
};
94

95
const Schema g_simple_embedded_obj_schema{
96
    {"TopLevel",
97
     {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
98
      {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
99
      {"embedded_obj", PropertyType::Object | PropertyType::Nullable, "TopLevel_embedded_obj"}}},
100
    {"TopLevel_embedded_obj",
101
     ObjectSchema::ObjectType::Embedded,
102
     {
103
         {"str_field", PropertyType::String | PropertyType::Nullable},
104
     }},
105
};
106

107
// Populates a FLXSyncTestHarness with the g_large_array_schema with objects that are large enough that
108
// they are guaranteed to fill multiple bootstrap download messages. Currently this means generating 5
109
// objects each with 1024 array entries of 1024 bytes each.
110
//
111
// Returns a list of the _id values for the objects created.
112
std::vector<ObjectId> fill_large_array_schema(FLXSyncTestHarness& harness)
113
{
12✔
114
    std::vector<ObjectId> ret;
12✔
115
    REQUIRE(harness.schema() == g_large_array_schema);
12!
116
    harness.load_initial_data([&](SharedRealm realm) {
12✔
117
        CppContext c(realm);
12✔
118
        for (int i = 0; i < 5; ++i) {
72✔
119
            auto id = ObjectId::gen();
60✔
120
            auto obj = Object::create(c, realm, "TopLevel",
60✔
121
                                      std::any(AnyDict{{"_id", id},
60✔
122
                                                       {"list_of_strings", AnyVector{}},
60✔
123
                                                       {"queryable_int_field", static_cast<int64_t>(i * 5)}}));
60✔
124
            List str_list(obj, realm->schema().find("TopLevel")->property_for_name("list_of_strings"));
60✔
125
            for (int j = 0; j < 1024; ++j) {
61,500✔
126
                str_list.add(c, std::any(std::string(1024, 'a' + (j % 26))));
61,440✔
127
            }
61,440✔
128

30✔
129
            ret.push_back(id);
60✔
130
        }
60✔
131
    });
12✔
132
    return ret;
12✔
133
}
12✔
134

135
} // namespace
136

137
TEST_CASE("flx: connect to FLX-enabled app", "[sync][flx][baas]") {
2✔
138
    FLXSyncTestHarness harness("basic_flx_connect");
2✔
139

1✔
140
    auto foo_obj_id = ObjectId::gen();
2✔
141
    auto bar_obj_id = ObjectId::gen();
2✔
142
    harness.load_initial_data([&](SharedRealm realm) {
2✔
143
        CppContext c(realm);
2✔
144
        Object::create(c, realm, "TopLevel",
2✔
145
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
146
                                        {"queryable_str_field", "foo"s},
2✔
147
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
148
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
149
        Object::create(c, realm, "TopLevel",
2✔
150
                       std::any(AnyDict{{"_id", bar_obj_id},
2✔
151
                                        {"queryable_str_field", "bar"s},
2✔
152
                                        {"queryable_int_field", static_cast<int64_t>(10)},
2✔
153
                                        {"non_queryable_field", "non queryable 2"s}}));
2✔
154
    });
2✔
155

1✔
156

1✔
157
    harness.do_with_new_realm([&](SharedRealm realm) {
2✔
158
        wait_for_download(*realm);
2✔
159
        {
2✔
160
            auto empty_subs = realm->get_latest_subscription_set();
2✔
161
            CHECK(empty_subs.size() == 0);
2!
162
            CHECK(empty_subs.version() == 0);
2!
163
            empty_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
164
        }
2✔
165

1✔
166
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
167
        auto col_key = table->get_column_key("queryable_str_field");
2✔
168
        Query query_foo(table);
2✔
169
        query_foo.equal(col_key, "foo");
2✔
170
        {
2✔
171
            auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
172
            new_subs.insert_or_assign(query_foo);
2✔
173
            auto subs = new_subs.commit();
2✔
174
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
175
        }
2✔
176

1✔
177
        wait_for_download(*realm);
2✔
178
        {
2✔
179
            wait_for_advance(*realm);
2✔
180
            Results results(realm, table);
2✔
181
            CHECK(results.size() == 1);
2!
182
            auto obj = results.get<Obj>(0);
2✔
183
            CHECK(obj.is_valid());
2!
184
            CHECK(obj.get<ObjectId>("_id") == foo_obj_id);
2!
185
        }
2✔
186

1✔
187
        {
2✔
188
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
189
            Query new_query_bar(table);
2✔
190
            new_query_bar.equal(col_key, "bar");
2✔
191
            mut_subs.insert_or_assign(new_query_bar);
2✔
192
            auto subs = mut_subs.commit();
2✔
193
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
194
        }
2✔
195

1✔
196
        {
2✔
197
            wait_for_advance(*realm);
2✔
198
            Results results(realm, Query(table));
2✔
199
            CHECK(results.size() == 2);
2!
200
        }
2✔
201

1✔
202
        {
2✔
203
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
204
            CHECK(mut_subs.erase(query_foo));
2!
205
            Query new_query_bar(table);
2✔
206
            new_query_bar.equal(col_key, "bar");
2✔
207
            mut_subs.insert_or_assign(new_query_bar);
2✔
208
            auto subs = mut_subs.commit();
2✔
209
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
210
        }
2✔
211

1✔
212
        {
2✔
213
            wait_for_advance(*realm);
2✔
214
            Results results(realm, Query(table));
2✔
215
            CHECK(results.size() == 1);
2!
216
            auto obj = results.get<Obj>(0);
2✔
217
            CHECK(obj.is_valid());
2!
218
            CHECK(obj.get<ObjectId>("_id") == bar_obj_id);
2!
219
        }
2✔
220

1✔
221
        {
2✔
222
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
223
            mut_subs.clear();
2✔
224
            auto subs = mut_subs.commit();
2✔
225
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
226
        }
2✔
227

1✔
228
        {
2✔
229
            wait_for_advance(*realm);
2✔
230
            Results results(realm, table);
2✔
231
            CHECK(results.size() == 0);
2!
232
        }
2✔
233
    });
2✔
234
}
2✔
235

236
TEST_CASE("flx: test commands work", "[sync][flx][test command][baas]") {
2✔
237
    FLXSyncTestHarness harness("test_commands");
2✔
238
    harness.do_with_new_realm([&](const SharedRealm& realm) {
2✔
239
        wait_for_upload(*realm);
2✔
240
        nlohmann::json command_request = {
2✔
241
            {"command", "PAUSE_ROUTER_SESSION"},
2✔
242
        };
2✔
243
        auto resp_body =
2✔
244
            SyncSession::OnlyForTesting::send_test_command(*realm->sync_session(), command_request.dump()).get();
2✔
245
        REQUIRE(resp_body == "{}");
2!
246

1✔
247
        auto bad_status =
2✔
248
            SyncSession::OnlyForTesting::send_test_command(*realm->sync_session(), "foobar: }").get_no_throw();
2✔
249
        REQUIRE(bad_status.get_status() == ErrorCodes::LogicError);
2!
250
        REQUIRE_THAT(bad_status.get_status().reason(),
2✔
251
                     Catch::Matchers::ContainsSubstring("Invalid json input to send_test_command"));
2✔
252

1✔
253
        bad_status =
2✔
254
            SyncSession::OnlyForTesting::send_test_command(*realm->sync_session(), "{\"cmd\": \"\"}").get_no_throw();
2✔
255
        REQUIRE_FALSE(bad_status.is_ok());
2!
256
        REQUIRE(bad_status.get_status() == ErrorCodes::LogicError);
2!
257
        REQUIRE(bad_status.get_status().reason() ==
2!
258
                "Must supply command name in \"command\" field of test command json object");
2✔
259
    });
2✔
260
}
2✔
261

262

263
static auto make_error_handler()
264
{
38✔
265
    auto [error_promise, error_future] = util::make_promise_future<SyncError>();
38✔
266
    auto shared_promise = std::make_shared<decltype(error_promise)>(std::move(error_promise));
38✔
267
    auto fn = [error_promise = std::move(shared_promise)](std::shared_ptr<SyncSession>, SyncError err) {
37✔
268
        error_promise->emplace_value(std::move(err));
36✔
269
    };
36✔
270
    return std::make_pair(std::move(error_future), std::move(fn));
38✔
271
}
38✔
272

273
static auto make_client_reset_handler()
274
{
16✔
275
    auto [reset_promise, reset_future] = util::make_promise_future<ClientResyncMode>();
16✔
276
    auto shared_promise = std::make_shared<decltype(reset_promise)>(std::move(reset_promise));
16✔
277
    auto fn = [reset_promise = std::move(shared_promise)](SharedRealm, ThreadSafeReference, bool did_recover) {
16✔
278
        reset_promise->emplace_value(did_recover ? ClientResyncMode::Recover : ClientResyncMode::DiscardLocal);
12✔
279
    };
16✔
280
    return std::make_pair(std::move(reset_future), std::move(fn));
16✔
281
}
16✔
282

283

284
TEST_CASE("app: error handling integration test", "[sync][flx][baas]") {
16✔
285
    static std::optional<FLXSyncTestHarness> harness{"error_handling"};
16✔
286
    create_user_and_log_in(harness->app());
16✔
287
    SyncTestFile config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
16✔
288
    auto&& [error_future, error_handler] = make_error_handler();
16✔
289
    config.sync_config->error_handler = std::move(error_handler);
16✔
290
    config.sync_config->client_resync_mode = ClientResyncMode::Manual;
16✔
291

8✔
292
    SECTION("handles unknown errors gracefully") {
16✔
293
        auto r = Realm::get_shared_realm(config);
2✔
294
        wait_for_download(*r);
2✔
295
        nlohmann::json error_body = {
2✔
296
            {"tryAgain", false},         {"message", "fake error"},
2✔
297
            {"shouldClientReset", true}, {"isRecoveryModeDisabled", false},
2✔
298
            {"action", "ClientReset"},
2✔
299
        };
2✔
300
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
301
                                       {"args", nlohmann::json{{"errorCode", 299}, {"errorBody", error_body}}}};
2✔
302
        auto test_cmd_res =
2✔
303
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
304
                .get();
2✔
305
        REQUIRE(test_cmd_res == "{}");
2!
306
        auto error = wait_for_future(std::move(error_future)).get();
2✔
307
        REQUIRE(error.status == ErrorCodes::UnknownError);
2!
308
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset);
2!
309
        REQUIRE(error.is_fatal);
2!
310
        REQUIRE_THAT(error.status.reason(),
2✔
311
                     Catch::Matchers::ContainsSubstring("Unknown sync protocol error code 299"));
2✔
312
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
2✔
313
    }
2✔
314

8✔
315
    SECTION("unknown errors without actions are application bugs") {
16✔
316
        auto r = Realm::get_shared_realm(config);
2✔
317
        wait_for_download(*r);
2✔
318
        nlohmann::json error_body = {
2✔
319
            {"tryAgain", false},
2✔
320
            {"message", "fake error"},
2✔
321
            {"shouldClientReset", false},
2✔
322
            {"isRecoveryModeDisabled", false},
2✔
323
        };
2✔
324
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
325
                                       {"args", nlohmann::json{{"errorCode", 299}, {"errorBody", error_body}}}};
2✔
326
        auto test_cmd_res =
2✔
327
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
328
                .get();
2✔
329
        REQUIRE(test_cmd_res == "{}");
2!
330
        auto error = wait_for_future(std::move(error_future)).get();
2✔
331
        REQUIRE(error.status == ErrorCodes::UnknownError);
2!
332
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
333
        REQUIRE(error.is_fatal);
2!
334
        REQUIRE_THAT(error.status.reason(),
2✔
335
                     Catch::Matchers::ContainsSubstring("Unknown sync protocol error code 299"));
2✔
336
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
2✔
337
    }
2✔
338

8✔
339
    SECTION("handles unknown actions gracefully") {
16✔
340
        auto r = Realm::get_shared_realm(config);
2✔
341
        wait_for_download(*r);
2✔
342
        nlohmann::json error_body = {
2✔
343
            {"tryAgain", false},
2✔
344
            {"message", "fake error"},
2✔
345
            {"shouldClientReset", true},
2✔
346
            {"isRecoveryModeDisabled", false},
2✔
347
            {"action", "FakeActionThatWillNeverExist"},
2✔
348
        };
2✔
349
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
350
                                       {"args", nlohmann::json{{"errorCode", 201}, {"errorBody", error_body}}}};
2✔
351
        auto test_cmd_res =
2✔
352
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
353
                .get();
2✔
354
        REQUIRE(test_cmd_res == "{}");
2!
355
        auto error = wait_for_future(std::move(error_future)).get();
2✔
356
        REQUIRE(error.status == ErrorCodes::RuntimeError);
2!
357
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
358
        REQUIRE(error.is_fatal);
2!
359
        REQUIRE_THAT(error.status.reason(), !Catch::Matchers::ContainsSubstring("Unknown sync protocol error code"));
2✔
360
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
2✔
361
    }
2✔
362

8✔
363

8✔
364
    SECTION("unknown connection-level errors are still errors") {
16✔
365
        auto r = Realm::get_shared_realm(config);
2✔
366
        wait_for_download(*r);
2✔
367
        nlohmann::json error_body = {{"tryAgain", false},
2✔
368
                                     {"message", "fake error"},
2✔
369
                                     {"shouldClientReset", false},
2✔
370
                                     {"isRecoveryModeDisabled", false},
2✔
371
                                     {"action", "ApplicationBug"}};
2✔
372
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
373
                                       {"args", nlohmann::json{{"errorCode", 199}, {"errorBody", error_body}}}};
2✔
374
        auto test_cmd_res =
2✔
375
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
376
                .get();
2✔
377
        REQUIRE(test_cmd_res == "{}");
2!
378
        auto error = wait_for_future(std::move(error_future)).get();
2✔
379
        REQUIRE(error.status == ErrorCodes::SyncProtocolInvariantFailed);
2!
380
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ProtocolViolation);
2!
381
        REQUIRE(error.is_fatal);
2!
382
    }
2✔
383

8✔
384
    SECTION("client reset errors") {
16✔
385
        auto r = Realm::get_shared_realm(config);
6✔
386
        wait_for_download(*r);
6✔
387
        nlohmann::json error_body = {{"tryAgain", false},
6✔
388
                                     {"message", "fake error"},
6✔
389
                                     {"shouldClientReset", true},
6✔
390
                                     {"isRecoveryModeDisabled", false},
6✔
391
                                     {"action", "ClientReset"}};
6✔
392
        auto code = GENERATE(sync::ProtocolError::bad_client_file_ident, sync::ProtocolError::bad_server_version,
6✔
393
                             sync::ProtocolError::diverging_histories);
6✔
394
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
6✔
395
                                       {"args", nlohmann::json{{"errorCode", code}, {"errorBody", error_body}}}};
6✔
396
        auto test_cmd_res =
6✔
397
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
6✔
398
                .get();
6✔
399
        REQUIRE(test_cmd_res == "{}");
6!
400
        auto error = wait_for_future(std::move(error_future)).get();
6✔
401
        REQUIRE(error.status == ErrorCodes::SyncClientResetRequired);
6!
402
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset);
6!
403
        REQUIRE(error.is_client_reset_requested());
6!
404
        REQUIRE(error.is_fatal);
6!
405
    }
6✔
406

8✔
407
    SECTION("teardown") {
16✔
408
        harness.reset();
2✔
409
    }
2✔
410
}
16✔
411

412

413
TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {
42✔
414
    std::vector<ObjectSchema> schema{
42✔
415
        {"TopLevel",
42✔
416
         {
42✔
417
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
42✔
418
             {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
42✔
419
             {"queryable_int_field", PropertyType::Int | PropertyType::Nullable},
42✔
420
             {"non_queryable_field", PropertyType::String | PropertyType::Nullable},
42✔
421
             {"list_of_ints_field", PropertyType::Int | PropertyType::Array},
42✔
422
             {"sum_of_list_field", PropertyType::Int},
42✔
423
         }},
42✔
424
        {"TopLevel2",
42✔
425
         {
42✔
426
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
42✔
427
             {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
42✔
428
         }},
42✔
429
    };
42✔
430

21✔
431
    // some of these tests make additive schema changes which is only allowed in dev mode
21✔
432
    constexpr bool dev_mode = true;
42✔
433
    FLXSyncTestHarness harness("flx_client_reset",
42✔
434
                               {schema, {"queryable_str_field", "queryable_int_field"}, {}, dev_mode});
42✔
435

21✔
436
    auto add_object = [](SharedRealm realm, std::string str_field, int64_t int_field,
42✔
437
                         ObjectId oid = ObjectId::gen()) {
116✔
438
        CppContext c(realm);
116✔
439
        realm->begin_transaction();
116✔
440

58✔
441
        int64_t r1 = random_int();
116✔
442
        int64_t r2 = random_int();
116✔
443
        int64_t r3 = random_int();
116✔
444
        int64_t sum = uint64_t(r1) + r2 + r3;
116✔
445

58✔
446
        Object::create(c, realm, "TopLevel",
116✔
447
                       std::any(AnyDict{{"_id", oid},
116✔
448
                                        {"queryable_str_field", str_field},
116✔
449
                                        {"queryable_int_field", int_field},
116✔
450
                                        {"non_queryable_field", "non queryable 1"s},
116✔
451
                                        {"list_of_ints_field", std::vector<std::any>{r1, r2, r3}},
116✔
452
                                        {"sum_of_list_field", sum}}));
116✔
453
        realm->commit_transaction();
116✔
454
    };
116✔
455

21✔
456
    auto subscribe_to_and_add_objects = [&](SharedRealm realm, size_t num_objects) {
41✔
457
        auto table = realm->read_group().get_table("class_TopLevel");
40✔
458
        auto id_col = table->get_primary_key_column();
40✔
459
        auto sub_set = realm->get_latest_subscription_set();
40✔
460
        for (size_t i = 0; i < num_objects; ++i) {
130✔
461
            auto oid = ObjectId::gen();
90✔
462
            auto mut_sub = sub_set.make_mutable_copy();
90✔
463
            mut_sub.clear();
90✔
464
            mut_sub.insert_or_assign(Query(table).equal(id_col, oid));
90✔
465
            sub_set = mut_sub.commit();
90✔
466
            add_object(realm, util::format("added _id='%1'", oid), 0, oid);
90✔
467
        }
90✔
468
    };
40✔
469

21✔
470
    auto add_subscription_for_new_object = [&](SharedRealm realm, std::string str_field,
42✔
471
                                               int64_t int_field) -> sync::SubscriptionSet {
32✔
472
        auto table = realm->read_group().get_table("class_TopLevel");
22✔
473
        auto queryable_str_field = table->get_column_key("queryable_str_field");
22✔
474
        auto sub_set = realm->get_latest_subscription_set().make_mutable_copy();
22✔
475
        sub_set.insert_or_assign(Query(table).equal(queryable_str_field, StringData(str_field)));
22✔
476
        auto resulting_set = sub_set.commit();
22✔
477
        add_object(realm, str_field, int_field);
22✔
478
        return resulting_set;
22✔
479
    };
22✔
480

21✔
481
    auto add_invalid_subscription = [&](SharedRealm realm) -> sync::SubscriptionSet {
22✔
482
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
483
        auto queryable_str_field = table->get_column_key("non_queryable_field");
2✔
484
        auto sub_set = realm->get_latest_subscription_set().make_mutable_copy();
2✔
485
        sub_set.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
2✔
486
        auto resulting_set = sub_set.commit();
2✔
487
        return resulting_set;
2✔
488
    };
2✔
489

21✔
490
    auto count_queries_with_str = [](sync::SubscriptionSet subs, std::string_view str) {
24✔
491
        size_t count = 0;
6✔
492
        for (auto sub : subs) {
10✔
493
            if (sub.query_string.find(str) != std::string::npos) {
10✔
494
                ++count;
4✔
495
            }
4✔
496
        }
10✔
497
        return count;
6✔
498
    };
6✔
499
    create_user_and_log_in(harness.app());
42✔
500
    auto user1 = harness.app()->current_user();
42✔
501
    create_user_and_log_in(harness.app());
42✔
502
    auto user2 = harness.app()->current_user();
42✔
503
    SyncTestFile config_local(user1, harness.schema(), SyncConfig::FLXSyncEnabled{});
42✔
504
    config_local.path += ".local";
42✔
505
    SyncTestFile config_remote(user2, harness.schema(), SyncConfig::FLXSyncEnabled{});
42✔
506
    config_remote.path += ".remote";
42✔
507
    const std::string str_field_value = "foo";
42✔
508
    const int64_t local_added_int = 100;
42✔
509
    const int64_t remote_added_int = 200;
42✔
510
    size_t before_reset_count = 0;
42✔
511
    size_t after_reset_count = 0;
42✔
512
    config_local.sync_config->notify_before_client_reset = [&before_reset_count](SharedRealm) {
34✔
513
        ++before_reset_count;
26✔
514
    };
26✔
515
    config_local.sync_config->notify_after_client_reset = [&after_reset_count](SharedRealm, ThreadSafeReference,
42✔
516
                                                                               bool) {
21✔
517
        ++after_reset_count;
×
518
    };
×
519

21✔
520
    SECTION("Recover: offline writes and subscription (single subscription)") {
42✔
521
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
522
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
523
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
524
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
525
        test_reset
2✔
526
            ->populate_initial_object([&](SharedRealm realm) {
2✔
527
                auto pk_of_added_object = ObjectId::gen();
2✔
528
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
529
                auto table = realm->read_group().get_table(ObjectStore::table_name_for_object_type("TopLevel"));
2✔
530
                REALM_ASSERT(table);
2✔
531
                mut_subs.insert_or_assign(Query(table));
2✔
532
                mut_subs.commit();
2✔
533

1✔
534
                realm->begin_transaction();
2✔
535
                CppContext c(realm);
2✔
536
                int64_t r1 = random_int();
2✔
537
                int64_t r2 = random_int();
2✔
538
                int64_t r3 = random_int();
2✔
539
                int64_t sum = uint64_t(r1) + r2 + r3;
2✔
540

1✔
541
                Object::create(c, realm, "TopLevel",
2✔
542
                               std::any(AnyDict{{"_id"s, pk_of_added_object},
2✔
543
                                                {"queryable_str_field"s, "initial value"s},
2✔
544
                                                {"list_of_ints_field", std::vector<std::any>{r1, r2, r3}},
2✔
545
                                                {"sum_of_list_field", sum}}));
2✔
546

1✔
547
                realm->commit_transaction();
2✔
548
                wait_for_upload(*realm);
2✔
549
                return pk_of_added_object;
2✔
550
            })
2✔
551
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
552
                add_object(local_realm, str_field_value, local_added_int);
2✔
553
            })
2✔
554
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
555
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
556
                sync::SubscriptionSet::State actual =
2✔
557
                    remote_realm->get_latest_subscription_set()
2✔
558
                        .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
559
                        .get();
2✔
560
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
2!
561
            })
2✔
562
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) {
2✔
563
                wait_for_advance(*local_realm);
2✔
564
                ClientResyncMode mode = client_reset_future.get();
2✔
565
                REQUIRE(mode == ClientResyncMode::Recover);
2!
566
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
567
                auto str_col = table->get_column_key("queryable_str_field");
2✔
568
                auto int_col = table->get_column_key("queryable_int_field");
2✔
569
                auto tv = table->where().equal(str_col, StringData(str_field_value)).find_all();
2✔
570
                tv.sort(int_col);
2✔
571
                // the object we created while offline was recovered, and the remote object was downloaded
1✔
572
                REQUIRE(tv.size() == 2);
2!
573
                CHECK(tv.get_object(0).get<Int>(int_col) == local_added_int);
2!
574
                CHECK(tv.get_object(1).get<Int>(int_col) == remote_added_int);
2!
575
            })
2✔
576
            ->run();
2✔
577
    }
2✔
578

21✔
579
    SECTION("Recover: subscription and offline writes after client reset failure") {
42✔
580
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
581
        auto&& [error_future, error_handler] = make_error_handler();
2✔
582
        config_local.sync_config->error_handler = error_handler;
2✔
583

1✔
584
        std::string fresh_path = realm::_impl::ClientResetOperation::get_fresh_path_for(config_local.path);
2✔
585
        // create a non-empty directory that we'll fail to delete
1✔
586
        util::make_dir(fresh_path);
2✔
587
        util::File(util::File::resolve("file", fresh_path), util::File::mode_Write);
2✔
588

1✔
589
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
590
        test_reset
2✔
591
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
592
                auto mut_sub = local_realm->get_latest_subscription_set().make_mutable_copy();
2✔
593
                auto table = local_realm->read_group().get_table("class_TopLevel2");
2✔
594
                mut_sub.insert_or_assign(Query(table));
2✔
595
                mut_sub.commit();
2✔
596

1✔
597
                CppContext c(local_realm);
2✔
598
                local_realm->begin_transaction();
2✔
599
                Object::create(c, local_realm, "TopLevel2",
2✔
600
                               std::any(AnyDict{{"_id"s, ObjectId::gen()}, {"queryable_str_field"s, "foo"s}}));
2✔
601
                local_realm->commit_transaction();
2✔
602
            })
2✔
603
            ->on_post_reset([](SharedRealm local_realm) {
2✔
604
                // Verify offline subscription was not removed.
1✔
605
                auto subs = local_realm->get_latest_subscription_set();
2✔
606
                auto table = local_realm->read_group().get_table("class_TopLevel2");
2✔
607
                REQUIRE(subs.find(Query(table)));
2!
608
            })
2✔
609
            ->run();
2✔
610

1✔
611
        // Remove the folder preventing the completion of a client reset.
1✔
612
        util::try_remove_dir_recursive(fresh_path);
2✔
613

1✔
614
        auto config_copy = config_local;
2✔
615
        config_local.sync_config->error_handler = nullptr;
2✔
616
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
617
        config_copy.sync_config->notify_after_client_reset = reset_handler;
2✔
618

1✔
619
        // Attempt to open the realm again.
1✔
620
        // This time the client reset succeeds and the offline subscription and writes are recovered.
1✔
621
        auto realm = Realm::get_shared_realm(config_copy);
2✔
622
        ClientResyncMode mode = reset_future.get();
2✔
623
        REQUIRE(mode == ClientResyncMode::Recover);
2!
624

1✔
625
        auto table = realm->read_group().get_table("class_TopLevel2");
2✔
626
        auto str_col = table->get_column_key("queryable_str_field");
2✔
627
        REQUIRE(table->size() == 1);
2!
628
        REQUIRE(table->get_object(0).get<String>(str_col) == "foo");
2!
629
    }
2✔
630

21✔
631
    SECTION("Recover: offline writes and subscriptions (multiple subscriptions)") {
42✔
632
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
633
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
634
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
635
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
636
        test_reset
2✔
637
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
638
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
2✔
639
            })
2✔
640
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
641
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
642
                sync::SubscriptionSet::State actual =
2✔
643
                    remote_realm->get_latest_subscription_set()
2✔
644
                        .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
645
                        .get();
2✔
646
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
2!
647
            })
2✔
648
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) {
2✔
649
                ClientResyncMode mode = client_reset_future.get();
2✔
650
                REQUIRE(mode == ClientResyncMode::Recover);
2!
651
                auto subs = local_realm->get_latest_subscription_set();
2✔
652
                subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
653
                // make sure that the subscription for "foo" survived the reset
1✔
654
                size_t count_of_foo = count_queries_with_str(subs, util::format("\"%1\"", str_field_value));
2✔
655
                REQUIRE(subs.state() == sync::SubscriptionSet::State::Complete);
2!
656
                REQUIRE(count_of_foo == 1);
2!
657
                local_realm->refresh();
2✔
658
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
659
                auto str_col = table->get_column_key("queryable_str_field");
2✔
660
                auto int_col = table->get_column_key("queryable_int_field");
2✔
661
                auto tv = table->where().equal(str_col, StringData(str_field_value)).find_all();
2✔
662
                tv.sort(int_col);
2✔
663
                // the object we created while offline was recovered, and the remote object was downloaded
1✔
664
                REQUIRE(tv.size() == 2);
2!
665
                CHECK(tv.get_object(0).get<Int>(int_col) == local_added_int);
2!
666
                CHECK(tv.get_object(1).get<Int>(int_col) == remote_added_int);
2!
667
            })
2✔
668
            ->run();
2✔
669
    }
2✔
670

21✔
671
    auto validate_integrity_of_arrays = [](TableRef table) -> size_t {
29✔
672
        auto sum_col = table->get_column_key("sum_of_list_field");
16✔
673
        auto array_col = table->get_column_key("list_of_ints_field");
16✔
674
        auto query = table->column<Lst<Int>>(array_col).sum() == table->column<Int>(sum_col) &&
16✔
675
                     table->column<Lst<Int>>(array_col).size() > 0;
16✔
676
        return query.count();
16✔
677
    };
16✔
678

21✔
679
    SECTION("Recover: offline writes with associated subscriptions in the correct order") {
42✔
680
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
681
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
682
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
683
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
684
        constexpr size_t num_objects_added = 20;
2✔
685
        constexpr size_t num_objects_added_by_harness = 1; // BaasFLXClientReset.run()
2✔
686
        constexpr size_t num_objects_added_by_remote = 1;  // make_remote_changes()
2✔
687
        test_reset
2✔
688
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
689
                subscribe_to_and_add_objects(local_realm, num_objects_added);
2✔
690
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
691
                REQUIRE(table->size() == num_objects_added + num_objects_added_by_harness);
2!
692
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
2✔
693
                REQUIRE(count_of_valid_array_data == num_objects_added);
2!
694
            })
2✔
695
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
696
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
697
                sync::SubscriptionSet::State actual =
2✔
698
                    remote_realm->get_latest_subscription_set()
2✔
699
                        .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
700
                        .get();
2✔
701
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
2!
702
            })
2✔
703
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) {
2✔
704
                ClientResyncMode mode = client_reset_future.get();
2✔
705
                REQUIRE(mode == ClientResyncMode::Recover);
2!
706
                local_realm->refresh();
2✔
707
                auto latest_subs = local_realm->get_latest_subscription_set();
2✔
708
                auto state = latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
709
                REQUIRE(state == sync::SubscriptionSet::State::Complete);
2!
710
                local_realm->refresh();
2✔
711
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
712
                if (table->size() != 1) {
2✔
713
                    table->to_json(std::cout, 1, {});
×
714
                }
×
715
                REQUIRE(table->size() == 1);
2!
716
                auto mut_sub = latest_subs.make_mutable_copy();
2✔
717
                mut_sub.clear();
2✔
718
                mut_sub.insert_or_assign(Query(table));
2✔
719
                latest_subs = mut_sub.commit();
2✔
720
                latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
721
                local_realm->refresh();
2✔
722
                REQUIRE(table->size() ==
2!
723
                        num_objects_added + num_objects_added_by_harness + num_objects_added_by_remote);
2✔
724
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
2✔
725
                REQUIRE(count_of_valid_array_data == num_objects_added + num_objects_added_by_remote);
2!
726
            })
2✔
727
            ->run();
2✔
728
    }
2✔
729

21✔
730
    SECTION("Recover: incompatible property changes are rejected") {
42✔
731
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
732
        auto&& [error_future, err_handler] = make_error_handler();
2✔
733
        config_local.sync_config->error_handler = err_handler;
2✔
734
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
735
        constexpr size_t num_objects_added_before = 2;
2✔
736
        constexpr size_t num_objects_added_after = 2;
2✔
737
        constexpr size_t num_objects_added_by_harness = 1; // BaasFLXClientReset.run()
2✔
738
        constexpr std::string_view added_property_name = "new_property";
2✔
739
        test_reset
2✔
740
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
741
                subscribe_to_and_add_objects(local_realm, num_objects_added_before);
2✔
742
                Schema local_update = schema;
2✔
743
                Schema::iterator it = local_update.find("TopLevel");
2✔
744
                REQUIRE(it != local_update.end());
2!
745
                it->persisted_properties.push_back(
2✔
746
                    {std::string(added_property_name), PropertyType::Float | PropertyType::Nullable});
2✔
747
                local_realm->update_schema(local_update);
2✔
748
                subscribe_to_and_add_objects(local_realm, num_objects_added_after);
2✔
749
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
750
                REQUIRE(table->size() ==
2!
751
                        num_objects_added_before + num_objects_added_after + num_objects_added_by_harness);
2✔
752
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
2✔
753
                REQUIRE(count_of_valid_array_data == num_objects_added_before + num_objects_added_after);
2!
754
            })
2✔
755
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
756
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
757
                Schema remote_update = schema;
2✔
758
                Schema::iterator it = remote_update.find("TopLevel");
2✔
759
                REQUIRE(it != remote_update.end());
2!
760
                it->persisted_properties.push_back(
2✔
761
                    {std::string(added_property_name), PropertyType::UUID | PropertyType::Nullable});
2✔
762
                remote_realm->update_schema(remote_update);
2✔
763
                sync::SubscriptionSet::State actual =
2✔
764
                    remote_realm->get_latest_subscription_set()
2✔
765
                        .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
766
                        .get();
2✔
767
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
2!
768
            })
2✔
769
            ->on_post_reset([&, err_future = std::move(error_future)](SharedRealm local_realm) mutable {
2✔
770
                auto sync_error = wait_for_future(std::move(err_future)).get();
2✔
771
                REQUIRE(before_reset_count == 1);
2!
772
                REQUIRE(after_reset_count == 0);
2!
773
                REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
2!
774
                REQUIRE(sync_error.is_client_reset_requested());
2!
775
                local_realm->refresh();
2✔
776
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
777
                // since schema validation happens in the first recovery commit, that whole commit is rolled back
1✔
778
                // and the final state here is "pre reset"
1✔
779
                REQUIRE(table->size() ==
2!
780
                        num_objects_added_before + num_objects_added_by_harness + num_objects_added_after);
2✔
781
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
2✔
782
                REQUIRE(count_of_valid_array_data == num_objects_added_before + num_objects_added_after);
2!
783
            })
2✔
784
            ->run();
2✔
785
    }
2✔
786

21✔
787
    SECTION("unsuccessful replay of local changes") {
42✔
788
        constexpr size_t num_objects_added_before = 2;
4✔
789
        constexpr size_t num_objects_added_after = 2;
4✔
790
        constexpr size_t num_objects_added_by_harness = 1; // BaasFLXClientReset.run()
4✔
791
        constexpr std::string_view added_property_name = "new_property";
4✔
792
        auto&& [error_future, err_handler] = make_error_handler();
4✔
793
        config_local.sync_config->error_handler = err_handler;
4✔
794

2✔
795
        // The local changes here are a bit contrived because removing a column is disallowed
2✔
796
        // at the object store layer for sync'd Realms. The only reason a recovery should fail in production
2✔
797
        // during the apply stage is due to programmer error or external factors such as out of disk space.
2✔
798
        // Any schema discrepancies are caught by the initial diff, so the way to make a recovery fail here is
2✔
799
        // to add and remove a column at the core level such that the schema diff passes, but instructions are
2✔
800
        // generated which will fail when applied.
2✔
801
        reset_utils::TestClientReset::Callback make_local_changes_that_will_fail = [&](SharedRealm local_realm) {
4✔
802
            subscribe_to_and_add_objects(local_realm, num_objects_added_before);
4✔
803
            auto table = local_realm->read_group().get_table("class_TopLevel");
4✔
804
            REQUIRE(table->size() == num_objects_added_before + num_objects_added_by_harness);
4!
805
            size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
4✔
806
            REQUIRE(count_of_valid_array_data == num_objects_added_before);
4!
807
            local_realm->begin_transaction();
4✔
808
            ColKey added = table->add_column(type_Int, added_property_name);
4✔
809
            table->remove_column(added);
4✔
810
            local_realm->commit_transaction();
4✔
811
            subscribe_to_and_add_objects(local_realm, num_objects_added_after); // these are lost!
4✔
812
        };
4✔
813

2✔
814
        reset_utils::TestClientReset::Callback verify_post_reset_state = [&, err_future = std::move(error_future)](
4✔
815
                                                                             SharedRealm local_realm) mutable {
4✔
816
            auto sync_error = wait_for_future(std::move(err_future)).get();
4✔
817
            REQUIRE(before_reset_count == 1);
4!
818
            REQUIRE(after_reset_count == 0);
4!
819
            REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
4!
820
            REQUIRE(sync_error.is_client_reset_requested());
4!
821
            local_realm->refresh();
4✔
822
            auto table = local_realm->read_group().get_table("class_TopLevel");
4✔
823
            ColKey added = table->get_column_key(added_property_name);
4✔
824
            REQUIRE(!added); // partial recovery halted at remove_column() but rolled back everything in the change
4!
825
            // table is missing num_objects_added_after and the last commit after the latest subscription
2✔
826
            // this is due to how recovery batches together changesets up until a subscription
2✔
827
            const size_t expected_added_objects = num_objects_added_before - 1;
4✔
828
            REQUIRE(table->size() == expected_added_objects + num_objects_added_by_harness);
4!
829
            size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
4✔
830
            REQUIRE(count_of_valid_array_data == expected_added_objects);
4!
831
        };
4✔
832

2✔
833
        SECTION("Recover: unsuccessful recovery leads to a manual reset") {
4✔
834
            config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
835
            auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
836
            test_reset->make_local_changes(std::move(make_local_changes_that_will_fail))
2✔
837
                ->on_post_reset(std::move(verify_post_reset_state))
2✔
838
                ->run();
2✔
839
            auto config_copy = config_local;
2✔
840
            auto&& [error_future2, err_handler2] = make_error_handler();
2✔
841
            config_copy.sync_config->error_handler = err_handler2;
2✔
842
            auto realm_post_reset = Realm::get_shared_realm(config_copy);
2✔
843
            auto sync_error = wait_for_future(std::move(error_future2)).get();
2✔
844
            REQUIRE(before_reset_count == 2);
2!
845
            REQUIRE(after_reset_count == 0);
2!
846
            REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
2!
847
            REQUIRE(sync_error.is_client_reset_requested());
2!
848
        }
2✔
849

2✔
850
        SECTION("RecoverOrDiscard: unsuccessful reapply leads to discard") {
4✔
851
            config_local.sync_config->client_resync_mode = ClientResyncMode::RecoverOrDiscard;
2✔
852
            auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
853
            test_reset->make_local_changes(std::move(make_local_changes_that_will_fail))
2✔
854
                ->on_post_reset(std::move(verify_post_reset_state))
2✔
855
                ->run();
2✔
856

1✔
857
            auto config_copy = config_local;
2✔
858
            auto&& [client_reset_future, reset_handler] = make_client_reset_handler();
2✔
859
            config_copy.sync_config->error_handler = [](std::shared_ptr<SyncSession>, SyncError err) {
1✔
860
                REALM_ASSERT_EX(!err.is_fatal, err.status);
×
861
                CHECK(err.server_requests_action == sync::ProtocolErrorInfo::Action::Transient);
×
862
            };
×
863
            config_copy.sync_config->notify_after_client_reset = reset_handler;
2✔
864
            auto realm_post_reset = Realm::get_shared_realm(config_copy);
2✔
865
            ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
2✔
866
            REQUIRE(mode == ClientResyncMode::DiscardLocal);
2!
867
            realm_post_reset->refresh();
2✔
868
            auto table = realm_post_reset->read_group().get_table("class_TopLevel");
2✔
869
            ColKey added = table->get_column_key(added_property_name);
2✔
870
            REQUIRE(!added);                                        // reverted local changes
2!
871
            REQUIRE(table->size() == num_objects_added_by_harness); // discarded all offline local changes
2!
872
        }
2✔
873
    }
4✔
874

21✔
875
    SECTION("DiscardLocal: offline writes and subscriptions are lost") {
42✔
876
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
877
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
878
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
879
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
880
        test_reset
2✔
881
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
882
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
2✔
883
            })
2✔
884
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
885
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
886
            })
2✔
887
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) mutable {
2✔
888
                ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
2✔
889
                REQUIRE(mode == ClientResyncMode::DiscardLocal);
2!
890
                auto subs = local_realm->get_latest_subscription_set();
2✔
891
                wait_for_future(subs.get_state_change_notification(sync::SubscriptionSet::State::Complete)).get();
2✔
892
                local_realm->refresh();
2✔
893
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
894
                auto queryable_str_field = table->get_column_key("queryable_str_field");
2✔
895
                auto queryable_int_field = table->get_column_key("queryable_int_field");
2✔
896
                auto tv = table->where().equal(queryable_str_field, StringData(str_field_value)).find_all();
2✔
897
                // the object we created while offline was discarded, and the remote object was not downloaded
1✔
898
                REQUIRE(tv.size() == 0);
2!
899
                size_t count_of_foo = count_queries_with_str(subs, util::format("\"%1\"", str_field_value));
2✔
900
                // make sure that the subscription for "foo" did not survive the reset
1✔
901
                REQUIRE(count_of_foo == 0);
2!
902
                REQUIRE(subs.state() == sync::SubscriptionSet::State::Complete);
2!
903

1✔
904
                // adding data and subscriptions to a reset Realm works as normal
1✔
905
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
2✔
906
                auto latest_subs = local_realm->get_latest_subscription_set();
2✔
907
                REQUIRE(latest_subs.version() > subs.version());
2!
908
                wait_for_future(latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete))
2✔
909
                    .get();
2✔
910
                local_realm->refresh();
2✔
911
                count_of_foo = count_queries_with_str(latest_subs, util::format("\"%1\"", str_field_value));
2✔
912
                REQUIRE(count_of_foo == 1);
2!
913
                tv = table->where().equal(queryable_str_field, StringData(str_field_value)).find_all();
2✔
914
                REQUIRE(tv.size() == 2);
2!
915
                tv.sort(queryable_int_field);
2✔
916
                REQUIRE(tv.get_object(0).get<int64_t>(queryable_int_field) == local_added_int);
2!
917
                REQUIRE(tv.get_object(1).get<int64_t>(queryable_int_field) == remote_added_int);
2!
918
            })
2✔
919
            ->run();
2✔
920
    }
2✔
921

21✔
922
    SECTION("DiscardLocal: an invalid subscription made while offline becomes superceeded") {
42✔
923
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
924
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
925
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
926
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
927
        std::unique_ptr<sync::SubscriptionSet> invalid_sub;
2✔
928
        test_reset
2✔
929
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
930
                invalid_sub = std::make_unique<sync::SubscriptionSet>(add_invalid_subscription(local_realm));
2✔
931
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
2✔
932
            })
2✔
933
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
934
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
935
            })
2✔
936
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) {
2✔
937
                local_realm->refresh();
2✔
938
                sync::SubscriptionSet::State actual =
2✔
939
                    invalid_sub->get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
940
                REQUIRE(actual == sync::SubscriptionSet::State::Superseded);
2!
941
                ClientResyncMode mode = client_reset_future.get();
2✔
942
                REQUIRE(mode == ClientResyncMode::DiscardLocal);
2!
943
            })
2✔
944
            ->run();
2✔
945
    }
2✔
946

21✔
947
    SECTION("DiscardLocal: an error is produced if a previously successful query becomes invalid due to "
42✔
948
            "server changes across a reset") {
22✔
949
        // Disable dev mode so non-queryable fields are not automatically added as queryable
1✔
950
        const AppSession& app_session = harness.session().app_session();
2✔
951
        app_session.admin_api.set_development_mode_to(app_session.server_app_id, false);
2✔
952
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
953
        auto&& [error_future, err_handler] = make_error_handler();
2✔
954
        config_local.sync_config->error_handler = err_handler;
2✔
955
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
956
        test_reset
2✔
957
            ->setup([&](SharedRealm realm) {
2✔
958
                if (realm->sync_session()->path() == config_local.path) {
2✔
959
                    auto added_sub = add_subscription_for_new_object(realm, str_field_value, 0);
2✔
960
                    added_sub.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
961
                }
2✔
962
            })
2✔
963
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
964
                add_object(local_realm, str_field_value, local_added_int);
2✔
965
                // Make "queryable_str_field" not a valid query field.
1✔
966
                // Pre-reset, the Realm had a successful query on it, but now when the client comes back online
1✔
967
                // and tries to reset, the fresh Realm download will fail with a query error.
1✔
968
                const AppSession& app_session = harness.session().app_session();
2✔
969
                auto baas_sync_service = app_session.admin_api.get_sync_service(app_session.server_app_id);
2✔
970
                auto baas_sync_config =
2✔
971
                    app_session.admin_api.get_config(app_session.server_app_id, baas_sync_service);
2✔
972
                REQUIRE(baas_sync_config.queryable_field_names->is_array());
2!
973
                auto it = baas_sync_config.queryable_field_names->begin();
2✔
974
                for (; it != baas_sync_config.queryable_field_names->end(); ++it) {
2✔
975
                    if (*it == "queryable_str_field") {
2✔
976
                        break;
2✔
977
                    }
2✔
978
                }
2✔
979
                REQUIRE(it != baas_sync_config.queryable_field_names->end());
2!
980
                baas_sync_config.queryable_field_names->erase(it);
2✔
981
                app_session.admin_api.enable_sync(app_session.server_app_id, baas_sync_service.id, baas_sync_config);
2✔
982
            })
2✔
983
            ->on_post_reset([&, err_future = std::move(error_future)](SharedRealm) mutable {
2✔
984
                auto sync_error = wait_for_future(std::move(err_future)).get();
2✔
985
                INFO(sync_error.status);
2✔
986
                CHECK(sync_error.status == ErrorCodes::AutoClientResetFailed);
2!
987
            })
2✔
988
            ->run();
2✔
989
    }
2✔
990

21✔
991
    SECTION("DiscardLocal: completion callbacks fire after client reset even when there is no data to download") {
42✔
992
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
993
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
994
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
995
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
996
        test_reset
2✔
997
            ->on_post_local_changes([&](SharedRealm realm) {
2✔
998
                wait_for_upload(*realm);
2✔
999
                wait_for_download(*realm);
2✔
1000
            })
2✔
1001
            ->run();
2✔
1002
    }
2✔
1003

21✔
1004
    SECTION("DiscardLocal: open realm after client reset failure") {
42✔
1005
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
1006
        auto&& [error_future, error_handler] = make_error_handler();
2✔
1007
        config_local.sync_config->error_handler = error_handler;
2✔
1008

1✔
1009
        std::string fresh_path = realm::_impl::ClientResetOperation::get_fresh_path_for(config_local.path);
2✔
1010
        // create a non-empty directory that we'll fail to delete
1✔
1011
        util::make_dir(fresh_path);
2✔
1012
        util::File(util::File::resolve("file", fresh_path), util::File::mode_Write);
2✔
1013

1✔
1014
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
1015
        test_reset->run();
2✔
1016

1✔
1017
        // Client reset fails due to sync client not being able to create the fresh realm.
1✔
1018
        auto sync_error = wait_for_future(std::move(error_future)).get();
2✔
1019
        REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
2!
1020

1✔
1021
        // Open the realm again. This should not crash.
1✔
1022
        {
2✔
1023
            auto config_copy = config_local;
2✔
1024
            auto&& [err_future, err_handler] = make_error_handler();
2✔
1025
            config_local.sync_config->error_handler = err_handler;
2✔
1026

1✔
1027
            auto realm_post_reset = Realm::get_shared_realm(config_copy);
2✔
1028
            auto sync_error = wait_for_future(std::move(err_future)).get();
2✔
1029
            REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
2!
1030
        }
2✔
1031
    }
2✔
1032

21✔
1033
    enum class ResetMode { NoReset, InitiateClientReset };
42✔
1034
    auto seed_realm = [&harness, &subscribe_to_and_add_objects](RealmConfig config, ResetMode reset_mode) {
34✔
1035
        config.sync_config->error_handler = [path = config.path](std::shared_ptr<SyncSession>, SyncError err) {
13✔
1036
            // ignore spurious failures on this instance
1037
            util::format(std::cout, "spurious error while seeding a Realm at '%1': %2\n", path, err.status);
×
1038
        };
×
1039
        SharedRealm realm = Realm::get_shared_realm(config);
26✔
1040
        subscribe_to_and_add_objects(realm, 1);
26✔
1041
        auto subs = realm->get_latest_subscription_set();
26✔
1042
        auto result = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
26✔
1043
        CHECK(result == sync::SubscriptionSet::State::Complete);
26!
1044
        if (reset_mode == ResetMode::InitiateClientReset) {
26✔
1045
            reset_utils::trigger_client_reset(harness.session().app_session(), realm);
18✔
1046
        }
18✔
1047
        realm->close();
26✔
1048
    };
26✔
1049

21✔
1050
    auto setup_reset_handlers_for_schema_validation =
42✔
1051
        [&before_reset_count, &after_reset_count](RealmConfig& config, Schema expected_schema) {
28✔
1052
            auto& sync_config = *config.sync_config;
14✔
1053
            sync_config.error_handler = [](std::shared_ptr<SyncSession>, SyncError err) {
7✔
1054
                FAIL(err.status);
×
1055
            };
×
1056
            sync_config.notify_before_client_reset = [&before_reset_count,
14✔
1057
                                                      expected = expected_schema](SharedRealm frozen_before) {
14✔
1058
                ++before_reset_count;
14✔
1059
                REQUIRE(frozen_before->schema().size() > 0);
14!
1060
                REQUIRE(frozen_before->schema_version() != ObjectStore::NotVersioned);
14!
1061
                REQUIRE(frozen_before->schema() == expected);
14!
1062
            };
14✔
1063

7✔
1064
            auto [promise, future] = util::make_promise_future<void>();
14✔
1065
            sync_config.notify_after_client_reset =
14✔
1066
                [&after_reset_count, promise = util::CopyablePromiseHolder<void>(std::move(promise)), expected_schema,
14✔
1067
                 reset_mode = config.sync_config->client_resync_mode, has_schema = config.schema.has_value()](
14✔
1068
                    SharedRealm frozen_before, ThreadSafeReference after_ref, bool did_recover) mutable {
14✔
1069
                    ++after_reset_count;
14✔
1070
                    REQUIRE(frozen_before->schema().size() > 0);
14!
1071
                    REQUIRE(frozen_before->schema_version() != ObjectStore::NotVersioned);
14!
1072
                    REQUIRE(frozen_before->schema() == expected_schema);
14!
1073
                    SharedRealm after = Realm::get_shared_realm(std::move(after_ref), util::Scheduler::make_dummy());
14✔
1074
                    if (!has_schema) {
14✔
1075
                        after->set_schema_subset(expected_schema);
4✔
1076
                    }
4✔
1077
                    REQUIRE(after);
14!
1078
                    REQUIRE(after->schema() == expected_schema);
14!
1079
                    // the above check is sufficient unless operator==() is changed to not care about ordering
7✔
1080
                    // so future proof that by explicitly checking the order of properties here as well
7✔
1081
                    REQUIRE(after->schema().size() == frozen_before->schema().size());
14!
1082
                    auto after_it = after->schema().find("TopLevel");
14✔
1083
                    auto before_it = frozen_before->schema().find("TopLevel");
14✔
1084
                    REQUIRE(after_it != after->schema().end());
14!
1085
                    REQUIRE(before_it != frozen_before->schema().end());
14!
1086
                    REQUIRE(after_it->name == before_it->name);
14!
1087
                    REQUIRE(after_it->persisted_properties.size() == before_it->persisted_properties.size());
14!
1088
                    REQUIRE(after_it->persisted_properties[1].name == "queryable_int_field");
14!
1089
                    REQUIRE(after_it->persisted_properties[2].name == "queryable_str_field");
14!
1090
                    REQUIRE(before_it->persisted_properties[1].name == "queryable_int_field");
14!
1091
                    REQUIRE(before_it->persisted_properties[2].name == "queryable_str_field");
14!
1092
                    REQUIRE(did_recover == (reset_mode == ClientResyncMode::Recover));
14!
1093
                    promise.get_promise().emplace_value();
14✔
1094
                };
14✔
1095
            return std::move(future); // move is not redundant here because of how destructing works
14✔
1096
        };
14✔
1097

21✔
1098
    SECTION("Recover: schema indexes match in before and after states") {
42✔
1099
        seed_realm(config_local, ResetMode::InitiateClientReset);
2✔
1100
        // reorder a property such that it does not match the on disk property order
1✔
1101
        std::vector<ObjectSchema> local_schema = schema;
2✔
1102
        std::swap(local_schema[0].persisted_properties[1], local_schema[0].persisted_properties[2]);
2✔
1103
        local_schema[0].persisted_properties.push_back(
2✔
1104
            {"queryable_oid_field", PropertyType::ObjectId | PropertyType::Nullable});
2✔
1105
        config_local.schema = local_schema;
2✔
1106
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
1107
        auto future = setup_reset_handlers_for_schema_validation(config_local, local_schema);
2✔
1108
        SharedRealm realm = Realm::get_shared_realm(config_local);
2✔
1109
        future.get();
2✔
1110
        CHECK(before_reset_count == 1);
2!
1111
        CHECK(after_reset_count == 1);
2!
1112
    }
2✔
1113

21✔
1114
    SECTION("Adding a local property matching a server addition is allowed") {
42✔
1115
        auto mode = GENERATE(ClientResyncMode::DiscardLocal, ClientResyncMode::Recover);
4✔
1116
        config_local.sync_config->client_resync_mode = mode;
4✔
1117
        seed_realm(config_local, ResetMode::InitiateClientReset);
4✔
1118
        std::vector<ObjectSchema> changed_schema = schema;
4✔
1119
        changed_schema[0].persisted_properties.push_back(
4✔
1120
            {"queryable_oid_field", PropertyType::ObjectId | PropertyType::Nullable});
4✔
1121
        // In a separate Realm, make the property addition.
2✔
1122
        // Since this is dev mode, it will be added to the server's schema.
2✔
1123
        config_remote.schema = changed_schema;
4✔
1124
        seed_realm(config_remote, ResetMode::NoReset);
4✔
1125
        std::swap(changed_schema[0].persisted_properties[1], changed_schema[0].persisted_properties[2]);
4✔
1126
        config_local.schema = changed_schema;
4✔
1127
        auto future = setup_reset_handlers_for_schema_validation(config_local, changed_schema);
4✔
1128

2✔
1129
        async_open_realm(config_local,
4✔
1130
                         [&, fut = std::move(future)](ThreadSafeReference&& ref, std::exception_ptr error) {
4✔
1131
                             REQUIRE(ref);
4!
1132
                             REQUIRE_FALSE(error);
4!
1133
                             auto realm = Realm::get_shared_realm(std::move(ref));
4✔
1134
                             fut.get();
4✔
1135
                             CHECK(before_reset_count == 1);
4!
1136
                             CHECK(after_reset_count == 1);
4!
1137
                         });
4✔
1138
    }
4✔
1139

21✔
1140
    SECTION("Adding a local property matching a server addition inside the before reset callback is allowed") {
42✔
1141
        auto mode = GENERATE(ClientResyncMode::DiscardLocal, ClientResyncMode::Recover);
4✔
1142
        config_local.sync_config->client_resync_mode = mode;
4✔
1143
        seed_realm(config_local, ResetMode::InitiateClientReset);
4✔
1144
        std::vector<ObjectSchema> changed_schema = schema;
4✔
1145
        changed_schema[0].persisted_properties.push_back(
4✔
1146
            {"queryable_oid_field", PropertyType::ObjectId | PropertyType::Nullable});
4✔
1147
        // In a separate Realm, make the property addition.
2✔
1148
        // Since this is dev mode, it will be added to the server's schema.
2✔
1149
        config_remote.schema = changed_schema;
4✔
1150
        seed_realm(config_remote, ResetMode::NoReset);
4✔
1151
        std::swap(changed_schema[0].persisted_properties[1], changed_schema[0].persisted_properties[2]);
4✔
1152
        config_local.schema.reset();
4✔
1153
        config_local.sync_config->freeze_before_reset_realm = false;
4✔
1154
        auto future = setup_reset_handlers_for_schema_validation(config_local, changed_schema);
4✔
1155

2✔
1156
        auto notify_before = std::move(config_local.sync_config->notify_before_client_reset);
4✔
1157
        config_local.sync_config->notify_before_client_reset = [=](std::shared_ptr<Realm> realm) {
4✔
1158
            realm->update_schema(changed_schema);
4✔
1159
            notify_before(realm);
4✔
1160
        };
4✔
1161

2✔
1162
        auto notify_after = std::move(config_local.sync_config->notify_after_client_reset);
4✔
1163
        config_local.sync_config->notify_after_client_reset = [=](std::shared_ptr<Realm> before,
4✔
1164
                                                                  ThreadSafeReference after, bool did_recover) {
4✔
1165
            before->set_schema_subset(changed_schema);
4✔
1166
            notify_after(before, std::move(after), did_recover);
4✔
1167
        };
4✔
1168

2✔
1169
        async_open_realm(config_local,
4✔
1170
                         [&, fut = std::move(future)](ThreadSafeReference&& ref, std::exception_ptr error) {
4✔
1171
                             REQUIRE(ref);
4!
1172
                             REQUIRE_FALSE(error);
4!
1173
                             auto realm = Realm::get_shared_realm(std::move(ref));
4✔
1174
                             fut.get();
4✔
1175
                             CHECK(before_reset_count == 1);
4!
1176
                             CHECK(after_reset_count == 1);
4!
1177
                         });
4✔
1178
    }
4✔
1179

21✔
1180
    auto make_additive_changes = [](std::vector<ObjectSchema> schema) {
24✔
1181
        schema[0].persisted_properties.push_back(
6✔
1182
            {"added_oid_field", PropertyType::ObjectId | PropertyType::Nullable});
6✔
1183
        std::swap(schema[0].persisted_properties[1], schema[0].persisted_properties[2]);
6✔
1184
        schema.push_back({"AddedClass",
6✔
1185
                          {
6✔
1186
                              {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
6✔
1187
                              {"str_field", PropertyType::String | PropertyType::Nullable},
6✔
1188
                          }});
6✔
1189
        return schema;
6✔
1190
    };
6✔
1191
    SECTION("Recover: additive schema changes are recovered in dev mode") {
42✔
1192
        seed_realm(config_local, ResetMode::InitiateClientReset);
2✔
1193
        std::vector<ObjectSchema> changed_schema = make_additive_changes(schema);
2✔
1194
        config_local.schema = changed_schema;
2✔
1195
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
1196
        auto future = setup_reset_handlers_for_schema_validation(config_local, changed_schema);
2✔
1197
        async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
2✔
1198
            REQUIRE(ref);
2!
1199
            REQUIRE_FALSE(error);
2!
1200
        });
2✔
1201
        future.get();
2✔
1202
        CHECK(before_reset_count == 1);
2!
1203
        CHECK(after_reset_count == 1);
2!
1204
        auto realm = Realm::get_shared_realm(config_local);
2✔
1205
        {
2✔
1206
            // make changes to the newly added property
1✔
1207
            realm->begin_transaction();
2✔
1208
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
1209
            ColKey new_col = table->get_column_key("added_oid_field");
2✔
1210
            REQUIRE(new_col);
2!
1211
            for (auto it = table->begin(); it != table->end(); ++it) {
4✔
1212
                it->set(new_col, ObjectId::gen());
2✔
1213
            }
2✔
1214
            realm->commit_transaction();
2✔
1215
            // subscribe to the new Class and add an object
1✔
1216
            auto new_table = realm->read_group().get_table("class_AddedClass");
2✔
1217
            auto sub_set = realm->get_latest_subscription_set();
2✔
1218
            auto mut_sub = sub_set.make_mutable_copy();
2✔
1219
            mut_sub.insert_or_assign(Query(new_table));
2✔
1220
            mut_sub.commit();
2✔
1221
            realm->begin_transaction();
2✔
1222
            REQUIRE(new_table);
2!
1223
            new_table->create_object_with_primary_key(ObjectId::gen());
2✔
1224
            realm->commit_transaction();
2✔
1225
        }
2✔
1226
        auto result = realm->get_latest_subscription_set()
2✔
1227
                          .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
1228
                          .get();
2✔
1229
        CHECK(result == sync::SubscriptionSet::State::Complete);
2!
1230
        wait_for_download(*realm);
2✔
1231
    }
2✔
1232

21✔
1233
    SECTION("DiscardLocal: additive schema changes not allowed") {
42✔
1234
        seed_realm(config_local, ResetMode::InitiateClientReset);
2✔
1235
        std::vector<ObjectSchema> changed_schema = make_additive_changes(schema);
2✔
1236
        config_local.schema = changed_schema;
2✔
1237
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
1238
        auto&& [error_future, err_handler] = make_error_handler();
2✔
1239
        config_local.sync_config->error_handler = err_handler;
2✔
1240
        async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
2✔
1241
            REQUIRE(!ref);
2!
1242
            REQUIRE(error);
2!
1243
            REQUIRE_THROWS_CONTAINING(std::rethrow_exception(error),
2✔
1244
                                      "A fatal error occurred during client reset: 'Client reset cannot recover when "
2✔
1245
                                      "classes have been removed: {AddedClass}'");
2✔
1246
        });
2✔
1247
        error_future.get();
2✔
1248
        CHECK(before_reset_count == 1);
2!
1249
        CHECK(after_reset_count == 0);
2!
1250
    }
2✔
1251

21✔
1252
    SECTION("Recover: incompatible schema changes on async open are an error") {
42✔
1253
        seed_realm(config_local, ResetMode::InitiateClientReset);
2✔
1254
        std::vector<ObjectSchema> changed_schema = schema;
2✔
1255
        changed_schema[0].persisted_properties[0].type = PropertyType::UUID; // incompatible type change
2✔
1256
        config_local.schema = changed_schema;
2✔
1257
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
1258
        auto&& [error_future, err_handler] = make_error_handler();
2✔
1259
        config_local.sync_config->error_handler = err_handler;
2✔
1260
        async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
2✔
1261
            REQUIRE(!ref);
2!
1262
            REQUIRE(error);
2!
1263
            REQUIRE_THROWS_CONTAINING(
2✔
1264
                std::rethrow_exception(error),
2✔
1265
                "A fatal error occurred during client reset: 'The following changes cannot be "
2✔
1266
                "made in additive-only schema mode:\n"
2✔
1267
                "- Property 'TopLevel._id' has been changed from 'object id' to 'uuid'.\nIf your app is running in "
2✔
1268
                "development mode, you can delete the realm and restart the app to update your schema.'");
2✔
1269
        });
2✔
1270
        error_future.get();
2✔
1271
        CHECK(before_reset_count == 0); // we didn't even get this far because opening the frozen realm fails
2!
1272
        CHECK(after_reset_count == 0);
2!
1273
    }
2✔
1274

21✔
1275
    SECTION("Recover: additive schema changes without dev mode produce an error after client reset") {
42✔
1276
        const AppSession& app_session = harness.session().app_session();
2✔
1277
        app_session.admin_api.set_development_mode_to(app_session.server_app_id, true);
2✔
1278
        {
2✔
1279
            seed_realm(config_local, ResetMode::InitiateClientReset);
2✔
1280
            // Disable dev mode so that schema changes are not allowed
1✔
1281
            app_session.admin_api.set_development_mode_to(app_session.server_app_id, false);
2✔
1282
            std::vector<ObjectSchema> changed_schema = make_additive_changes(schema);
2✔
1283
            config_local.schema = changed_schema;
2✔
1284
            config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
1285
            (void)setup_reset_handlers_for_schema_validation(config_local, changed_schema);
2✔
1286
            auto&& [error_future, err_handler] = make_error_handler();
2✔
1287
            config_local.sync_config->error_handler = err_handler;
2✔
1288
            async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
2✔
1289
                REQUIRE(ref);
2!
1290
                REQUIRE_FALSE(error);
2!
1291
                auto realm = Realm::get_shared_realm(std::move(ref));
2✔
1292
                // make changes to the new property
1✔
1293
                realm->begin_transaction();
2✔
1294
                auto table = realm->read_group().get_table("class_TopLevel");
2✔
1295
                ColKey new_col = table->get_column_key("added_oid_field");
2✔
1296
                REQUIRE(new_col);
2!
1297
                for (auto it = table->begin(); it != table->end(); ++it) {
4✔
1298
                    it->set(new_col, ObjectId::gen());
2✔
1299
                }
2✔
1300
                realm->commit_transaction();
2✔
1301
            });
2✔
1302
            auto realm = Realm::get_shared_realm(config_local);
2✔
1303
            auto err = error_future.get();
2✔
1304
            std::string property_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding "
2✔
1305
                                       "\"ObjectID\" column at field \"added_oid_field\" in schema \"TopLevel\", "
2✔
1306
                                       "schema changes from clients are restricted when developer mode is disabled";
2✔
1307
            std::string class_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding schema "
2✔
1308
                                    "for Realm table \"AddedClass\", schema changes from clients are restricted when "
2✔
1309
                                    "developer mode is disabled";
2✔
1310
            REQUIRE_THAT(err.status.reason(), Catch::Matchers::ContainsSubstring(property_err) ||
2✔
1311
                                                  Catch::Matchers::ContainsSubstring(class_err));
2✔
1312
            CHECK(before_reset_count == 1);
2!
1313
            CHECK(after_reset_count == 1);
2!
1314
        }
2✔
1315
    }
2✔
1316

21✔
1317
    // the previous section turns off dev mode, undo that now for later tests
21✔
1318
    const AppSession& app_session = harness.session().app_session();
42✔
1319
    app_session.admin_api.set_development_mode_to(app_session.server_app_id, true);
42✔
1320
}
42✔
1321

1322
TEST_CASE("flx: creating an object on a class with no subscription throws", "[sync][flx][subscription][baas]") {
2✔
1323
    FLXSyncTestHarness harness("flx_bad_query", {g_simple_embedded_obj_schema, {"queryable_str_field"}});
2✔
1324
    harness.do_with_new_user([&](auto user) {
2✔
1325
        SyncTestFile config(user, harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
1326
        auto [error_promise, error_future] = util::make_promise_future<SyncError>();
2✔
1327
        auto shared_promise = std::make_shared<decltype(error_promise)>(std::move(error_promise));
2✔
1328
        config.sync_config->error_handler = [error_promise = std::move(shared_promise)](std::shared_ptr<SyncSession>,
2✔
1329
                                                                                        SyncError err) {
1✔
1330
            CHECK(err.server_requests_action == sync::ProtocolErrorInfo::Action::Transient);
×
1331
            error_promise->emplace_value(std::move(err));
×
1332
        };
×
1333

1✔
1334
        auto realm = Realm::get_shared_realm(config);
2✔
1335
        CppContext c(realm);
2✔
1336
        realm->begin_transaction();
2✔
1337
        REQUIRE_THROWS_AS(
2✔
1338
            Object::create(c, realm, "TopLevel",
2✔
1339
                           std::any(AnyDict{{"_id", ObjectId::gen()}, {"queryable_str_field", "foo"s}})),
2✔
1340
            NoSubscriptionForWrite);
2✔
1341
        realm->cancel_transaction();
2✔
1342

1✔
1343
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
1344

1✔
1345
        REQUIRE(table->is_empty());
2!
1346
        auto col_key = table->get_column_key("queryable_str_field");
2✔
1347
        {
2✔
1348
            auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
1349
            new_subs.insert_or_assign(Query(table).equal(col_key, "foo"));
2✔
1350
            auto subs = new_subs.commit();
2✔
1351
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
1352
        }
2✔
1353

1✔
1354
        realm->begin_transaction();
2✔
1355
        auto obj = Object::create(c, realm, "TopLevel",
2✔
1356
                                  std::any(AnyDict{{"_id", ObjectId::gen()},
2✔
1357
                                                   {"queryable_str_field", "foo"s},
2✔
1358
                                                   {"embedded_obj", AnyDict{{"str_field", "bar"s}}}}));
2✔
1359
        realm->commit_transaction();
2✔
1360

1✔
1361
        realm->begin_transaction();
2✔
1362
        auto embedded_obj = util::any_cast<Object&&>(obj.get_property_value<std::any>(c, "embedded_obj"));
2✔
1363
        embedded_obj.set_property_value(c, "str_field", std::any{"baz"s});
2✔
1364
        realm->commit_transaction();
2✔
1365

1✔
1366
        wait_for_upload(*realm);
2✔
1367
        wait_for_download(*realm);
2✔
1368
    });
2✔
1369
}
2✔
1370

1371
TEST_CASE("flx: uploading an object that is out-of-view results in compensating write",
1372
          "[sync][flx][compensating write][baas]") {
16✔
1373
    static std::optional<FLXSyncTestHarness> harness;
16✔
1374
    if (!harness) {
16✔
1375
        Schema schema{{"TopLevel",
2✔
1376
                       {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
1377
                        {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
1378
                        {"embedded_obj", PropertyType::Object | PropertyType::Nullable, "TopLevel_embedded_obj"}}},
2✔
1379
                      {"TopLevel_embedded_obj",
2✔
1380
                       ObjectSchema::ObjectType::Embedded,
2✔
1381
                       {{"str_field", PropertyType::String | PropertyType::Nullable}}},
2✔
1382
                      {"Int PK",
2✔
1383
                       {
2✔
1384
                           {"_id", PropertyType::Int, Property::IsPrimary{true}},
2✔
1385
                           {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
1386
                       }},
2✔
1387
                      {"String PK",
2✔
1388
                       {
2✔
1389
                           {"_id", PropertyType::String, Property::IsPrimary{true}},
2✔
1390
                           {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
1391
                       }},
2✔
1392
                      {"UUID PK",
2✔
1393
                       {
2✔
1394
                           {"_id", PropertyType::UUID, Property::IsPrimary{true}},
2✔
1395
                           {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
1396
                       }}};
2✔
1397

1✔
1398
        AppCreateConfig::ServiceRole role;
2✔
1399
        role.name = "compensating_write_perms";
2✔
1400

1✔
1401
        AppCreateConfig::ServiceRoleDocumentFilters doc_filters;
2✔
1402
        doc_filters.read = true;
2✔
1403
        doc_filters.write = {{"queryable_str_field", {{"$in", nlohmann::json::array({"foo", "bar"})}}}};
2✔
1404
        role.document_filters = doc_filters;
2✔
1405

1✔
1406
        role.insert_filter = true;
2✔
1407
        role.delete_filter = true;
2✔
1408
        role.read = true;
2✔
1409
        role.write = true;
2✔
1410
        FLXSyncTestHarness::ServerSchema server_schema{schema, {"queryable_str_field"}, {role}};
2✔
1411
        harness.emplace("flx_bad_query", server_schema);
2✔
1412
    }
2✔
1413

8✔
1414
    create_user_and_log_in(harness->app());
16✔
1415
    auto user = harness->app()->current_user();
16✔
1416

8✔
1417
    auto make_error_handler = [] {
16✔
1418
        auto [error_promise, error_future] = util::make_promise_future<SyncError>();
16✔
1419
        auto shared_promise = std::make_shared<decltype(error_promise)>(std::move(error_promise));
16✔
1420
        auto fn = [error_promise = std::move(shared_promise)](std::shared_ptr<SyncSession>, SyncError err) mutable {
15✔
1421
            if (!error_promise) {
14✔
1422
                util::format(std::cerr,
×
1423
                             "An unexpected sync error was caught by the default SyncTestFile handler: '%1'\n",
×
1424
                             err.status);
×
1425
                abort();
×
1426
            }
×
1427
            error_promise->emplace_value(std::move(err));
14✔
1428
            error_promise.reset();
14✔
1429
        };
14✔
1430

8✔
1431
        return std::make_pair(std::move(error_future), std::move(fn));
16✔
1432
    };
16✔
1433

8✔
1434
    auto validate_sync_error = [&](const SyncError& sync_error, Mixed expected_pk, const char* expected_object_name,
16✔
1435
                                   const std::string& error_msg_fragment) {
15✔
1436
        CHECK(sync_error.status == ErrorCodes::SyncCompensatingWrite);
14!
1437
        CHECK(!sync_error.is_client_reset_requested());
14!
1438
        CHECK(sync_error.compensating_writes_info.size() == 1);
14!
1439
        CHECK(sync_error.server_requests_action == sync::ProtocolErrorInfo::Action::Warning);
14!
1440
        auto write_info = sync_error.compensating_writes_info[0];
14✔
1441
        CHECK(write_info.primary_key == expected_pk);
14!
1442
        CHECK(write_info.object_name == expected_object_name);
14!
1443
        CHECK_THAT(write_info.reason, Catch::Matchers::ContainsSubstring(error_msg_fragment));
14✔
1444
    };
14✔
1445

8✔
1446
    SyncTestFile config(user, harness->schema(), SyncConfig::FLXSyncEnabled{});
16✔
1447
    auto&& [error_future, err_handler] = make_error_handler();
16✔
1448
    config.sync_config->error_handler = err_handler;
16✔
1449
    auto realm = Realm::get_shared_realm(config);
16✔
1450
    auto table = realm->read_group().get_table("class_TopLevel");
16✔
1451

8✔
1452
    auto create_subscription = [&](StringData table_name, auto make_query) {
15✔
1453
        auto table = realm->read_group().get_table(table_name);
14✔
1454
        auto queryable_str_field = table->get_column_key("queryable_str_field");
14✔
1455
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
14✔
1456
        new_query.insert_or_assign(make_query(Query(table), queryable_str_field));
14✔
1457
        new_query.commit();
14✔
1458
    };
14✔
1459

8✔
1460
    SECTION("compensating write because of permission violation") {
16✔
1461
        create_subscription("class_TopLevel", [](auto q, auto col) {
2✔
1462
            return q.equal(col, "bizz");
2✔
1463
        });
2✔
1464

1✔
1465
        CppContext c(realm);
2✔
1466
        realm->begin_transaction();
2✔
1467
        auto invalid_obj = ObjectId::gen();
2✔
1468
        Object::create(c, realm, "TopLevel",
2✔
1469
                       std::any(AnyDict{{"_id", invalid_obj}, {"queryable_str_field", "bizz"s}}));
2✔
1470
        realm->commit_transaction();
2✔
1471

1✔
1472
        wait_for_upload(*realm);
2✔
1473
        wait_for_download(*realm);
2✔
1474

1✔
1475
        validate_sync_error(
2✔
1476
            std::move(error_future).get(), invalid_obj, "TopLevel",
2✔
1477
            util::format("write to \"%1\" in table \"TopLevel\" not allowed", invalid_obj.to_string()));
2✔
1478

1✔
1479
        wait_for_advance(*realm);
2✔
1480

1✔
1481
        auto top_level_table = realm->read_group().get_table("class_TopLevel");
2✔
1482
        REQUIRE(top_level_table->is_empty());
2!
1483
    }
2✔
1484

8✔
1485
    SECTION("compensating write because of permission violation with write on embedded object") {
16✔
1486
        create_subscription("class_TopLevel", [](auto q, auto col) {
2✔
1487
            return q.equal(col, "bizz").Or().equal(col, "foo");
2✔
1488
        });
2✔
1489

1✔
1490
        CppContext c(realm);
2✔
1491
        realm->begin_transaction();
2✔
1492
        auto invalid_obj = ObjectId::gen();
2✔
1493
        auto obj = Object::create(c, realm, "TopLevel",
2✔
1494
                                  std::any(AnyDict{{"_id", invalid_obj},
2✔
1495
                                                   {"queryable_str_field", "foo"s},
2✔
1496
                                                   {"embedded_obj", AnyDict{{"str_field", "bar"s}}}}));
2✔
1497
        realm->commit_transaction();
2✔
1498
        realm->begin_transaction();
2✔
1499
        obj.set_property_value(c, "queryable_str_field", std::any{"bizz"s});
2✔
1500
        realm->commit_transaction();
2✔
1501
        realm->begin_transaction();
2✔
1502
        auto embedded_obj = util::any_cast<Object&&>(obj.get_property_value<std::any>(c, "embedded_obj"));
2✔
1503
        embedded_obj.set_property_value(c, "str_field", std::any{"baz"s});
2✔
1504
        realm->commit_transaction();
2✔
1505

1✔
1506
        wait_for_upload(*realm);
2✔
1507
        wait_for_download(*realm);
2✔
1508
        validate_sync_error(
2✔
1509
            std::move(error_future).get(), invalid_obj, "TopLevel",
2✔
1510
            util::format("write to \"%1\" in table \"TopLevel\" not allowed", invalid_obj.to_string()));
2✔
1511

1✔
1512
        wait_for_advance(*realm);
2✔
1513

1✔
1514
        obj = Object::get_for_primary_key(c, realm, "TopLevel", std::any(invalid_obj));
2✔
1515
        embedded_obj = util::any_cast<Object&&>(obj.get_property_value<std::any>(c, "embedded_obj"));
2✔
1516
        REQUIRE(util::any_cast<std::string&&>(obj.get_property_value<std::any>(c, "queryable_str_field")) == "foo");
2!
1517
        REQUIRE(util::any_cast<std::string&&>(embedded_obj.get_property_value<std::any>(c, "str_field")) == "bar");
2!
1518

1✔
1519
        realm->begin_transaction();
2✔
1520
        embedded_obj.set_property_value(c, "str_field", std::any{"baz"s});
2✔
1521
        realm->commit_transaction();
2✔
1522

1✔
1523
        wait_for_upload(*realm);
2✔
1524
        wait_for_download(*realm);
2✔
1525

1✔
1526
        wait_for_advance(*realm);
2✔
1527
        obj = Object::get_for_primary_key(c, realm, "TopLevel", std::any(invalid_obj));
2✔
1528
        embedded_obj = util::any_cast<Object&&>(obj.get_property_value<std::any>(c, "embedded_obj"));
2✔
1529
        REQUIRE(embedded_obj.get_column_value<StringData>("str_field") == "baz");
2!
1530
    }
2✔
1531

8✔
1532
    SECTION("compensating write for writing a top-level object that is out-of-view") {
16✔
1533
        create_subscription("class_TopLevel", [](auto q, auto col) {
2✔
1534
            return q.equal(col, "foo");
2✔
1535
        });
2✔
1536

1✔
1537
        CppContext c(realm);
2✔
1538
        realm->begin_transaction();
2✔
1539
        auto valid_obj = ObjectId::gen();
2✔
1540
        auto invalid_obj = ObjectId::gen();
2✔
1541
        Object::create(c, realm, "TopLevel",
2✔
1542
                       std::any(AnyDict{
2✔
1543
                           {"_id", valid_obj},
2✔
1544
                           {"queryable_str_field", "foo"s},
2✔
1545
                       }));
2✔
1546
        Object::create(c, realm, "TopLevel",
2✔
1547
                       std::any(AnyDict{
2✔
1548
                           {"_id", invalid_obj},
2✔
1549
                           {"queryable_str_field", "bar"s},
2✔
1550
                       }));
2✔
1551
        realm->commit_transaction();
2✔
1552

1✔
1553
        wait_for_upload(*realm);
2✔
1554
        wait_for_download(*realm);
2✔
1555

1✔
1556
        validate_sync_error(std::move(error_future).get(), invalid_obj, "TopLevel",
2✔
1557
                            "object is outside of the current query view");
2✔
1558

1✔
1559
        wait_for_advance(*realm);
2✔
1560

1✔
1561
        auto top_level_table = realm->read_group().get_table("class_TopLevel");
2✔
1562
        REQUIRE(top_level_table->size() == 1);
2!
1563
        REQUIRE(top_level_table->get_object_with_primary_key(valid_obj));
2!
1564

1✔
1565
        // Verify that a valid object afterwards does not produce an error
1✔
1566
        realm->begin_transaction();
2✔
1567
        Object::create(c, realm, "TopLevel",
2✔
1568
                       std::any(AnyDict{
2✔
1569
                           {"_id", ObjectId::gen()},
2✔
1570
                           {"queryable_str_field", "foo"s},
2✔
1571
                       }));
2✔
1572
        realm->commit_transaction();
2✔
1573

1✔
1574
        wait_for_upload(*realm);
2✔
1575
        wait_for_download(*realm);
2✔
1576
    }
2✔
1577

8✔
1578
    SECTION("compensating writes for each primary key type") {
16✔
1579
        SECTION("int") {
8✔
1580
            create_subscription("class_Int PK", [](auto q, auto col) {
2✔
1581
                return q.equal(col, "foo");
2✔
1582
            });
2✔
1583
            realm->begin_transaction();
2✔
1584
            realm->read_group().get_table("class_Int PK")->create_object_with_primary_key(123456);
2✔
1585
            realm->commit_transaction();
2✔
1586

1✔
1587
            wait_for_upload(*realm);
2✔
1588
            wait_for_download(*realm);
2✔
1589

1✔
1590
            validate_sync_error(std::move(error_future).get(), 123456, "Int PK",
2✔
1591
                                "write to \"123456\" in table \"Int PK\" not allowed");
2✔
1592
        }
2✔
1593

4✔
1594
        SECTION("short string") {
8✔
1595
            create_subscription("class_String PK", [](auto q, auto col) {
2✔
1596
                return q.equal(col, "foo");
2✔
1597
            });
2✔
1598
            realm->begin_transaction();
2✔
1599
            realm->read_group().get_table("class_String PK")->create_object_with_primary_key("short");
2✔
1600
            realm->commit_transaction();
2✔
1601

1✔
1602
            wait_for_upload(*realm);
2✔
1603
            wait_for_download(*realm);
2✔
1604

1✔
1605
            validate_sync_error(std::move(error_future).get(), "short", "String PK",
2✔
1606
                                "write to \"short\" in table \"String PK\" not allowed");
2✔
1607
        }
2✔
1608

4✔
1609
        SECTION("long string") {
8✔
1610
            create_subscription("class_String PK", [](auto q, auto col) {
2✔
1611
                return q.equal(col, "foo");
2✔
1612
            });
2✔
1613
            realm->begin_transaction();
2✔
1614
            const char* pk = "long string which won't fit in the SSO buffer";
2✔
1615
            realm->read_group().get_table("class_String PK")->create_object_with_primary_key(pk);
2✔
1616
            realm->commit_transaction();
2✔
1617

1✔
1618
            wait_for_upload(*realm);
2✔
1619
            wait_for_download(*realm);
2✔
1620

1✔
1621
            validate_sync_error(std::move(error_future).get(), pk, "String PK",
2✔
1622
                                util::format("write to \"%1\" in table \"String PK\" not allowed", pk));
2✔
1623
        }
2✔
1624

4✔
1625
        SECTION("uuid") {
8✔
1626
            create_subscription("class_UUID PK", [](auto q, auto col) {
2✔
1627
                return q.equal(col, "foo");
2✔
1628
            });
2✔
1629
            realm->begin_transaction();
2✔
1630
            UUID pk("01234567-9abc-4def-9012-3456789abcde");
2✔
1631
            realm->read_group().get_table("class_UUID PK")->create_object_with_primary_key(pk);
2✔
1632
            realm->commit_transaction();
2✔
1633

1✔
1634
            wait_for_upload(*realm);
2✔
1635
            wait_for_download(*realm);
2✔
1636

1✔
1637
            validate_sync_error(std::move(error_future).get(), pk, "UUID PK",
2✔
1638
                                util::format("write to \"UUID(%1)\" in table \"UUID PK\" not allowed", pk));
2✔
1639
        }
2✔
1640
    }
8✔
1641

8✔
1642
    // Clear the Realm afterwards as we're reusing an app
8✔
1643
    realm->begin_transaction();
16✔
1644
    table->clear();
16✔
1645
    realm->commit_transaction();
16✔
1646
    wait_for_upload(*realm);
16✔
1647
    realm.reset();
16✔
1648

8✔
1649
    // Add new sections before this
8✔
1650
    SECTION("teardown") {
16✔
1651
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
2✔
1652
        harness.reset();
2✔
1653
    }
2✔
1654
}
16✔
1655

1656
TEST_CASE("flx: query on non-queryable field results in query error message", "[sync][flx][query][baas]") {
6✔
1657
    static std::optional<FLXSyncTestHarness> harness;
6✔
1658
    if (!harness) {
6✔
1659
        harness.emplace("flx_bad_query");
2✔
1660
    }
2✔
1661

3✔
1662
    auto create_subscription = [](SharedRealm realm, StringData table_name, StringData column_name, auto make_query) {
8✔
1663
        auto table = realm->read_group().get_table(table_name);
8✔
1664
        auto queryable_field = table->get_column_key(column_name);
8✔
1665
        auto new_query = realm->get_active_subscription_set().make_mutable_copy();
8✔
1666
        new_query.insert_or_assign(make_query(Query(table), queryable_field));
8✔
1667
        return new_query.commit();
8✔
1668
    };
8✔
1669

3✔
1670
    auto check_status = [](auto status) {
6✔
1671
        CHECK(!status.is_ok());
6!
1672
        // Depending on the version of baas used, it may return 'Invalid query:' or
3✔
1673
        // 'Client provided query with bad syntax:'
3✔
1674
        if ((status.get_status().reason().find("Invalid query:") == std::string::npos &&
6✔
1675
             status.get_status().reason().find("Client provided query with bad syntax:") == std::string::npos) ||
3!
1676
            status.get_status().reason().find("\"TopLevel\": key \"non_queryable_field\" is not a queryable field") ==
6✔
1677
                std::string::npos) {
3✔
1678
            FAIL(status.get_status().reason());
×
1679
        }
×
1680
    };
6✔
1681

3✔
1682
    SECTION("Good query after bad query") {
6✔
1683
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
1684
            auto subs = create_subscription(realm, "class_TopLevel", "non_queryable_field", [](auto q, auto c) {
2✔
1685
                return q.equal(c, "bar");
2✔
1686
            });
2✔
1687
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
2✔
1688
            check_status(sub_res);
2✔
1689

1✔
1690
            CHECK(realm->get_active_subscription_set().version() == 0);
2!
1691
            CHECK(realm->get_latest_subscription_set().version() == 1);
2!
1692

1✔
1693
            subs = create_subscription(realm, "class_TopLevel", "queryable_str_field", [](auto q, auto c) {
2✔
1694
                return q.equal(c, "foo");
2✔
1695
            });
2✔
1696
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
1697

1✔
1698
            CHECK(realm->get_active_subscription_set().version() == 2);
2!
1699
            CHECK(realm->get_latest_subscription_set().version() == 2);
2!
1700
        });
2✔
1701
    }
2✔
1702

3✔
1703
    SECTION("Bad query after bad query") {
6✔
1704
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
1705
            auto sync_session = realm->sync_session();
2✔
1706
            sync_session->pause();
2✔
1707

1✔
1708
            auto subs = create_subscription(realm, "class_TopLevel", "non_queryable_field", [](auto q, auto c) {
2✔
1709
                return q.equal(c, "bar");
2✔
1710
            });
2✔
1711
            auto subs2 = create_subscription(realm, "class_TopLevel", "non_queryable_field", [](auto q, auto c) {
2✔
1712
                return q.equal(c, "bar");
2✔
1713
            });
2✔
1714

1✔
1715
            sync_session->resume();
2✔
1716

1✔
1717
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
2✔
1718
            auto sub_res2 =
2✔
1719
                subs2.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
2✔
1720

1✔
1721
            check_status(sub_res);
2✔
1722
            check_status(sub_res2);
2✔
1723

1✔
1724
            CHECK(realm->get_active_subscription_set().version() == 0);
2!
1725
            CHECK(realm->get_latest_subscription_set().version() == 2);
2!
1726
        });
2✔
1727
    }
2✔
1728

3✔
1729
    // Add new sections before this
3✔
1730
    SECTION("teardown") {
6✔
1731
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
2✔
1732
        harness.reset();
2✔
1733
    }
2✔
1734
}
6✔
1735

1736
#if REALM_ENABLE_GEOSPATIAL
1737
TEST_CASE("flx: geospatial", "[sync][flx][geospatial][baas]") {
4✔
1738
    static std::optional<FLXSyncTestHarness> harness;
4✔
1739
    if (!harness) {
4✔
1740
        Schema schema{
2✔
1741
            {"restaurant",
2✔
1742
             {
2✔
1743
                 {"_id", PropertyType::Int, Property::IsPrimary{true}},
2✔
1744
                 {"queryable_str_field", PropertyType::String},
2✔
1745
                 {"location", PropertyType::Object | PropertyType::Nullable, "geoPointType"},
2✔
1746
                 {"array", PropertyType::Object | PropertyType::Array, "geoPointType"},
2✔
1747
             }},
2✔
1748
            {"geoPointType",
2✔
1749
             ObjectSchema::ObjectType::Embedded,
2✔
1750
             {
2✔
1751
                 {"type", PropertyType::String},
2✔
1752
                 {"coordinates", PropertyType::Double | PropertyType::Array},
2✔
1753
             }},
2✔
1754
        };
2✔
1755
        FLXSyncTestHarness::ServerSchema server_schema{schema, {"queryable_str_field"}};
2✔
1756
        harness.emplace("flx_geospatial", server_schema);
2✔
1757
    }
2✔
1758

2✔
1759
    auto create_subscription = [](SharedRealm realm, StringData table_name, StringData column_name, auto make_query) {
3✔
1760
        auto table = realm->read_group().get_table(table_name);
2✔
1761
        auto queryable_field = table->get_column_key(column_name);
2✔
1762
        auto new_query = realm->get_active_subscription_set().make_mutable_copy();
2✔
1763
        new_query.insert_or_assign(make_query(Query(table), queryable_field));
2✔
1764
        return new_query.commit();
2✔
1765
    };
2✔
1766

2✔
1767
    // TODO: when this test starts failing because the server implements the new
2✔
1768
    // syntax, then we should implement an actual geospatial FLX query test here
2✔
1769
    /*
2✔
1770
    auto check_failed_status = [](auto status) {
2✔
1771
        CHECK(!status.is_ok());
2✔
1772
        if (status.get_status().reason().find("Client provided query with bad syntax:") == std::string::npos ||
2✔
1773
            status.get_status().reason().find("\"restaurant\": syntax error") == std::string::npos) {
2✔
1774
            FAIL(status.get_status().reason());
2✔
1775
        }
2✔
1776
    };
2✔
1777

2✔
1778
    SECTION("Server doesn't support GEOWITHIN yet") {
2✔
1779
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
1780
            auto subs = create_subscription(realm, "class_restaurant", "location", [](Query q, ColKey c) {
2✔
1781
                GeoBox area{GeoPoint{0.2, 0.2}, GeoPoint{0.7, 0.7}};
2✔
1782
                return q.get_table()->column<Link>(c).geo_within(area);
2✔
1783
            });
2✔
1784
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
2✔
1785
            check_failed_status(sub_res);
2✔
1786
            CHECK(realm->get_active_subscription_set().version() == 0);
2✔
1787
            CHECK(realm->get_latest_subscription_set().version() == 1);
2✔
1788
        });
2✔
1789
    }
2✔
1790
     */
2✔
1791

2✔
1792
    SECTION("non-geospatial FLX query syncs data which can be queried locally") {
4✔
1793
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
1794
            auto subs = create_subscription(realm, "class_restaurant", "queryable_str_field", [](Query q, ColKey c) {
2✔
1795
                return q.equal(c, "synced");
2✔
1796
            });
2✔
1797
            auto make_polygon_filter = [&](const GeoPolygon& polygon) -> bson::BsonDocument {
20✔
1798
                bson::BsonArray inner{};
20✔
1799
                REALM_ASSERT_3(polygon.points.size(), ==, 1);
20✔
1800
                for (auto& point : polygon.points[0]) {
94✔
1801
                    inner.push_back(bson::BsonArray{point.longitude, point.latitude});
94✔
1802
                }
94✔
1803
                bson::BsonArray coords;
20✔
1804
                coords.push_back(inner);
20✔
1805
                bson::BsonDocument geo_bson{{{"type", "Polygon"}, {"coordinates", coords}}};
20✔
1806
                bson::BsonDocument filter{
20✔
1807
                    {"location", bson::BsonDocument{{"$geoWithin", bson::BsonDocument{{"$geometry", geo_bson}}}}}};
20✔
1808
                return filter;
20✔
1809
            };
20✔
1810
            auto make_circle_filter = [&](const GeoCircle& circle) -> bson::BsonDocument {
6✔
1811
                bson::BsonArray coords{circle.center.longitude, circle.center.latitude};
6✔
1812
                bson::BsonArray inner;
6✔
1813
                inner.push_back(coords);
6✔
1814
                inner.push_back(circle.radius_radians);
6✔
1815
                bson::BsonDocument filter{
6✔
1816
                    {"location", bson::BsonDocument{{"$geoWithin", bson::BsonDocument{{"$centerSphere", inner}}}}}};
6✔
1817
                return filter;
6✔
1818
            };
6✔
1819
            auto run_query_on_server = [&](const bson::BsonDocument& filter,
2✔
1820
                                           std::optional<std::string> expected_error = {}) -> size_t {
26✔
1821
                auto remote_client = harness->app()->current_user()->mongo_client("BackingDB");
26✔
1822
                auto db = remote_client.db(harness->session().app_session().config.mongo_dbname);
26✔
1823
                auto restaurant_collection = db["restaurant"];
26✔
1824
                bool processed = false;
26✔
1825
                constexpr int64_t limit = 1000;
26✔
1826
                size_t matches = 0;
26✔
1827
                restaurant_collection.count(filter, limit, [&](uint64_t count, util::Optional<AppError> error) {
26✔
1828
                    processed = true;
26✔
1829
                    if (error) {
26✔
1830
                        if (!expected_error) {
12✔
1831
                            util::format(std::cout, "query error: %1\n", error->reason());
×
1832
                            FAIL(error);
×
1833
                        }
×
1834
                        else {
12✔
1835
                            std::string reason = std::string(error->reason());
12✔
1836
                            std::transform(reason.begin(), reason.end(), reason.begin(), toLowerAscii);
12✔
1837
                            std::transform(expected_error->begin(), expected_error->end(), expected_error->begin(),
12✔
1838
                                           toLowerAscii);
12✔
1839
                            auto pos = reason.find(*expected_error);
12✔
1840
                            if (pos == std::string::npos) {
12✔
1841
                                util::format(std::cout, "mismatch error: '%1' and '%2'\n", reason, *expected_error);
×
1842
                                FAIL(reason);
×
1843
                            }
×
1844
                        }
12✔
1845
                    }
12✔
1846
                    matches = size_t(count);
26✔
1847
                });
26✔
1848
                REQUIRE(processed);
26!
1849
                return matches;
26✔
1850
            };
26✔
1851
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
2✔
1852
            CHECK(sub_res.is_ok());
2!
1853
            CHECK(realm->get_active_subscription_set().version() == 1);
2!
1854
            CHECK(realm->get_latest_subscription_set().version() == 1);
2!
1855

1✔
1856
            realm->begin_transaction();
2✔
1857

1✔
1858
            CppContext c(realm);
2✔
1859
            int64_t pk = 0;
2✔
1860
            auto add_point = [&](GeoPoint p) {
16✔
1861
                Object::create(
16✔
1862
                    c, realm, "restaurant",
16✔
1863
                    std::any(AnyDict{
16✔
1864
                        {"_id", ++pk},
16✔
1865
                        {"queryable_str_field", "synced"s},
16✔
1866
                        {"location", AnyDict{{"type", "Point"s},
16✔
1867
                                             {"coordinates", std::vector<std::any>{p.longitude, p.latitude}}}}}));
16✔
1868
            };
16✔
1869
            std::vector<GeoPoint> points = {
2✔
1870
                GeoPoint{-74.006, 40.712800000000001},            // New York city
2✔
1871
                GeoPoint{12.568300000000001, 55.676099999999998}, // Copenhagen
2✔
1872
                GeoPoint{12.082599999999999, 55.628},             // ragnarok, Roskilde
2✔
1873
                GeoPoint{-180.1, -90.1},                          // invalid
2✔
1874
                GeoPoint{0, 90},                                  // north pole
2✔
1875
                GeoPoint{-82.68193, 84.74653},                    // northern point that falls within a box later
2✔
1876
                GeoPoint{82.55243, 84.54981}, // another northern point, but on the other side of the pole
2✔
1877
                GeoPoint{2129, 89},           // invalid
2✔
1878
            };
2✔
1879
            for (auto& point : points) {
16✔
1880
                add_point(point);
16✔
1881
            }
16✔
1882
            realm->commit_transaction();
2✔
1883
            wait_for_upload(*realm);
2✔
1884

1✔
1885
            {
2✔
1886
                auto table = realm->read_group().get_table("class_restaurant");
2✔
1887
                CHECK(table->size() == points.size());
2!
1888
                Obj obj = table->get_object_with_primary_key(Mixed{1});
2✔
1889
                REQUIRE(obj);
2!
1890
                Geospatial geo = obj.get<Geospatial>("location");
2✔
1891
                REQUIRE(geo.get_type_string() == "Point");
2!
1892
                REQUIRE(geo.get_type() == Geospatial::Type::Point);
2!
1893
                GeoPoint point = geo.get<GeoPoint>();
2✔
1894
                REQUIRE(point.longitude == points[0].longitude);
2!
1895
                REQUIRE(point.latitude == points[0].latitude);
2!
1896
                REQUIRE(!point.get_altitude());
2!
1897
                ColKey location_col = table->get_column_key("location");
2✔
1898
                auto run_query_locally = [&table, &location_col](Geospatial bounds) -> size_t {
26✔
1899
                    Query query = table->column<Link>(location_col).geo_within(Geospatial(bounds));
26✔
1900
                    return query.find_all().size();
26✔
1901
                };
26✔
1902

1✔
1903
                reset_utils::wait_for_num_objects_in_atlas(
2✔
1904
                    harness->app()->current_user(), harness->session().app_session(), "restaurant", points.size());
2✔
1905

1✔
1906
                {
2✔
1907
                    GeoPolygon bounds{
2✔
1908
                        {{GeoPoint{-80, 40.7128}, GeoPoint{20, 60}, GeoPoint{20, 20}, GeoPoint{-80, 40.7128}}}};
2✔
1909
                    size_t local_matches = run_query_locally(bounds);
2✔
1910
                    size_t server_results = run_query_on_server(make_polygon_filter(bounds));
2✔
1911
                    CHECK(server_results == local_matches);
2!
1912
                }
2✔
1913
                {
2✔
1914
                    GeoCircle circle{.5, GeoPoint{0, 90}};
2✔
1915
                    size_t local_matches = run_query_locally(circle);
2✔
1916
                    size_t server_results = run_query_on_server(make_circle_filter(circle));
2✔
1917
                    CHECK(server_results == local_matches);
2!
1918
                }
2✔
1919
                { // a ring with 3 points without a matching begin/end is an error
2✔
1920
                    GeoPolygon open_bounds{{{GeoPoint{-80, 40.7128}, GeoPoint{20, 60}, GeoPoint{20, 20}}}};
2✔
1921
                    CHECK_THROWS_WITH(run_query_locally(open_bounds),
2✔
1922
                                      "Invalid region in GEOWITHIN query for parameter 'GeoPolygon({[-80, 40.7128], "
2✔
1923
                                      "[20, 60], [20, 20]})': 'Ring is not closed, first vertex 'GeoPoint([-80, "
2✔
1924
                                      "40.7128])' does not equal last vertex 'GeoPoint([20, 20])''");
2✔
1925
                    run_query_on_server(make_polygon_filter(open_bounds), "(BadValue) Loop is not closed");
2✔
1926
                }
2✔
1927
                {
2✔
1928
                    GeoCircle circle = GeoCircle::from_kms(10, GeoPoint{-180.1, -90.1});
2✔
1929
                    CHECK_THROWS_WITH(run_query_locally(circle),
2✔
1930
                                      "Invalid region in GEOWITHIN query for parameter 'GeoCircle([-180.1, -90.1], "
2✔
1931
                                      "0.00156787)': 'Longitude/latitude is out of bounds, lng: -180.1 lat: -90.1'");
2✔
1932
                    run_query_on_server(make_circle_filter(circle), "(BadValue) longitude/latitude is out of bounds");
2✔
1933
                }
2✔
1934
                {
2✔
1935
                    GeoCircle circle = GeoCircle::from_kms(-1, GeoPoint{0, 0});
2✔
1936
                    CHECK_THROWS_WITH(run_query_locally(circle),
2✔
1937
                                      "Invalid region in GEOWITHIN query for parameter 'GeoCircle([0, 0], "
2✔
1938
                                      "-0.000156787)': 'The radius of a circle must be a non-negative number'");
2✔
1939
                    run_query_on_server(make_circle_filter(circle),
2✔
1940
                                        "(BadValue) radius must be a non-negative number");
2✔
1941
                }
2✔
1942
                {
2✔
1943
                    // This box is from Gershøj to CPH airport. It includes CPH and Ragnarok but not NYC.
1✔
1944
                    std::vector<Geospatial> valid_box_variations = {
2✔
1945
                        GeoBox{GeoPoint{11.97575, 55.71601},
2✔
1946
                               GeoPoint{12.64773, 55.61211}}, // Gershøj, CPH Airport (Top Left, Bottom Right)
2✔
1947
                        GeoBox{GeoPoint{12.64773, 55.61211},
2✔
1948
                               GeoPoint{11.97575, 55.71601}}, // CPH Airport, Gershøj (Bottom Right, Top Left)
2✔
1949
                        GeoBox{GeoPoint{12.64773, 55.71601},
2✔
1950
                               GeoPoint{11.97575, 55.61211}}, // Upper Right, Bottom Left
2✔
1951
                        GeoBox{GeoPoint{11.97575, 55.61211},
2✔
1952
                               GeoPoint{12.64773, 55.71601}}, // Bottom Left, Upper Right
2✔
1953
                    };
2✔
1954
                    constexpr size_t expected_results = 2;
2✔
1955
                    for (auto& geo : valid_box_variations) {
8✔
1956
                        size_t local_matches = run_query_locally(geo);
8✔
1957
                        size_t server_matches =
8✔
1958
                            run_query_on_server(make_polygon_filter(geo.get<GeoBox>().to_polygon()));
8✔
1959
                        CHECK(local_matches == expected_results);
8!
1960
                        CHECK(server_matches == expected_results);
8!
1961
                    }
8✔
1962
                    std::vector<Geospatial> invalid_boxes = {
2✔
1963
                        GeoBox{GeoPoint{11.97575, 55.71601}, GeoPoint{11.97575, 55.71601}}, // same point twice
2✔
1964
                        GeoBox{GeoPoint{11.97575, 55.71601},
2✔
1965
                               GeoPoint{11.97575, 57.0}}, // two points on the same longitude
2✔
1966
                        GeoBox{GeoPoint{11.97575, 55.71601},
2✔
1967
                               GeoPoint{12, 55.71601}}, // two points on the same latitude
2✔
1968
                    };
2✔
1969
                    for (auto& geo : invalid_boxes) {
6✔
1970
                        REQUIRE_THROWS_CONTAINING(run_query_locally(geo),
6✔
1971
                                                  "Invalid region in GEOWITHIN query for parameter 'GeoPolygon");
6✔
1972
                        run_query_on_server(make_polygon_filter(geo.get<GeoBox>().to_polygon()),
6✔
1973
                                            "(BadValue) Loop must have at least 3 different vertices");
6✔
1974
                    }
6✔
1975
                }
2✔
1976
                { // a box region that wraps the north pole. It contains the north pole point
2✔
1977
                    // and two others, one each on distinct sides of the globe.
1✔
1978
                    constexpr double lat = 82.83799;
2✔
1979
                    Geospatial north_pole_box =
2✔
1980
                        GeoPolygon{{{GeoPoint{-78.33951, lat}, GeoPoint{-90.33951, lat}, GeoPoint{90.33951, lat},
2✔
1981
                                     GeoPoint{78.33951, lat}, GeoPoint{-78.33951, lat}}}};
2✔
1982
                    constexpr size_t num_matching_points = 3;
2✔
1983
                    size_t local_matches = run_query_locally(north_pole_box);
2✔
1984
                    size_t server_matches =
2✔
1985
                        run_query_on_server(make_polygon_filter(north_pole_box.get<GeoPolygon>()));
2✔
1986
                    CHECK(local_matches == num_matching_points);
2!
1987
                    CHECK(server_matches == num_matching_points);
2!
1988
                }
2✔
1989
            }
2✔
1990
        });
2✔
1991
    }
2✔
1992

2✔
1993
    // Add new sections before this
2✔
1994
    SECTION("teardown") {
4✔
1995
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
2✔
1996
        harness.reset();
2✔
1997
    }
2✔
1998
}
4✔
1999
#endif // REALM_ENABLE_GEOSPATIAL
2000

2001
TEST_CASE("flx: interrupted bootstrap restarts/recovers on reconnect", "[sync][flx][bootstrap][baas]") {
2✔
2002
    FLXSyncTestHarness harness("flx_bootstrap_batching", {g_large_array_schema, {"queryable_int_field"}});
2✔
2003

1✔
2004
    std::vector<ObjectId> obj_ids_at_end = fill_large_array_schema(harness);
2✔
2005
    SyncTestFile interrupted_realm_config(harness.app()->current_user(), harness.schema(),
2✔
2006
                                          SyncConfig::FLXSyncEnabled{});
2✔
2007
    interrupted_realm_config.cache = false;
2✔
2008

1✔
2009
    {
2✔
2010
        auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
2011
        Realm::Config config = interrupted_realm_config;
2✔
2012
        config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
2013
        auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
2014
        config.sync_config->on_sync_client_event_hook =
2✔
2015
            [promise = std::move(shared_promise), seen_version_one = false](std::weak_ptr<SyncSession> weak_session,
2✔
2016
                                                                            const SyncClientHookData& data) mutable {
18✔
2017
                if (data.event != SyncClientHookEvent::DownloadMessageReceived) {
18✔
2018
                    return SyncClientHookAction::NoAction;
10✔
2019
                }
10✔
2020

4✔
2021
                auto session = weak_session.lock();
8✔
2022
                if (!session) {
8✔
2023
                    return SyncClientHookAction::NoAction;
×
2024
                }
×
2025

4✔
2026
                // If we haven't seen at least one download message for query version 1, then do nothing yet.
4✔
2027
                if (data.query_version == 0 || (data.query_version == 1 && !std::exchange(seen_version_one, true))) {
8✔
2028
                    return SyncClientHookAction::NoAction;
6✔
2029
                }
6✔
2030

1✔
2031
                REQUIRE(data.query_version == 1);
2!
2032
                REQUIRE(data.batch_state == sync::DownloadBatchState::MoreToCome);
2!
2033
                auto latest_subs = session->get_flx_subscription_store()->get_latest();
2✔
2034
                REQUIRE(latest_subs.version() == 1);
2!
2035
                REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
2!
2036

1✔
2037
                session->close();
2✔
2038
                promise->emplace_value();
2✔
2039

1✔
2040
                return SyncClientHookAction::TriggerReconnect;
2✔
2041
            };
2✔
2042

1✔
2043
        auto realm = Realm::get_shared_realm(config);
2✔
2044
        {
2✔
2045
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2046
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
2047
            mut_subs.insert_or_assign(Query(table));
2✔
2048
            mut_subs.commit();
2✔
2049
        }
2✔
2050

1✔
2051
        interrupted.get();
2✔
2052
        realm->sync_session()->shutdown_and_wait();
2✔
2053
    }
2✔
2054

1✔
2055
    _impl::RealmCoordinator::assert_no_open_realms();
2✔
2056

1✔
2057
    {
2✔
2058
        DBOptions options;
2✔
2059
        options.encryption_key = test_util::crypt_key();
2✔
2060
        auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
2061
        auto sub_store = sync::SubscriptionStore::create(realm, [](int64_t) {});
1✔
2062
        auto version_info = sub_store->get_version_info();
2✔
2063
        REQUIRE(version_info.active == 0);
2!
2064
        REQUIRE(version_info.latest == 1);
2!
2065
        auto latest_subs = sub_store->get_latest();
2✔
2066
        REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
2!
2067
        REQUIRE(latest_subs.size() == 1);
2!
2068
        REQUIRE(latest_subs.at(0).object_class_name == "TopLevel");
2!
2069
    }
2✔
2070

1✔
2071
    auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
2072
    auto table = realm->read_group().get_table("class_TopLevel");
2✔
2073
    realm->get_latest_subscription_set().get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
2074
    wait_for_upload(*realm);
2✔
2075
    wait_for_download(*realm);
2✔
2076

1✔
2077
    wait_for_advance(*realm);
2✔
2078
    REQUIRE(table->size() == obj_ids_at_end.size());
2!
2079
    for (auto& id : obj_ids_at_end) {
10✔
2080
        REQUIRE(table->find_primary_key(Mixed{id}));
10!
2081
    }
10✔
2082

1✔
2083
    auto active_subs = realm->get_active_subscription_set();
2✔
2084
    auto latest_subs = realm->get_latest_subscription_set();
2✔
2085
    REQUIRE(active_subs.version() == latest_subs.version());
2!
2086
    REQUIRE(active_subs.version() == int64_t(1));
2!
2087
}
2✔
2088

2089
TEST_CASE("flx: dev mode uploads schema before query change", "[sync][flx][query][baas]") {
2✔
2090
    FLXSyncTestHarness::ServerSchema server_schema;
2✔
2091
    auto default_schema = FLXSyncTestHarness::default_server_schema();
2✔
2092
    server_schema.queryable_fields = default_schema.queryable_fields;
2✔
2093
    server_schema.dev_mode_enabled = true;
2✔
2094
    server_schema.schema = Schema{};
2✔
2095

1✔
2096
    FLXSyncTestHarness harness("flx_dev_mode", server_schema);
2✔
2097
    auto foo_obj_id = ObjectId::gen();
2✔
2098
    auto bar_obj_id = ObjectId::gen();
2✔
2099
    harness.do_with_new_realm(
2✔
2100
        [&](SharedRealm realm) {
2✔
2101
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
2102
            // auto queryable_str_field = table->get_column_key("queryable_str_field");
1✔
2103
            // auto queryable_int_field = table->get_column_key("queryable_int_field");
1✔
2104
            auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2105
            new_query.insert_or_assign(Query(table));
2✔
2106
            new_query.commit();
2✔
2107

1✔
2108
            CppContext c(realm);
2✔
2109
            realm->begin_transaction();
2✔
2110
            Object::create(c, realm, "TopLevel",
2✔
2111
                           std::any(AnyDict{{"_id", foo_obj_id},
2✔
2112
                                            {"queryable_str_field", "foo"s},
2✔
2113
                                            {"queryable_int_field", static_cast<int64_t>(5)},
2✔
2114
                                            {"non_queryable_field", "non queryable 1"s}}));
2✔
2115
            Object::create(c, realm, "TopLevel",
2✔
2116
                           std::any(AnyDict{{"_id", bar_obj_id},
2✔
2117
                                            {"queryable_str_field", "bar"s},
2✔
2118
                                            {"queryable_int_field", static_cast<int64_t>(10)},
2✔
2119
                                            {"non_queryable_field", "non queryable 2"s}}));
2✔
2120
            realm->commit_transaction();
2✔
2121

1✔
2122
            wait_for_upload(*realm);
2✔
2123
        },
2✔
2124
        default_schema.schema);
2✔
2125

1✔
2126
    harness.do_with_new_realm(
2✔
2127
        [&](SharedRealm realm) {
2✔
2128
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
2129
            auto queryable_int_field = table->get_column_key("queryable_int_field");
2✔
2130
            auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2131
            new_query.insert_or_assign(Query(table).greater_equal(queryable_int_field, int64_t(5)));
2✔
2132
            auto subs = new_query.commit();
2✔
2133
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
2134
            wait_for_download(*realm);
2✔
2135
            Results results(realm, table);
2✔
2136

1✔
2137
            realm->refresh();
2✔
2138
            CHECK(results.size() == 2);
2!
2139
            CHECK(table->get_object_with_primary_key({foo_obj_id}).is_valid());
2!
2140
            CHECK(table->get_object_with_primary_key({bar_obj_id}).is_valid());
2!
2141
        },
2✔
2142
        default_schema.schema);
2✔
2143
}
2✔
2144

2145
// This is a test case for the server's fix for RCORE-969
2146
TEST_CASE("flx: change-of-query history divergence", "[sync][flx][query][baas]") {
2✔
2147
    FLXSyncTestHarness harness("flx_coq_divergence");
2✔
2148

1✔
2149
    // first we create an object on the server and upload it.
1✔
2150
    auto foo_obj_id = ObjectId::gen();
2✔
2151
    harness.load_initial_data([&](SharedRealm realm) {
2✔
2152
        CppContext c(realm);
2✔
2153
        Object::create(c, realm, "TopLevel",
2✔
2154
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
2155
                                        {"queryable_str_field", "foo"s},
2✔
2156
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
2157
                                        {"non_queryable_field", "created as initial data seed"s}}));
2✔
2158
    });
2✔
2159

1✔
2160
    // Now create another realm and wait for it to be fully synchronized with bootstrap version zero. i.e.
1✔
2161
    // our progress counters should be past the history entry containing the object created above.
1✔
2162
    auto test_file_config = harness.make_test_file();
2✔
2163
    auto realm = Realm::get_shared_realm(test_file_config);
2✔
2164
    auto table = realm->read_group().get_table("class_TopLevel");
2✔
2165
    auto queryable_str_field = table->get_column_key("queryable_str_field");
2✔
2166

1✔
2167
    realm->get_latest_subscription_set().get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
2168
    wait_for_upload(*realm);
2✔
2169
    wait_for_download(*realm);
2✔
2170

1✔
2171
    // Now disconnect the sync session
1✔
2172
    realm->sync_session()->pause();
2✔
2173

1✔
2174
    // And move the "foo" object created above into view and create a different diverging copy of it locally.
1✔
2175
    auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2176
    mut_subs.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
2✔
2177
    auto subs = mut_subs.commit();
2✔
2178

1✔
2179
    realm->begin_transaction();
2✔
2180
    CppContext c(realm);
2✔
2181
    Object::create(c, realm, "TopLevel",
2✔
2182
                   std::any(AnyDict{{"_id", foo_obj_id},
2✔
2183
                                    {"queryable_str_field", "foo"s},
2✔
2184
                                    {"queryable_int_field", static_cast<int64_t>(10)},
2✔
2185
                                    {"non_queryable_field", "created locally"s}}));
2✔
2186
    realm->commit_transaction();
2✔
2187

1✔
2188
    // Reconnect the sync session and wait for the subscription that moved "foo" into view to be fully synchronized.
1✔
2189
    realm->sync_session()->resume();
2✔
2190
    wait_for_upload(*realm);
2✔
2191
    wait_for_download(*realm);
2✔
2192
    subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
2193

1✔
2194
    wait_for_advance(*realm);
2✔
2195

1✔
2196
    // The bootstrap should have erase/re-created our object and we should have the version from the server
1✔
2197
    // locally.
1✔
2198
    auto obj = Object::get_for_primary_key(c, realm, "TopLevel", std::any{foo_obj_id});
2✔
2199
    REQUIRE(obj.get_obj().get<int64_t>("queryable_int_field") == 5);
2!
2200
    REQUIRE(obj.get_obj().get<StringData>("non_queryable_field") == "created as initial data seed");
2!
2201

1✔
2202
    // Likewise, if we create a new realm and download all the objects, we should see the initial server version
1✔
2203
    // in the new realm rather than the "created locally" one.
1✔
2204
    harness.load_initial_data([&](SharedRealm realm) {
2✔
2205
        CppContext c(realm);
2✔
2206

1✔
2207
        auto obj = Object::get_for_primary_key(c, realm, "TopLevel", std::any{foo_obj_id});
2✔
2208
        REQUIRE(obj.get_obj().get<int64_t>("queryable_int_field") == 5);
2!
2209
        REQUIRE(obj.get_obj().get<StringData>("non_queryable_field") == "created as initial data seed");
2!
2210
    });
2✔
2211
}
2✔
2212

2213
TEST_CASE("flx: writes work offline", "[sync][flx][baas]") {
2✔
2214
    FLXSyncTestHarness harness("flx_offline_writes");
2✔
2215

1✔
2216
    harness.do_with_new_realm([&](SharedRealm realm) {
2✔
2217
        auto sync_session = realm->sync_session();
2✔
2218
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
2219
        auto queryable_str_field = table->get_column_key("queryable_str_field");
2✔
2220
        auto queryable_int_field = table->get_column_key("queryable_int_field");
2✔
2221
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2222
        new_query.insert_or_assign(Query(table));
2✔
2223
        new_query.commit();
2✔
2224

1✔
2225
        auto foo_obj_id = ObjectId::gen();
2✔
2226
        auto bar_obj_id = ObjectId::gen();
2✔
2227

1✔
2228
        CppContext c(realm);
2✔
2229
        realm->begin_transaction();
2✔
2230
        Object::create(c, realm, "TopLevel",
2✔
2231
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
2232
                                        {"queryable_str_field", "foo"s},
2✔
2233
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
2234
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
2235
        Object::create(c, realm, "TopLevel",
2✔
2236
                       std::any(AnyDict{{"_id", bar_obj_id},
2✔
2237
                                        {"queryable_str_field", "bar"s},
2✔
2238
                                        {"queryable_int_field", static_cast<int64_t>(10)},
2✔
2239
                                        {"non_queryable_field", "non queryable 2"s}}));
2✔
2240
        realm->commit_transaction();
2✔
2241

1✔
2242
        wait_for_upload(*realm);
2✔
2243
        wait_for_download(*realm);
2✔
2244
        sync_session->pause();
2✔
2245

1✔
2246
        // Make it so the subscriptions only match the "foo" object
1✔
2247
        {
2✔
2248
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2249
            mut_subs.clear();
2✔
2250
            mut_subs.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
2✔
2251
            mut_subs.commit();
2✔
2252
        }
2✔
2253

1✔
2254
        // Make foo so that it will match the next subscription update. This checks whether you can do
1✔
2255
        // multiple subscription set updates offline and that the last one eventually takes effect when
1✔
2256
        // you come back online and fully synchronize.
1✔
2257
        {
2✔
2258
            Results results(realm, table);
2✔
2259
            realm->begin_transaction();
2✔
2260
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
2✔
2261
            foo_obj.set<int64_t>(queryable_int_field, 15);
2✔
2262
            realm->commit_transaction();
2✔
2263
        }
2✔
2264

1✔
2265
        // Update our subscriptions so that both foo/bar will be included
1✔
2266
        {
2✔
2267
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2268
            mut_subs.clear();
2✔
2269
            mut_subs.insert_or_assign(Query(table).greater_equal(queryable_int_field, static_cast<int64_t>(10)));
2✔
2270
            mut_subs.commit();
2✔
2271
        }
2✔
2272

1✔
2273
        // Make foo out of view for the current subscription.
1✔
2274
        {
2✔
2275
            Results results(realm, table);
2✔
2276
            realm->begin_transaction();
2✔
2277
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
2✔
2278
            foo_obj.set<int64_t>(queryable_int_field, 0);
2✔
2279
            realm->commit_transaction();
2✔
2280
        }
2✔
2281

1✔
2282
        sync_session->resume();
2✔
2283
        wait_for_upload(*realm);
2✔
2284
        wait_for_download(*realm);
2✔
2285

1✔
2286
        realm->refresh();
2✔
2287
        Results results(realm, table);
2✔
2288
        CHECK(results.size() == 1);
2!
2289
        CHECK(table->get_object_with_primary_key({bar_obj_id}).is_valid());
2!
2290
    });
2✔
2291
}
2✔
2292

2293
TEST_CASE("flx: writes work without waiting for sync", "[sync][flx][baas]") {
2✔
2294
    FLXSyncTestHarness harness("flx_offline_writes");
2✔
2295

1✔
2296
    harness.do_with_new_realm([&](SharedRealm realm) {
2✔
2297
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
2298
        auto queryable_str_field = table->get_column_key("queryable_str_field");
2✔
2299
        auto queryable_int_field = table->get_column_key("queryable_int_field");
2✔
2300
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2301
        new_query.insert_or_assign(Query(table));
2✔
2302
        new_query.commit();
2✔
2303

1✔
2304
        auto foo_obj_id = ObjectId::gen();
2✔
2305
        auto bar_obj_id = ObjectId::gen();
2✔
2306

1✔
2307
        CppContext c(realm);
2✔
2308
        realm->begin_transaction();
2✔
2309
        Object::create(c, realm, "TopLevel",
2✔
2310
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
2311
                                        {"queryable_str_field", "foo"s},
2✔
2312
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
2313
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
2314
        Object::create(c, realm, "TopLevel",
2✔
2315
                       std::any(AnyDict{{"_id", bar_obj_id},
2✔
2316
                                        {"queryable_str_field", "bar"s},
2✔
2317
                                        {"queryable_int_field", static_cast<int64_t>(10)},
2✔
2318
                                        {"non_queryable_field", "non queryable 2"s}}));
2✔
2319
        realm->commit_transaction();
2✔
2320

1✔
2321
        wait_for_upload(*realm);
2✔
2322

1✔
2323
        // Make it so the subscriptions only match the "foo" object
1✔
2324
        {
2✔
2325
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2326
            mut_subs.clear();
2✔
2327
            mut_subs.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
2✔
2328
            mut_subs.commit();
2✔
2329
        }
2✔
2330

1✔
2331
        // Make foo so that it will match the next subscription update. This checks whether you can do
1✔
2332
        // multiple subscription set updates without waiting and that the last one eventually takes effect when
1✔
2333
        // you fully synchronize.
1✔
2334
        {
2✔
2335
            Results results(realm, table);
2✔
2336
            realm->begin_transaction();
2✔
2337
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
2✔
2338
            foo_obj.set<int64_t>(queryable_int_field, 15);
2✔
2339
            realm->commit_transaction();
2✔
2340
        }
2✔
2341

1✔
2342
        // Update our subscriptions so that both foo/bar will be included
1✔
2343
        {
2✔
2344
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2345
            mut_subs.clear();
2✔
2346
            mut_subs.insert_or_assign(Query(table).greater_equal(queryable_int_field, static_cast<int64_t>(10)));
2✔
2347
            mut_subs.commit();
2✔
2348
        }
2✔
2349

1✔
2350
        // Make foo out-of-view for the current subscription.
1✔
2351
        {
2✔
2352
            Results results(realm, table);
2✔
2353
            realm->begin_transaction();
2✔
2354
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
2✔
2355
            foo_obj.set<int64_t>(queryable_int_field, 0);
2✔
2356
            realm->commit_transaction();
2✔
2357
        }
2✔
2358

1✔
2359
        wait_for_upload(*realm);
2✔
2360
        wait_for_download(*realm);
2✔
2361

1✔
2362
        realm->refresh();
2✔
2363
        Results results(realm, table);
2✔
2364
        CHECK(results.size() == 1);
2!
2365
        CHECK(table->get_object_with_primary_key({bar_obj_id}).is_valid());
2!
2366
    });
2✔
2367
}
2✔
2368

2369
TEST_CASE("flx: verify websocket protocol number and prefixes", "[sync][protocol]") {
2✔
2370
    // Update the expected value whenever the protocol version is updated - this ensures
1✔
2371
    // that the current protocol version does not change unexpectedly.
1✔
2372
    REQUIRE(10 == sync::get_current_protocol_version());
2✔
2373
    // This was updated in Protocol V8 to use '#' instead of '/' to support the Web SDK
1✔
2374
    REQUIRE("com.mongodb.realm-sync#" == sync::get_pbs_websocket_protocol_prefix());
2✔
2375
    REQUIRE("com.mongodb.realm-query-sync#" == sync::get_flx_websocket_protocol_prefix());
2✔
2376
}
2✔
2377

2378
// TODO: remote-baas: This test fails consistently with Windows remote baas server - to be fixed in RCORE-1674
2379
#ifndef _WIN32
2380
TEST_CASE("flx: subscriptions persist after closing/reopening", "[sync][flx][baas]") {
2✔
2381
    FLXSyncTestHarness harness("flx_bad_query");
2✔
2382
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
2383

1✔
2384
    {
2✔
2385
        auto orig_realm = Realm::get_shared_realm(config);
2✔
2386
        auto mut_subs = orig_realm->get_latest_subscription_set().make_mutable_copy();
2✔
2387
        mut_subs.insert_or_assign(Query(orig_realm->read_group().get_table("class_TopLevel")));
2✔
2388
        mut_subs.commit();
2✔
2389
        orig_realm->close();
2✔
2390
    }
2✔
2391

1✔
2392
    {
2✔
2393
        auto new_realm = Realm::get_shared_realm(config);
2✔
2394
        auto latest_subs = new_realm->get_latest_subscription_set();
2✔
2395
        CHECK(latest_subs.size() == 1);
2!
2396
        latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
2397
    }
2✔
2398
}
2✔
2399
#endif
2400

2401
TEST_CASE("flx: no subscription store created for PBS app", "[sync][flx][baas]") {
2✔
2402
    const std::string base_url = get_base_url();
2✔
2403
    auto server_app_config = minimal_app_config(base_url, "flx_connect_as_pbs", g_minimal_schema);
2✔
2404
    TestAppSession session(create_app(server_app_config));
2✔
2405
    SyncTestFile config(session.app(), bson::Bson{}, g_minimal_schema);
2✔
2406

1✔
2407
    auto realm = Realm::get_shared_realm(config);
2✔
2408
    CHECK(!wait_for_download(*realm));
2!
2409
    CHECK(!wait_for_upload(*realm));
2!
2410

1✔
2411
    CHECK(!realm->sync_session()->get_flx_subscription_store());
2!
2412

1✔
2413
    CHECK_THROWS_AS(realm->get_active_subscription_set(), IllegalOperation);
2✔
2414
    CHECK_THROWS_AS(realm->get_latest_subscription_set(), IllegalOperation);
2✔
2415
}
2✔
2416

2417
TEST_CASE("flx: connect to FLX as PBS returns an error", "[sync][flx][baas]") {
2✔
2418
    FLXSyncTestHarness harness("connect_to_flx_as_pbs");
2✔
2419
    SyncTestFile config(harness.app(), bson::Bson{}, harness.schema());
2✔
2420
    std::mutex sync_error_mutex;
2✔
2421
    util::Optional<SyncError> sync_error;
2✔
2422
    config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError error) mutable {
2✔
2423
        std::lock_guard<std::mutex> lk(sync_error_mutex);
2✔
2424
        sync_error = std::move(error);
2✔
2425
    };
2✔
2426
    auto realm = Realm::get_shared_realm(config);
2✔
2427
    timed_wait_for([&] {
6,615✔
2428
        std::lock_guard<std::mutex> lk(sync_error_mutex);
6,615✔
2429
        return static_cast<bool>(sync_error);
6,615✔
2430
    });
6,615✔
2431

1✔
2432
    CHECK(sync_error->status == ErrorCodes::WrongSyncType);
2!
2433
    CHECK(sync_error->server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
2434
}
2✔
2435

2436
TEST_CASE("flx: connect to FLX with partition value returns an error", "[sync][flx][protocol][baas]") {
2✔
2437
    FLXSyncTestHarness harness("connect_to_flx_as_pbs");
2✔
2438
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
2439
    config.sync_config->partition_value = "\"foobar\"";
2✔
2440

1✔
2441
    REQUIRE_EXCEPTION(Realm::get_shared_realm(config), IllegalCombination,
2✔
2442
                      "Cannot specify a partition value when flexible sync is enabled");
2✔
2443
}
2✔
2444

2445
TEST_CASE("flx: connect to PBS as FLX returns an error", "[sync][flx][protocol][baas]") {
2✔
2446
    const std::string base_url = get_base_url();
2✔
2447

1✔
2448
    auto server_app_config = minimal_app_config(base_url, "flx_connect_as_pbs", g_minimal_schema);
2✔
2449
    TestAppSession session(create_app(server_app_config));
2✔
2450
    auto app = session.app();
2✔
2451
    auto user = app->current_user();
2✔
2452

1✔
2453
    SyncTestFile config(user, g_minimal_schema, SyncConfig::FLXSyncEnabled{});
2✔
2454

1✔
2455
    std::mutex sync_error_mutex;
2✔
2456
    util::Optional<SyncError> sync_error;
2✔
2457
    config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError error) mutable {
2✔
2458
        std::lock_guard<std::mutex> lk(sync_error_mutex);
2✔
2459
        sync_error = std::move(error);
2✔
2460
    };
2✔
2461
    auto realm = Realm::get_shared_realm(config);
2✔
2462
    auto latest_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2463
    auto table = realm->read_group().get_table("class_TopLevel");
2✔
2464
    Query new_query_a(table);
2✔
2465
    new_query_a.equal(table->get_column_key("_id"), ObjectId::gen());
2✔
2466
    latest_subs.insert_or_assign(std::move(new_query_a));
2✔
2467
    latest_subs.commit();
2✔
2468

1✔
2469
    timed_wait_for([&] {
7,653✔
2470
        std::lock_guard<std::mutex> lk(sync_error_mutex);
7,653✔
2471
        return static_cast<bool>(sync_error);
7,653✔
2472
    });
7,653✔
2473

1✔
2474
    CHECK(sync_error->status == ErrorCodes::WrongSyncType);
2!
2475
    CHECK(sync_error->server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
2476
}
2✔
2477

2478
TEST_CASE("flx: commit subscription while refreshing the access token", "[sync][flx][token][baas]") {
2✔
2479
    class HookedTransport : public SynchronousTestTransport {
2✔
2480
    public:
2✔
2481
        void send_request_to_server(const Request& request,
2✔
2482
                                    util::UniqueFunction<void(const Response&)>&& completion) override
2✔
2483
        {
10✔
2484
            if (request_hook) {
10✔
2485
                request_hook(request);
2✔
2486
            }
2✔
2487
            SynchronousTestTransport::send_request_to_server(request, std::move(completion));
10✔
2488
        }
10✔
2489
        util::UniqueFunction<void(const Request&)> request_hook;
2✔
2490
    };
2✔
2491

1✔
2492
    auto transport = std::make_shared<HookedTransport>();
2✔
2493
    FLXSyncTestHarness harness("flx_wait_access_token2", FLXSyncTestHarness::default_server_schema(), transport);
2✔
2494
    auto app = harness.app();
2✔
2495
    std::shared_ptr<SyncUser> user = app->current_user();
2✔
2496
    REQUIRE(user);
2!
2497
    REQUIRE(!user->access_token_refresh_required());
2!
2498
    // Set a bad access token, with an expired time. This will trigger a refresh initiated by the client.
1✔
2499
    std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
2✔
2500
    using namespace std::chrono_literals;
2✔
2501
    auto expires = std::chrono::system_clock::to_time_t(now - 30s);
2✔
2502
    user->update_access_token(encode_fake_jwt("fake_access_token", expires));
2✔
2503
    REQUIRE(user->access_token_refresh_required());
2!
2504

1✔
2505
    bool seen_waiting_for_access_token = false;
2✔
2506
    // Commit a subcription set while there is no sync session.
1✔
2507
    // A session is created when the access token is refreshed.
1✔
2508
    transport->request_hook = [&](const Request&) {
2✔
2509
        auto user = app->current_user();
2✔
2510
        REQUIRE(user);
2!
2511
        for (auto& session : user->all_sessions()) {
2✔
2512
            if (session->state() == SyncSession::State::WaitingForAccessToken) {
2✔
2513
                REQUIRE(!seen_waiting_for_access_token);
2!
2514
                seen_waiting_for_access_token = true;
2✔
2515

1✔
2516
                auto store = session->get_flx_subscription_store();
2✔
2517
                REQUIRE(store);
2!
2518
                auto mut_subs = store->get_latest().make_mutable_copy();
2✔
2519
                mut_subs.commit();
2✔
2520
            }
2✔
2521
        }
2✔
2522
    };
2✔
2523
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
2524
    // This triggers the token refresh.
1✔
2525
    auto r = Realm::get_shared_realm(config);
2✔
2526
    REQUIRE(seen_waiting_for_access_token);
2!
2527
}
2✔
2528

2529
TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][bootstrap][baas]") {
6✔
2530
    FLXSyncTestHarness harness("flx_bootstrap_batching", {g_large_array_schema, {"queryable_int_field"}});
6✔
2531

3✔
2532
    std::vector<ObjectId> obj_ids_at_end = fill_large_array_schema(harness);
6✔
2533
    SyncTestFile interrupted_realm_config(harness.app()->current_user(), harness.schema(),
6✔
2534
                                          SyncConfig::FLXSyncEnabled{});
6✔
2535
    interrupted_realm_config.cache = false;
6✔
2536

3✔
2537
    auto check_interrupted_state = [&](const DBRef& realm) {
6✔
2538
        auto tr = realm->start_read();
6✔
2539
        auto top_level = tr->get_table("class_TopLevel");
6✔
2540
        REQUIRE(top_level);
6!
2541
        REQUIRE(top_level->is_empty());
6!
2542

3✔
2543
        auto sub_store = sync::SubscriptionStore::create(realm, [](int64_t) {});
6✔
2544
        auto version_info = sub_store->get_version_info();
6✔
2545
        REQUIRE(version_info.latest == 1);
6!
2546
        REQUIRE(version_info.active == 0);
6!
2547
        auto latest_subs = sub_store->get_latest();
6✔
2548
        REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
6!
2549
        REQUIRE(latest_subs.size() == 1);
6!
2550
        REQUIRE(latest_subs.at(0).object_class_name == "TopLevel");
6!
2551
    };
6✔
2552

3✔
2553
    auto mutate_realm = [&] {
5✔
2554
        harness.load_initial_data([&](SharedRealm realm) {
4✔
2555
            auto table = realm->read_group().get_table("class_TopLevel");
4✔
2556
            Results res(realm, Query(table).greater(table->get_column_key("queryable_int_field"), int64_t(10)));
4✔
2557
            REQUIRE(res.size() == 2);
4!
2558
            res.clear();
4✔
2559
        });
4✔
2560
    };
4✔
2561

3✔
2562
    SECTION("exception occurs during bootstrap application") {
6✔
2563
        {
2✔
2564
            auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
2565
            Realm::Config config = interrupted_realm_config;
2✔
2566
            config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
2567
            auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
2568
            config.sync_config->on_sync_client_event_hook =
2✔
2569
                [promise = std::move(shared_promise)](std::weak_ptr<SyncSession> weak_session,
2✔
2570
                                                      const SyncClientHookData& data) mutable {
36✔
2571
                    if (data.event != SyncClientHookEvent::BootstrapMessageProcessed) {
36✔
2572
                        return SyncClientHookAction::NoAction;
22✔
2573
                    }
22✔
2574
                    auto session = weak_session.lock();
14✔
2575
                    if (!session) {
14✔
2576
                        return SyncClientHookAction::NoAction;
×
2577
                    }
×
2578

7✔
2579
                    if (data.query_version == 1 && data.batch_state == sync::DownloadBatchState::LastInBatch) {
14✔
2580
                        session->close();
2✔
2581
                        promise->emplace_value();
2✔
2582
                        return SyncClientHookAction::EarlyReturn;
2✔
2583
                    }
2✔
2584
                    return SyncClientHookAction::NoAction;
12✔
2585
                };
12✔
2586
            auto realm = Realm::get_shared_realm(config);
2✔
2587
            {
2✔
2588
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2589
                auto table = realm->read_group().get_table("class_TopLevel");
2✔
2590
                mut_subs.insert_or_assign(Query(table));
2✔
2591
                mut_subs.commit();
2✔
2592
            }
2✔
2593

1✔
2594
            interrupted.get();
2✔
2595
            realm->sync_session()->shutdown_and_wait();
2✔
2596
            realm->close();
2✔
2597
        }
2✔
2598

1✔
2599
        _impl::RealmCoordinator::assert_no_open_realms();
2✔
2600

1✔
2601
        // Open up the realm without the sync client attached and verify that the realm got interrupted in the state
1✔
2602
        // we expected it to be in.
1✔
2603
        {
2✔
2604
            DBOptions options;
2✔
2605
            options.encryption_key = test_util::crypt_key();
2✔
2606
            auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
2607
            util::StderrLogger logger;
2✔
2608
            sync::PendingBootstrapStore bootstrap_store(realm, logger);
2✔
2609
            REQUIRE(bootstrap_store.has_pending());
2!
2610
            auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16);
2✔
2611
            REQUIRE(pending_batch.query_version == 1);
2!
2612
            REQUIRE(pending_batch.progress);
2!
2613

1✔
2614
            check_interrupted_state(realm);
2✔
2615
        }
2✔
2616

1✔
2617
        interrupted_realm_config.sync_config->simulate_integration_error = true;
2✔
2618
        auto error_pf = util::make_promise_future<SyncError>();
2✔
2619
        interrupted_realm_config.sync_config->error_handler =
2✔
2620
            [promise = std::make_shared<util::Promise<SyncError>>(std::move(error_pf.promise))](
2✔
2621
                std::shared_ptr<SyncSession>, SyncError error) {
2✔
2622
                promise->emplace_value(std::move(error));
2✔
2623
            };
2✔
2624

1✔
2625
        auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
2626
        const auto& error = error_pf.future.get();
2✔
2627
        REQUIRE(error.is_fatal);
2!
2628
        REQUIRE(error.status == ErrorCodes::BadChangeset);
2!
2629
    }
2✔
2630

3✔
2631
    SECTION("interrupted before final bootstrap message") {
6✔
2632
        {
2✔
2633
            auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
2634
            Realm::Config config = interrupted_realm_config;
2✔
2635
            config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
2636
            auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
2637
            config.sync_config->on_sync_client_event_hook =
2✔
2638
                [promise = std::move(shared_promise)](std::weak_ptr<SyncSession> weak_session,
2✔
2639
                                                      const SyncClientHookData& data) mutable {
16✔
2640
                    if (data.event != SyncClientHookEvent::BootstrapMessageProcessed) {
16✔
2641
                        return SyncClientHookAction::NoAction;
12✔
2642
                    }
12✔
2643
                    auto session = weak_session.lock();
4✔
2644
                    if (!session) {
4✔
2645
                        return SyncClientHookAction::NoAction;
×
2646
                    }
×
2647

2✔
2648
                    if (data.query_version == 1 && data.batch_state == sync::DownloadBatchState::MoreToCome) {
4✔
2649
                        session->force_close();
2✔
2650
                        promise->emplace_value();
2✔
2651
                        return SyncClientHookAction::TriggerReconnect;
2✔
2652
                    }
2✔
2653
                    return SyncClientHookAction::NoAction;
2✔
2654
                };
2✔
2655
            auto realm = Realm::get_shared_realm(config);
2✔
2656
            {
2✔
2657
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2658
                auto table = realm->read_group().get_table("class_TopLevel");
2✔
2659
                mut_subs.insert_or_assign(Query(table));
2✔
2660
                mut_subs.commit();
2✔
2661
            }
2✔
2662

1✔
2663
            interrupted.get();
2✔
2664
            realm->sync_session()->shutdown_and_wait();
2✔
2665
            realm->close();
2✔
2666
        }
2✔
2667

1✔
2668
        _impl::RealmCoordinator::assert_no_open_realms();
2✔
2669

1✔
2670
        // Open up the realm without the sync client attached and verify that the realm got interrupted in the state
1✔
2671
        // we expected it to be in.
1✔
2672
        {
2✔
2673
            DBOptions options;
2✔
2674
            options.encryption_key = test_util::crypt_key();
2✔
2675
            auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
2676
            util::StderrLogger logger;
2✔
2677
            sync::PendingBootstrapStore bootstrap_store(realm, logger);
2✔
2678
            REQUIRE(bootstrap_store.has_pending());
2!
2679
            auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16);
2✔
2680
            REQUIRE(pending_batch.query_version == 1);
2!
2681
            REQUIRE(!pending_batch.progress);
2!
2682
            REQUIRE(pending_batch.remaining_changesets == 0);
2!
2683
            REQUIRE(pending_batch.changesets.size() == 1);
2!
2684

1✔
2685
            check_interrupted_state(realm);
2✔
2686
        }
2✔
2687

1✔
2688
        // Now we'll open a different realm and make some changes that would leave orphan objects on the client
1✔
2689
        // if the bootstrap batches weren't being cached until lastInBatch were true.
1✔
2690
        mutate_realm();
2✔
2691

1✔
2692
        // Finally re-open the realm whose bootstrap we interrupted and just wait for it to finish downloading.
1✔
2693
        auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
2694
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
2695
        realm->get_latest_subscription_set()
2✔
2696
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
2697
            .get();
2✔
2698
        wait_for_upload(*realm);
2✔
2699
        wait_for_download(*realm);
2✔
2700

1✔
2701
        wait_for_advance(*realm);
2✔
2702
        auto expected_obj_ids = util::Span<ObjectId>(obj_ids_at_end).sub_span(0, 3);
2✔
2703

1✔
2704
        REQUIRE(table->size() == expected_obj_ids.size());
2!
2705
        for (auto& id : expected_obj_ids) {
6✔
2706
            REQUIRE(table->find_primary_key(Mixed{id}));
6!
2707
        }
6✔
2708
    }
2✔
2709

3✔
2710
    SECTION("interrupted after final bootstrap message before processing") {
6✔
2711
        {
2✔
2712
            auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
2713
            Realm::Config config = interrupted_realm_config;
2✔
2714
            config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
2715
            auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
2716
            config.sync_config->on_sync_client_event_hook =
2✔
2717
                [promise = std::move(shared_promise)](std::weak_ptr<SyncSession> weak_session,
2✔
2718
                                                      const SyncClientHookData& data) mutable {
36✔
2719
                    if (data.event != SyncClientHookEvent::BootstrapMessageProcessed) {
36✔
2720
                        return SyncClientHookAction::NoAction;
22✔
2721
                    }
22✔
2722
                    auto session = weak_session.lock();
14✔
2723
                    if (!session) {
14✔
2724
                        return SyncClientHookAction::NoAction;
×
2725
                    }
×
2726

7✔
2727
                    if (data.query_version == 1 && data.batch_state == sync::DownloadBatchState::LastInBatch) {
14✔
2728
                        session->force_close();
2✔
2729
                        promise->emplace_value();
2✔
2730
                        return SyncClientHookAction::TriggerReconnect;
2✔
2731
                    }
2✔
2732
                    return SyncClientHookAction::NoAction;
12✔
2733
                };
12✔
2734
            auto realm = Realm::get_shared_realm(config);
2✔
2735
            {
2✔
2736
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2737
                auto table = realm->read_group().get_table("class_TopLevel");
2✔
2738
                mut_subs.insert_or_assign(Query(table));
2✔
2739
                mut_subs.commit();
2✔
2740
            }
2✔
2741

1✔
2742
            interrupted.get();
2✔
2743
            realm->sync_session()->shutdown_and_wait();
2✔
2744
            realm->close();
2✔
2745
        }
2✔
2746

1✔
2747
        _impl::RealmCoordinator::assert_no_open_realms();
2✔
2748

1✔
2749
        // Open up the realm without the sync client attached and verify that the realm got interrupted in the state
1✔
2750
        // we expected it to be in.
1✔
2751
        {
2✔
2752
            DBOptions options;
2✔
2753
            options.encryption_key = test_util::crypt_key();
2✔
2754
            auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
2755
            util::StderrLogger logger;
2✔
2756
            sync::PendingBootstrapStore bootstrap_store(realm, logger);
2✔
2757
            REQUIRE(bootstrap_store.has_pending());
2!
2758
            auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16);
2✔
2759
            REQUIRE(pending_batch.query_version == 1);
2!
2760
            REQUIRE(static_cast<bool>(pending_batch.progress));
2!
2761
            REQUIRE(pending_batch.remaining_changesets == 0);
2!
2762
            REQUIRE(pending_batch.changesets.size() == 6);
2!
2763

1✔
2764
            check_interrupted_state(realm);
2✔
2765
        }
2✔
2766

1✔
2767
        // Now we'll open a different realm and make some changes that would leave orphan objects on the client
1✔
2768
        // if the bootstrap batches weren't being cached until lastInBatch were true.
1✔
2769
        mutate_realm();
2✔
2770

1✔
2771
        auto [saw_valid_state_promise, saw_valid_state_future] = util::make_promise_future<void>();
2✔
2772
        auto shared_saw_valid_state_promise =
2✔
2773
            std::make_shared<decltype(saw_valid_state_promise)>(std::move(saw_valid_state_promise));
2✔
2774
        // This hook will let us check what the state of the realm is before it's integrated any new download
1✔
2775
        // messages from the server. This should be the full 5 object bootstrap that was received before we
1✔
2776
        // called mutate_realm().
1✔
2777
        interrupted_realm_config.sync_config->on_sync_client_event_hook =
2✔
2778
            [&, promise = std::move(shared_saw_valid_state_promise)](std::weak_ptr<SyncSession> weak_session,
2✔
2779
                                                                     const SyncClientHookData& data) {
18✔
2780
                if (data.event != SyncClientHookEvent::DownloadMessageReceived) {
18✔
2781
                    return SyncClientHookAction::NoAction;
16✔
2782
                }
16✔
2783
                auto session = weak_session.lock();
2✔
2784
                if (!session) {
2✔
2785
                    return SyncClientHookAction::NoAction;
×
2786
                }
×
2787

1✔
2788
                if (data.query_version != 1 || data.batch_state == sync::DownloadBatchState::MoreToCome) {
2✔
2789
                    return SyncClientHookAction::NoAction;
×
2790
                }
×
2791

1✔
2792
                auto latest_sub_set = session->get_flx_subscription_store()->get_latest();
2✔
2793
                auto active_sub_set = session->get_flx_subscription_store()->get_active();
2✔
2794
                auto version_info = session->get_flx_subscription_store()->get_version_info();
2✔
2795
                REQUIRE(version_info.pending_mark == active_sub_set.version());
2!
2796
                REQUIRE(version_info.active == active_sub_set.version());
2!
2797
                REQUIRE(version_info.latest == latest_sub_set.version());
2!
2798
                REQUIRE(latest_sub_set.version() == active_sub_set.version());
2!
2799
                REQUIRE(active_sub_set.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
2800

1✔
2801
                auto db = SyncSession::OnlyForTesting::get_db(*session);
2✔
2802
                auto tr = db->start_read();
2✔
2803

1✔
2804
                auto table = tr->get_table("class_TopLevel");
2✔
2805
                REQUIRE(table->size() == obj_ids_at_end.size());
2!
2806
                for (auto& id : obj_ids_at_end) {
10✔
2807
                    REQUIRE(table->find_primary_key(Mixed{id}));
10!
2808
                }
10✔
2809

1✔
2810
                promise->emplace_value();
2✔
2811
                return SyncClientHookAction::NoAction;
2✔
2812
            };
2✔
2813

1✔
2814
        // Finally re-open the realm whose bootstrap we interrupted and just wait for it to finish downloading.
1✔
2815
        auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
2816
        saw_valid_state_future.get();
2✔
2817
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
2818
        realm->get_latest_subscription_set()
2✔
2819
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
2820
            .get();
2✔
2821
        wait_for_upload(*realm);
2✔
2822
        wait_for_download(*realm);
2✔
2823
        wait_for_advance(*realm);
2✔
2824

1✔
2825
        auto expected_obj_ids = util::Span<ObjectId>(obj_ids_at_end).sub_span(0, 3);
2✔
2826

1✔
2827
        // After we've downloaded all the mutations there should only by 3 objects left.
1✔
2828
        REQUIRE(table->size() == expected_obj_ids.size());
2!
2829
        for (auto& id : expected_obj_ids) {
6✔
2830
            REQUIRE(table->find_primary_key(Mixed{id}));
6!
2831
        }
6✔
2832
    }
2✔
2833
}
6✔
2834

2835
TEST_CASE("flx: data ingest", "[sync][flx][data ingest][baas]") {
18✔
2836
    static auto server_schema = [] {
10✔
2837
        FLXSyncTestHarness::ServerSchema server_schema;
2✔
2838
        server_schema.queryable_fields = {"queryable_str_field"};
2✔
2839
        server_schema.schema = {
2✔
2840
            {"Asymmetric",
2✔
2841
             ObjectSchema::ObjectType::TopLevelAsymmetric,
2✔
2842
             {
2✔
2843
                 {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
2844
                 {"location", PropertyType::String | PropertyType::Nullable},
2✔
2845
                 {"embedded_obj", PropertyType::Object | PropertyType::Nullable, "Embedded"},
2✔
2846
             }},
2✔
2847
            {"Embedded",
2✔
2848
             ObjectSchema::ObjectType::Embedded,
2✔
2849
             {
2✔
2850
                 {"value", PropertyType::String | PropertyType::Nullable},
2✔
2851
             }},
2✔
2852
        };
2✔
2853
        return server_schema;
2✔
2854
    }();
2✔
2855
    static auto harness = std::make_unique<FLXSyncTestHarness>("asymmetric_sync", server_schema);
18✔
2856

9✔
2857
    SECTION("basic object construction") {
18✔
2858
        auto foo_obj_id = ObjectId::gen();
2✔
2859
        auto bar_obj_id = ObjectId::gen();
2✔
2860
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2861
            realm->begin_transaction();
2✔
2862
            CppContext c(realm);
2✔
2863
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "foo"s}}));
2✔
2864
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", bar_obj_id}, {"location", "bar"s}}));
2✔
2865
            realm->commit_transaction();
2✔
2866
            wait_for_upload(*realm);
2✔
2867
        });
2✔
2868

1✔
2869
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2870
            wait_for_download(*realm);
2✔
2871

1✔
2872
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
2873
            REQUIRE(table->size() == 0);
2!
2874
            auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2875
            // Cannot query asymmetric tables.
1✔
2876
            CHECK_THROWS_AS(new_query.insert_or_assign(Query(table)), LogicError);
2✔
2877
        });
2✔
2878
    }
2✔
2879

9✔
2880
    SECTION("do not allow objects with same key within the same transaction") {
18✔
2881
        auto foo_obj_id = ObjectId::gen();
2✔
2882
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2883
            realm->begin_transaction();
2✔
2884
            CppContext c(realm);
2✔
2885
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "foo"s}}));
2✔
2886
            CHECK_THROWS_WITH(
2✔
2887
                Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "bar"s}})),
2✔
2888
                "Attempting to create an object of type 'Asymmetric' with an existing primary key value 'not "
2✔
2889
                "implemented'");
2✔
2890
            realm->commit_transaction();
2✔
2891
            wait_for_upload(*realm);
2✔
2892
        });
2✔
2893

1✔
2894
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2895
            wait_for_download(*realm);
2✔
2896

1✔
2897
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
2898
            REQUIRE(table->size() == 0);
2!
2899
        });
2✔
2900
    }
2✔
2901

9✔
2902
    SECTION("create multiple objects - separate commits") {
18✔
2903
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2904
            CppContext c(realm);
2✔
2905
            for (int i = 0; i < 100; ++i) {
202✔
2906
                realm->begin_transaction();
200✔
2907
                auto obj_id = ObjectId::gen();
200✔
2908
                Object::create(c, realm, "Asymmetric",
200✔
2909
                               std::any(AnyDict{{"_id", obj_id}, {"location", util::format("foo_%1", i)}}));
200✔
2910
                realm->commit_transaction();
200✔
2911
            }
200✔
2912

1✔
2913
            wait_for_upload(*realm);
2✔
2914
            wait_for_download(*realm);
2✔
2915

1✔
2916
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
2917
            REQUIRE(table->size() == 0);
2!
2918
        });
2✔
2919
    }
2✔
2920

9✔
2921
    SECTION("create multiple objects - same commit") {
18✔
2922
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2923
            CppContext c(realm);
2✔
2924
            realm->begin_transaction();
2✔
2925
            for (int i = 0; i < 100; ++i) {
202✔
2926
                auto obj_id = ObjectId::gen();
200✔
2927
                Object::create(c, realm, "Asymmetric",
200✔
2928
                               std::any(AnyDict{{"_id", obj_id}, {"location", util::format("foo_%1", i)}}));
200✔
2929
            }
200✔
2930
            realm->commit_transaction();
2✔
2931

1✔
2932
            wait_for_upload(*realm);
2✔
2933
            wait_for_download(*realm);
2✔
2934

1✔
2935
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
2936
            REQUIRE(table->size() == 0);
2!
2937
        });
2✔
2938
    }
2✔
2939

9✔
2940
    SECTION("open with schema mismatch on IsAsymmetric") {
18✔
2941
        auto schema = server_schema.schema;
2✔
2942
        schema.find("Asymmetric")->table_type = ObjectSchema::ObjectType::TopLevel;
2✔
2943

1✔
2944
        harness->do_with_new_user([&](std::shared_ptr<SyncUser> user) {
2✔
2945
            SyncTestFile config(user, schema, SyncConfig::FLXSyncEnabled{});
2✔
2946
            auto [error_promise, error_future] = util::make_promise_future<SyncError>();
2✔
2947
            auto error_count = 0;
2✔
2948
            auto err_handler = [promise = util::CopyablePromiseHolder(std::move(error_promise)),
2✔
2949
                                &error_count](std::shared_ptr<SyncSession>, SyncError err) mutable {
4✔
2950
                ++error_count;
4✔
2951
                if (error_count == 1) {
4✔
2952
                    // Bad changeset detected by the client.
1✔
2953
                    CHECK(err.status == ErrorCodes::BadChangeset);
2!
2954
                }
2✔
2955
                else if (error_count == 2) {
2✔
2956
                    // Server asking for a client reset.
1✔
2957
                    CHECK(err.status == ErrorCodes::SyncClientResetRequired);
2!
2958
                    CHECK(err.is_client_reset_requested());
2!
2959
                    promise.get_promise().emplace_value(std::move(err));
2✔
2960
                }
2✔
2961
            };
4✔
2962

1✔
2963
            config.sync_config->error_handler = err_handler;
2✔
2964
            auto realm = Realm::get_shared_realm(config);
2✔
2965

1✔
2966
            auto err = error_future.get();
2✔
2967
            CHECK(error_count == 2);
2!
2968
        });
2✔
2969
    }
2✔
2970

9✔
2971
    SECTION("basic embedded object construction") {
18✔
2972
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2973
            realm->begin_transaction();
2✔
2974
            CppContext c(realm);
2✔
2975
            Object::create(c, realm, "Asymmetric",
2✔
2976
                           std::any(AnyDict{{"_id", ObjectId::gen()}, {"embedded_obj", AnyDict{{"value", "foo"s}}}}));
2✔
2977
            realm->commit_transaction();
2✔
2978
            wait_for_upload(*realm);
2✔
2979
        });
2✔
2980

1✔
2981
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2982
            wait_for_download(*realm);
2✔
2983

1✔
2984
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
2985
            REQUIRE(table->size() == 0);
2!
2986
        });
2✔
2987
    }
2✔
2988

9✔
2989
    SECTION("replace embedded object") {
18✔
2990
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
2991
            CppContext c(realm);
2✔
2992
            auto foo_obj_id = ObjectId::gen();
2✔
2993
            realm->begin_transaction();
2✔
2994
            Object::create(c, realm, "Asymmetric",
2✔
2995
                           std::any(AnyDict{{"_id", foo_obj_id}, {"embedded_obj", AnyDict{{"value", "foo"s}}}}));
2✔
2996
            realm->commit_transaction();
2✔
2997
            // Update embedded field to `null`.
1✔
2998
            realm->begin_transaction();
2✔
2999
            Object::create(c, realm, "Asymmetric",
2✔
3000
                           std::any(AnyDict{{"_id", foo_obj_id}, {"embedded_obj", std::any()}}));
2✔
3001
            realm->commit_transaction();
2✔
3002
            // Update embedded field again to a new value.
1✔
3003
            realm->begin_transaction();
2✔
3004
            Object::create(c, realm, "Asymmetric",
2✔
3005
                           std::any(AnyDict{{"_id", foo_obj_id}, {"embedded_obj", AnyDict{{"value", "bar"s}}}}));
2✔
3006
            realm->commit_transaction();
2✔
3007

1✔
3008
            wait_for_upload(*realm);
2✔
3009
            wait_for_download(*realm);
2✔
3010

1✔
3011
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
3012
            REQUIRE(table->size() == 0);
2!
3013
        });
2✔
3014
    }
2✔
3015

9✔
3016
    SECTION("asymmetric table not allowed in PBS") {
18✔
3017
        Schema schema{
2✔
3018
            {"Asymmetric2",
2✔
3019
             ObjectSchema::ObjectType::TopLevelAsymmetric,
2✔
3020
             {
2✔
3021
                 {"_id", PropertyType::Int, Property::IsPrimary{true}},
2✔
3022
                 {"location", PropertyType::Int},
2✔
3023
                 {"reading", PropertyType::Int},
2✔
3024
             }},
2✔
3025
        };
2✔
3026

1✔
3027
        SyncTestFile config(harness->app(), bson::Bson{}, schema);
2✔
3028
        REQUIRE_EXCEPTION(
2✔
3029
            Realm::get_shared_realm(config), SchemaValidationFailed,
2✔
3030
            Catch::Matchers::ContainsSubstring("Asymmetric table 'Asymmetric2' not allowed in partition based sync"));
2✔
3031
    }
2✔
3032

9✔
3033
    // Add any new test sections above this point
9✔
3034

9✔
3035
    SECTION("teardown") {
18✔
3036
        harness.reset();
2✔
3037
    }
2✔
3038
}
18✔
3039

3040
TEST_CASE("flx: data ingest - dev mode", "[sync][flx][data ingest][baas]") {
2✔
3041
    FLXSyncTestHarness::ServerSchema server_schema;
2✔
3042
    server_schema.dev_mode_enabled = true;
2✔
3043
    server_schema.schema = Schema{};
2✔
3044

1✔
3045
    auto schema = Schema{{"Asymmetric",
2✔
3046
                          ObjectSchema::ObjectType::TopLevelAsymmetric,
2✔
3047
                          {
2✔
3048
                              {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
3049
                              {"location", PropertyType::String | PropertyType::Nullable},
2✔
3050
                          }},
2✔
3051
                         {"TopLevel",
2✔
3052
                          {
2✔
3053
                              {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
3054
                              {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
3055
                          }}};
2✔
3056

1✔
3057
    FLXSyncTestHarness harness("asymmetric_sync", server_schema);
2✔
3058

1✔
3059
    auto foo_obj_id = ObjectId::gen();
2✔
3060
    auto bar_obj_id = ObjectId::gen();
2✔
3061

1✔
3062
    harness.do_with_new_realm(
2✔
3063
        [&](SharedRealm realm) {
2✔
3064
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
3065
            auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3066
            new_query.insert_or_assign(Query(table));
2✔
3067
            std::move(new_query).commit();
2✔
3068

1✔
3069
            CppContext c(realm);
2✔
3070
            realm->begin_transaction();
2✔
3071
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "foo"s}}));
2✔
3072
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", bar_obj_id}, {"location", "bar"s}}));
2✔
3073
            realm->commit_transaction();
2✔
3074

1✔
3075
            wait_for_upload(*realm);
2✔
3076
        },
2✔
3077
        schema);
2✔
3078
}
2✔
3079

3080
TEST_CASE("flx: send client error", "[sync][flx][baas]") {
2✔
3081
    FLXSyncTestHarness harness("flx_client_error");
2✔
3082

1✔
3083
    // An integration error is simulated while bootstrapping.
1✔
3084
    // This results in the client sending an error message to the server.
1✔
3085
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3086
    config.sync_config->simulate_integration_error = true;
2✔
3087

1✔
3088
    auto [error_promise, error_future] = util::make_promise_future<SyncError>();
2✔
3089
    auto error_count = 0;
2✔
3090
    auto err_handler = [promise = util::CopyablePromiseHolder(std::move(error_promise)),
2✔
3091
                        &error_count](std::shared_ptr<SyncSession>, SyncError err) mutable {
4✔
3092
        ++error_count;
4✔
3093
        if (error_count == 1) {
4✔
3094
            // Bad changeset detected by the client.
1✔
3095
            CHECK(err.status == ErrorCodes::BadChangeset);
2!
3096
        }
2✔
3097
        else if (error_count == 2) {
2✔
3098
            // Server asking for a client reset.
1✔
3099
            CHECK(err.status == ErrorCodes::SyncClientResetRequired);
2!
3100
            CHECK(err.is_client_reset_requested());
2!
3101
            promise.get_promise().emplace_value(std::move(err));
2✔
3102
        }
2✔
3103
    };
4✔
3104

1✔
3105
    config.sync_config->error_handler = err_handler;
2✔
3106
    auto realm = Realm::get_shared_realm(config);
2✔
3107
    auto table = realm->read_group().get_table("class_TopLevel");
2✔
3108
    auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3109
    new_query.insert_or_assign(Query(table));
2✔
3110
    new_query.commit();
2✔
3111

1✔
3112
    auto err = error_future.get();
2✔
3113
    CHECK(error_count == 2);
2!
3114
}
2✔
3115

3116
TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][bootstrap][baas]") {
6✔
3117
    FLXSyncTestHarness harness("bootstrap_full_sync");
6✔
3118

3✔
3119
    auto setup_subs = [](SharedRealm& realm) {
12✔
3120
        auto table = realm->read_group().get_table("class_TopLevel");
12✔
3121
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
12✔
3122
        new_query.clear();
12✔
3123
        auto col = table->get_column_key("queryable_str_field");
12✔
3124
        new_query.insert_or_assign(Query(table).equal(col, StringData("bar")).Or().equal(col, StringData("bizz")));
12✔
3125
        return new_query.commit();
12✔
3126
    };
12✔
3127

3✔
3128
    auto bar_obj_id = ObjectId::gen();
6✔
3129
    auto bizz_obj_id = ObjectId::gen();
6✔
3130
    auto setup_and_poison_cache = [&] {
6✔
3131
        harness.load_initial_data([&](SharedRealm realm) {
6✔
3132
            CppContext c(realm);
6✔
3133
            Object::create(c, realm, "TopLevel",
6✔
3134
                           std::any(AnyDict{{"_id", bar_obj_id},
6✔
3135
                                            {"queryable_str_field", std::string{"bar"}},
6✔
3136
                                            {"queryable_int_field", static_cast<int64_t>(10)},
6✔
3137
                                            {"non_queryable_field", std::string{"non queryable 2"}}}));
6✔
3138
        });
6✔
3139

3✔
3140
        harness.do_with_new_realm([&](SharedRealm realm) {
6✔
3141
            // first set a subscription to force the creation/caching of a broker snapshot on the server.
3✔
3142
            setup_subs(realm).get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
6✔
3143
            wait_for_advance(*realm);
6✔
3144
            auto table = realm->read_group().get_table("class_TopLevel");
6✔
3145
            REQUIRE(table->find_primary_key(bar_obj_id));
6!
3146

3✔
3147
            // Then create an object that won't be in the cached snapshot - this is the object that if we didn't
3✔
3148
            // wait for a MARK message to come back, we'd miss it in our results.
3✔
3149
            CppContext c(realm);
6✔
3150
            realm->begin_transaction();
6✔
3151
            Object::create(c, realm, "TopLevel",
6✔
3152
                           std::any(AnyDict{{"_id", bizz_obj_id},
6✔
3153
                                            {"queryable_str_field", std::string{"bizz"}},
6✔
3154
                                            {"queryable_int_field", static_cast<int64_t>(15)},
6✔
3155
                                            {"non_queryable_field", std::string{"non queryable 3"}}}));
6✔
3156
            realm->commit_transaction();
6✔
3157
            wait_for_upload(*realm);
6✔
3158
        });
6✔
3159
    };
6✔
3160

3✔
3161
    SECTION("regular subscription change") {
6✔
3162
        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3163
        std::atomic<bool> saw_truncated_bootstrap{false};
2✔
3164
        triggered_config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_sess,
2✔
3165
                                                                      const SyncClientHookData& data) {
27✔
3166
            auto sess = weak_sess.lock();
27✔
3167
            if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
27✔
3168
                return SyncClientHookAction::NoAction;
25✔
3169
            }
25✔
3170

1✔
3171
            auto latest_subs = sess->get_flx_subscription_store()->get_latest();
2✔
3172
            REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3173
            REQUIRE(data.num_changesets == 1);
2!
3174
            auto db = SyncSession::OnlyForTesting::get_db(*sess);
2✔
3175
            auto read_tr = db->start_read();
2✔
3176
            auto table = read_tr->get_table("class_TopLevel");
2✔
3177
            REQUIRE(table->find_primary_key(bar_obj_id));
2!
3178
            REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
2!
3179
            saw_truncated_bootstrap.store(true);
2✔
3180

1✔
3181
            return SyncClientHookAction::NoAction;
2✔
3182
        };
2✔
3183
        auto problem_realm = Realm::get_shared_realm(triggered_config);
2✔
3184

1✔
3185
        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
1✔
3186
        // on the server should have no new history entries, and then pause the router so it doesn't get any of
1✔
3187
        // the changes we're about to create.
1✔
3188
        wait_for_upload(*problem_realm);
2✔
3189
        wait_for_download(*problem_realm);
2✔
3190

1✔
3191
        nlohmann::json command_request = {
2✔
3192
            {"command", "PAUSE_ROUTER_SESSION"},
2✔
3193
        };
2✔
3194
        auto resp_body =
2✔
3195
            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
2✔
3196
                .get();
2✔
3197
        REQUIRE(resp_body == "{}");
2!
3198

1✔
3199
        // Put some data into the server, this will be the data that will be in the broker cache.
1✔
3200
        setup_and_poison_cache();
2✔
3201

1✔
3202
        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
1✔
3203
        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
1✔
3204
        // processed.
1✔
3205
        setup_subs(problem_realm).get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
3206
        wait_for_advance(*problem_realm);
2✔
3207

1✔
3208
        REQUIRE(saw_truncated_bootstrap.load());
2!
3209
        auto table = problem_realm->read_group().get_table("class_TopLevel");
2✔
3210
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3211
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3212
    }
2✔
3213

3✔
3214
// TODO: remote-baas: This test fails intermittently with Windows remote baas server - to be fixed in RCORE-1674
3✔
3215
#ifndef _WIN32
6✔
3216
    SECTION("disconnect between bootstrap and mark") {
6✔
3217
        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3218
        auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
3219
        triggered_config.sync_config->on_sync_client_event_hook =
2✔
3220
            [promise = util::CopyablePromiseHolder(std::move(interrupted_promise)), &bizz_obj_id,
2✔
3221
             &bar_obj_id](std::weak_ptr<SyncSession> weak_sess, const SyncClientHookData& data) mutable {
26✔
3222
                auto sess = weak_sess.lock();
26✔
3223
                if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
26✔
3224
                    return SyncClientHookAction::NoAction;
24✔
3225
                }
24✔
3226

1✔
3227
                auto latest_subs = sess->get_flx_subscription_store()->get_latest();
2✔
3228
                REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3229
                REQUIRE(data.num_changesets == 1);
2!
3230
                auto db = SyncSession::OnlyForTesting::get_db(*sess);
2✔
3231
                auto read_tr = db->start_read();
2✔
3232
                auto table = read_tr->get_table("class_TopLevel");
2✔
3233
                REQUIRE(table->find_primary_key(bar_obj_id));
2!
3234
                REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
2!
3235

1✔
3236
                sess->pause();
2✔
3237
                promise.get_promise().emplace_value();
2✔
3238
                return SyncClientHookAction::NoAction;
2✔
3239
            };
2✔
3240
        auto problem_realm = Realm::get_shared_realm(triggered_config);
2✔
3241

1✔
3242
        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
1✔
3243
        // on the server should have no new history entries, and then pause the router so it doesn't get any of
1✔
3244
        // the changes we're about to create.
1✔
3245
        wait_for_upload(*problem_realm);
2✔
3246
        wait_for_download(*problem_realm);
2✔
3247

1✔
3248
        nlohmann::json command_request = {
2✔
3249
            {"command", "PAUSE_ROUTER_SESSION"},
2✔
3250
        };
2✔
3251
        auto resp_body =
2✔
3252
            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
2✔
3253
                .get();
2✔
3254
        REQUIRE(resp_body == "{}");
2!
3255

1✔
3256
        // Put some data into the server, this will be the data that will be in the broker cache.
1✔
3257
        setup_and_poison_cache();
2✔
3258

1✔
3259
        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
1✔
3260
        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
1✔
3261
        // processed.
1✔
3262
        auto sub_set = setup_subs(problem_realm);
2✔
3263
        auto sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
2✔
3264

1✔
3265
        interrupted.get();
2✔
3266
        problem_realm->sync_session()->shutdown_and_wait();
2✔
3267
        REQUIRE(!sub_complete_future.is_ready());
2!
3268
        sub_set.refresh();
2✔
3269
        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3270

1✔
3271
        problem_realm->sync_session()->resume();
2✔
3272
        sub_complete_future.get();
2✔
3273
        wait_for_advance(*problem_realm);
2✔
3274

1✔
3275
        sub_set.refresh();
2✔
3276
        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::Complete);
2!
3277
        auto table = problem_realm->read_group().get_table("class_TopLevel");
2✔
3278
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3279
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3280
    }
2✔
3281
#endif
6✔
3282
    SECTION("error/suspend between bootstrap and mark") {
6✔
3283
        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3284
        triggered_config.sync_config->on_sync_client_event_hook =
2✔
3285
            [&bizz_obj_id, &bar_obj_id](std::weak_ptr<SyncSession> weak_sess, const SyncClientHookData& data) {
27✔
3286
                auto sess = weak_sess.lock();
27✔
3287
                if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
27✔
3288
                    return SyncClientHookAction::NoAction;
25✔
3289
                }
25✔
3290

1✔
3291
                auto latest_subs = sess->get_flx_subscription_store()->get_latest();
2✔
3292
                REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3293
                REQUIRE(data.num_changesets == 1);
2!
3294
                auto db = SyncSession::OnlyForTesting::get_db(*sess);
2✔
3295
                auto read_tr = db->start_read();
2✔
3296
                auto table = read_tr->get_table("class_TopLevel");
2✔
3297
                REQUIRE(table->find_primary_key(bar_obj_id));
2!
3298
                REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
2!
3299

1✔
3300
                return SyncClientHookAction::TriggerReconnect;
2✔
3301
            };
2✔
3302
        auto problem_realm = Realm::get_shared_realm(triggered_config);
2✔
3303

1✔
3304
        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
1✔
3305
        // on the server should have no new history entries, and then pause the router so it doesn't get any of
1✔
3306
        // the changes we're about to create.
1✔
3307
        wait_for_upload(*problem_realm);
2✔
3308
        wait_for_download(*problem_realm);
2✔
3309

1✔
3310
        nlohmann::json command_request = {
2✔
3311
            {"command", "PAUSE_ROUTER_SESSION"},
2✔
3312
        };
2✔
3313
        auto resp_body =
2✔
3314
            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
2✔
3315
                .get();
2✔
3316
        REQUIRE(resp_body == "{}");
2!
3317

1✔
3318
        // Put some data into the server, this will be the data that will be in the broker cache.
1✔
3319
        setup_and_poison_cache();
2✔
3320

1✔
3321
        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
1✔
3322
        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
1✔
3323
        // processed.
1✔
3324
        auto sub_set = setup_subs(problem_realm);
2✔
3325
        auto sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
2✔
3326

1✔
3327
        sub_complete_future.get();
2✔
3328
        wait_for_advance(*problem_realm);
2✔
3329

1✔
3330
        sub_set.refresh();
2✔
3331
        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::Complete);
2!
3332
        auto table = problem_realm->read_group().get_table("class_TopLevel");
2✔
3333
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3334
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3335
    }
2✔
3336
}
6✔
3337

3338
TEST_CASE("flx: convert flx sync realm to bundled realm", "[app][flx][baas]") {
12✔
3339
    static auto foo_obj_id = ObjectId::gen();
12✔
3340
    static auto bar_obj_id = ObjectId::gen();
12✔
3341
    static auto bizz_obj_id = ObjectId::gen();
12✔
3342
    static std::optional<FLXSyncTestHarness> harness;
12✔
3343
    if (!harness) {
12✔
3344
        harness.emplace("bundled_flx_realms");
2✔
3345
        harness->load_initial_data([&](SharedRealm realm) {
2✔
3346
            CppContext c(realm);
2✔
3347
            Object::create(c, realm, "TopLevel",
2✔
3348
                           std::any(AnyDict{{"_id", foo_obj_id},
2✔
3349
                                            {"queryable_str_field", "foo"s},
2✔
3350
                                            {"queryable_int_field", static_cast<int64_t>(5)},
2✔
3351
                                            {"non_queryable_field", "non queryable 1"s}}));
2✔
3352
            Object::create(c, realm, "TopLevel",
2✔
3353
                           std::any(AnyDict{{"_id", bar_obj_id},
2✔
3354
                                            {"queryable_str_field", "bar"s},
2✔
3355
                                            {"queryable_int_field", static_cast<int64_t>(10)},
2✔
3356
                                            {"non_queryable_field", "non queryable 2"s}}));
2✔
3357
        });
2✔
3358
    }
2✔
3359

6✔
3360
    SECTION("flx to flx (should succeed)") {
12✔
3361
        create_user_and_log_in(harness->app());
2✔
3362
        SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
2✔
3363
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3364
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
3365
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3366
            mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
2✔
3367
            auto subs = std::move(mut_subs).commit();
2✔
3368

1✔
3369
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
3370
            wait_for_advance(*realm);
2✔
3371

1✔
3372
            realm->convert(target_config);
2✔
3373
        });
2✔
3374

1✔
3375
        auto target_realm = Realm::get_shared_realm(target_config);
2✔
3376

1✔
3377
        target_realm->begin_transaction();
2✔
3378
        CppContext c(target_realm);
2✔
3379
        Object::create(c, target_realm, "TopLevel",
2✔
3380
                       std::any(AnyDict{{"_id", bizz_obj_id},
2✔
3381
                                        {"queryable_str_field", "bizz"s},
2✔
3382
                                        {"queryable_int_field", static_cast<int64_t>(15)},
2✔
3383
                                        {"non_queryable_field", "non queryable 3"s}}));
2✔
3384
        target_realm->commit_transaction();
2✔
3385

1✔
3386
        wait_for_upload(*target_realm);
2✔
3387
        wait_for_download(*target_realm);
2✔
3388

1✔
3389
        auto latest_subs = target_realm->get_active_subscription_set();
2✔
3390
        auto table = target_realm->read_group().get_table("class_TopLevel");
2✔
3391
        REQUIRE(latest_subs.size() == 1);
2!
3392
        REQUIRE(latest_subs.at(0).object_class_name == "TopLevel");
2!
3393
        REQUIRE(latest_subs.at(0).query_string ==
2!
3394
                Query(table).greater(table->get_column_key("queryable_int_field"), 5).get_description());
2✔
3395

1✔
3396
        REQUIRE(table->size() == 2);
2!
3397
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3398
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3399
        REQUIRE_FALSE(table->find_primary_key(foo_obj_id));
2!
3400
    }
2✔
3401

6✔
3402
    SECTION("flx to local (should succeed)") {
12✔
3403
        TestFile target_config;
2✔
3404

1✔
3405
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3406
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
3407
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3408
            mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
2✔
3409
            auto subs = std::move(mut_subs).commit();
2✔
3410

1✔
3411
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
3412
            wait_for_advance(*realm);
2✔
3413

1✔
3414
            target_config.schema = realm->schema();
2✔
3415
            target_config.schema_version = realm->schema_version();
2✔
3416
            realm->convert(target_config);
2✔
3417
        });
2✔
3418

1✔
3419
        auto target_realm = Realm::get_shared_realm(target_config);
2✔
3420
        REQUIRE_THROWS(target_realm->get_active_subscription_set());
2✔
3421

1✔
3422
        auto table = target_realm->read_group().get_table("class_TopLevel");
2✔
3423
        REQUIRE(table->size() == 2);
2!
3424
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3425
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3426
        REQUIRE_FALSE(table->find_primary_key(foo_obj_id));
2!
3427
    }
2✔
3428

6✔
3429
    SECTION("flx to pbs (should fail to convert)") {
12✔
3430
        create_user_and_log_in(harness->app());
2✔
3431
        SyncTestFile target_config(harness->app()->current_user(), "12345"s, harness->schema());
2✔
3432
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3433
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
3434
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3435
            mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
2✔
3436
            auto subs = std::move(mut_subs).commit();
2✔
3437

1✔
3438
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
3439
            wait_for_advance(*realm);
2✔
3440

1✔
3441
            REQUIRE_THROWS(realm->convert(target_config));
2✔
3442
        });
2✔
3443
    }
2✔
3444

6✔
3445
    SECTION("pbs to flx (should fail to convert)") {
12✔
3446
        create_user_and_log_in(harness->app());
2✔
3447
        SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
2✔
3448

1✔
3449
        auto pbs_app_config = minimal_app_config(harness->app()->base_url(), "pbs_to_flx_convert", harness->schema());
2✔
3450

1✔
3451
        TestAppSession pbs_app_session(create_app(pbs_app_config));
2✔
3452
        SyncTestFile source_config(pbs_app_session.app()->current_user(), "54321"s, pbs_app_config.schema);
2✔
3453
        auto realm = Realm::get_shared_realm(source_config);
2✔
3454

1✔
3455
        realm->begin_transaction();
2✔
3456
        CppContext c(realm);
2✔
3457
        Object::create(c, realm, "TopLevel",
2✔
3458
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
3459
                                        {"queryable_str_field", "foo"s},
2✔
3460
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
3461
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
3462
        realm->commit_transaction();
2✔
3463

1✔
3464
        REQUIRE_THROWS(realm->convert(target_config));
2✔
3465
    }
2✔
3466

6✔
3467
    SECTION("local to flx (should fail to convert)") {
12✔
3468
        TestFile source_config;
2✔
3469
        source_config.schema = harness->schema();
2✔
3470
        source_config.schema_version = 1;
2✔
3471

1✔
3472
        auto realm = Realm::get_shared_realm(source_config);
2✔
3473
        auto foo_obj_id = ObjectId::gen();
2✔
3474

1✔
3475
        realm->begin_transaction();
2✔
3476
        CppContext c(realm);
2✔
3477
        Object::create(c, realm, "TopLevel",
2✔
3478
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
3479
                                        {"queryable_str_field", "foo"s},
2✔
3480
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
3481
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
3482
        realm->commit_transaction();
2✔
3483

1✔
3484
        create_user_and_log_in(harness->app());
2✔
3485
        SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
2✔
3486

1✔
3487
        REQUIRE_THROWS(realm->convert(target_config));
2✔
3488
    }
2✔
3489

6✔
3490
    // Add new sections before this
6✔
3491
    SECTION("teardown") {
12✔
3492
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
2✔
3493
        harness.reset();
2✔
3494
    }
2✔
3495
}
12✔
3496

3497
TEST_CASE("flx: compensating write errors get re-sent across sessions", "[sync][flx][compensating write][baas]") {
2✔
3498
    AppCreateConfig::ServiceRole role;
2✔
3499
    role.name = "compensating_write_perms";
2✔
3500

1✔
3501
    AppCreateConfig::ServiceRoleDocumentFilters doc_filters;
2✔
3502
    doc_filters.read = true;
2✔
3503
    doc_filters.write =
2✔
3504
        nlohmann::json{{"queryable_str_field", nlohmann::json{{"$in", nlohmann::json::array({"foo", "bar"})}}}};
2✔
3505
    role.document_filters = doc_filters;
2✔
3506

1✔
3507
    role.insert_filter = true;
2✔
3508
    role.delete_filter = true;
2✔
3509
    role.read = true;
2✔
3510
    role.write = true;
2✔
3511
    FLXSyncTestHarness::ServerSchema server_schema{
2✔
3512
        g_simple_embedded_obj_schema, {"queryable_str_field", "queryable_int_field"}, {role}};
2✔
3513
    FLXSyncTestHarness::Config harness_config("flx_bad_query", server_schema);
2✔
3514
    harness_config.reconnect_mode = ReconnectMode::testing;
2✔
3515
    FLXSyncTestHarness harness(std::move(harness_config));
2✔
3516

1✔
3517
    auto test_obj_id_1 = ObjectId::gen();
2✔
3518
    auto test_obj_id_2 = ObjectId::gen();
2✔
3519

1✔
3520
    create_user_and_log_in(harness.app());
2✔
3521
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3522
    config.cache = false;
2✔
3523

1✔
3524
    {
2✔
3525
        auto error_received_pf = util::make_promise_future<void>();
2✔
3526
        config.sync_config->on_sync_client_event_hook =
2✔
3527
            [promise = util::CopyablePromiseHolder(std::move(error_received_pf.promise))](
2✔
3528
                std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& data) mutable {
26✔
3529
                if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
26✔
3530
                    return SyncClientHookAction::NoAction;
20✔
3531
                }
20✔
3532
                auto session = weak_session.lock();
6✔
3533
                REQUIRE(session);
6!
3534

3✔
3535
                auto error_code = sync::ProtocolError(data.error_info->raw_error_code);
6✔
3536

3✔
3537
                if (error_code == sync::ProtocolError::initial_sync_not_completed) {
6✔
3538
                    return SyncClientHookAction::NoAction;
4✔
3539
                }
4✔
3540

1✔
3541
                REQUIRE(error_code == sync::ProtocolError::compensating_write);
2!
3542
                REQUIRE_FALSE(data.error_info->compensating_writes.empty());
2!
3543
                promise.get_promise().emplace_value();
2✔
3544

1✔
3545
                return SyncClientHookAction::TriggerReconnect;
2✔
3546
            };
2✔
3547

1✔
3548
        auto realm = Realm::get_shared_realm(config);
2✔
3549
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
3550
        auto queryable_str_field = table->get_column_key("queryable_str_field");
2✔
3551
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3552
        new_query.insert_or_assign(Query(table).equal(queryable_str_field, "bizz"));
2✔
3553
        std::move(new_query).commit();
2✔
3554

1✔
3555
        wait_for_upload(*realm);
2✔
3556
        wait_for_download(*realm);
2✔
3557

1✔
3558
        CppContext c(realm);
2✔
3559
        realm->begin_transaction();
2✔
3560
        Object::create(c, realm, "TopLevel",
2✔
3561
                       util::Any(AnyDict{
2✔
3562
                           {"_id", test_obj_id_1},
2✔
3563
                           {"queryable_str_field", std::string{"foo"}},
2✔
3564
                       }));
2✔
3565
        realm->commit_transaction();
2✔
3566

1✔
3567
        realm->begin_transaction();
2✔
3568
        Object::create(c, realm, "TopLevel",
2✔
3569
                       util::Any(AnyDict{
2✔
3570
                           {"_id", test_obj_id_2},
2✔
3571
                           {"queryable_str_field", std::string{"baz"}},
2✔
3572
                       }));
2✔
3573
        realm->commit_transaction();
2✔
3574

1✔
3575
        error_received_pf.future.get();
2✔
3576
        realm->sync_session()->shutdown_and_wait();
2✔
3577
        config.sync_config->on_sync_client_event_hook = {};
2✔
3578
    }
2✔
3579

1✔
3580
    _impl::RealmCoordinator::clear_all_caches();
2✔
3581

1✔
3582
    std::mutex errors_mutex;
2✔
3583
    std::condition_variable new_compensating_write;
2✔
3584
    std::vector<std::pair<ObjectId, sync::version_type>> error_to_download_version;
2✔
3585
    std::vector<sync::CompensatingWriteErrorInfo> compensating_writes;
2✔
3586
    sync::version_type download_version;
2✔
3587

1✔
3588
    config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_session,
2✔
3589
                                                        const SyncClientHookData& data) mutable {
14✔
3590
        auto session = weak_session.lock();
14✔
3591
        if (!session) {
14✔
3592
            return SyncClientHookAction::NoAction;
×
3593
        }
×
3594

8✔
3595
        if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
14✔
3596
            if (data.event == SyncClientHookEvent::DownloadMessageReceived) {
10✔
3597
                download_version = data.progress.download.server_version;
5✔
3598
            }
5✔
3599

6✔
3600
            return SyncClientHookAction::NoAction;
10✔
3601
        }
10✔
3602

2✔
3603
        auto error_code = sync::ProtocolError(data.error_info->raw_error_code);
4✔
3604
        REQUIRE(error_code == sync::ProtocolError::compensating_write);
4!
3605
        REQUIRE(!data.error_info->compensating_writes.empty());
4!
3606
        std::lock_guard<std::mutex> lk(errors_mutex);
4✔
3607
        for (const auto& compensating_write : data.error_info->compensating_writes) {
4✔
3608
            error_to_download_version.emplace_back(compensating_write.primary_key.get_object_id(),
4✔
3609
                                                   data.error_info->compensating_write_server_version);
4✔
3610
        }
4✔
3611

2✔
3612
        return SyncClientHookAction::NoAction;
4✔
3613
    };
4✔
3614

1✔
3615
    config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError error) {
4✔
3616
        std::unique_lock<std::mutex> lk(errors_mutex);
4✔
3617
        REQUIRE(error.status == ErrorCodes::SyncCompensatingWrite);
4!
3618
        for (const auto& compensating_write : error.compensating_writes_info) {
4✔
3619
            auto tracked_error = std::find_if(error_to_download_version.begin(), error_to_download_version.end(),
4✔
3620
                                              [&](const auto& pair) {
6✔
3621
                                                  return pair.first == compensating_write.primary_key.get_object_id();
6✔
3622
                                              });
6✔
3623
            REQUIRE(tracked_error != error_to_download_version.end());
4!
3624
            CHECK(tracked_error->second <= download_version);
4!
3625
            compensating_writes.push_back(compensating_write);
4✔
3626
        }
4✔
3627
        new_compensating_write.notify_one();
4✔
3628
    };
4✔
3629

1✔
3630
    auto realm = Realm::get_shared_realm(config);
2✔
3631

1✔
3632
    wait_for_upload(*realm);
2✔
3633
    wait_for_download(*realm);
2✔
3634

1✔
3635
    std::unique_lock<std::mutex> lk(errors_mutex);
2✔
3636
    new_compensating_write.wait_for(lk, std::chrono::seconds(30), [&] {
2✔
3637
        return compensating_writes.size() == 2;
2✔
3638
    });
2✔
3639

1✔
3640
    REQUIRE(compensating_writes.size() == 2);
2!
3641
    auto& write_info = compensating_writes[0];
2✔
3642
    CHECK(write_info.primary_key.is_type(type_ObjectId));
2!
3643
    CHECK(write_info.primary_key.get_object_id() == test_obj_id_1);
2!
3644
    CHECK(write_info.object_name == "TopLevel");
2!
3645
    CHECK_THAT(write_info.reason, Catch::Matchers::ContainsSubstring("object is outside of the current query view"));
2✔
3646

1✔
3647
    write_info = compensating_writes[1];
2✔
3648
    REQUIRE(write_info.primary_key.is_type(type_ObjectId));
2!
3649
    REQUIRE(write_info.primary_key.get_object_id() == test_obj_id_2);
2!
3650
    REQUIRE(write_info.object_name == "TopLevel");
2!
3651
    REQUIRE(write_info.reason == util::format("write to \"%1\" in table \"TopLevel\" not allowed", test_obj_id_2));
2!
3652
    auto top_level_table = realm->read_group().get_table("class_TopLevel");
2✔
3653
    REQUIRE(top_level_table->is_empty());
2!
3654
}
2✔
3655

3656
TEST_CASE("flx: bootstrap changesets are applied continuously", "[sync][flx][bootstrap][baas]") {
2✔
3657
    FLXSyncTestHarness harness("flx_bootstrap_batching", {g_large_array_schema, {"queryable_int_field"}});
2✔
3658
    fill_large_array_schema(harness);
2✔
3659

1✔
3660
    std::unique_ptr<std::thread> th;
2✔
3661
    sync::version_type user_commit_version = UINT_FAST64_MAX;
2✔
3662
    sync::version_type bootstrap_version = UINT_FAST64_MAX;
2✔
3663
    SharedRealm realm;
2✔
3664
    std::condition_variable cv;
2✔
3665
    std::mutex mutex;
2✔
3666
    bool allow_to_commit = false;
2✔
3667

1✔
3668
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3669
    auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
3670
    auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
3671
    config.sync_config->on_sync_client_event_hook =
2✔
3672
        [promise = std::move(shared_promise), &th, &realm, &user_commit_version, &bootstrap_version, &cv, &mutex,
2✔
3673
         &allow_to_commit](std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& data) {
50✔
3674
            if (data.query_version == 0) {
50✔
3675
                return SyncClientHookAction::NoAction;
12✔
3676
            }
12✔
3677
            if (data.event != SyncClientHookEvent::DownloadMessageIntegrated) {
38✔
3678
                return SyncClientHookAction::NoAction;
26✔
3679
            }
26✔
3680
            auto session = weak_session.lock();
12✔
3681
            if (!session) {
12✔
3682
                return SyncClientHookAction::NoAction;
×
3683
            }
×
3684
            if (data.batch_state != sync::DownloadBatchState::MoreToCome) {
12✔
3685
                // Read version after bootstrap is done.
1✔
3686
                auto db = TestHelper::get_db(realm);
2✔
3687
                ReadTransaction rt(db);
2✔
3688
                bootstrap_version = rt.get_version();
2✔
3689
                {
2✔
3690
                    std::lock_guard<std::mutex> lock(mutex);
2✔
3691
                    allow_to_commit = true;
2✔
3692
                }
2✔
3693
                cv.notify_one();
2✔
3694
                session->force_close();
2✔
3695
                promise->emplace_value();
2✔
3696
                return SyncClientHookAction::NoAction;
2✔
3697
            }
2✔
3698

5✔
3699
            if (th) {
10✔
3700
                return SyncClientHookAction::NoAction;
8✔
3701
            }
8✔
3702

1✔
3703
            auto func = [&] {
2✔
3704
                // Attempt to commit a local change after the first bootstrap batch was committed.
1✔
3705
                auto db = TestHelper::get_db(realm);
2✔
3706
                WriteTransaction wt(db);
2✔
3707
                TableRef table = wt.get_table("class_TopLevel");
2✔
3708
                table->create_object_with_primary_key(ObjectId::gen());
2✔
3709
                {
2✔
3710
                    std::unique_lock<std::mutex> lock(mutex);
2✔
3711
                    // Wait to commit until we read the final bootstrap version.
1✔
3712
                    cv.wait(lock, [&] {
2✔
3713
                        return allow_to_commit;
2✔
3714
                    });
2✔
3715
                }
2✔
3716
                user_commit_version = wt.commit();
2✔
3717
            };
2✔
3718
            th = std::make_unique<std::thread>(std::move(func));
2✔
3719

1✔
3720
            return SyncClientHookAction::NoAction;
2✔
3721
        };
2✔
3722

1✔
3723
    realm = Realm::get_shared_realm(config);
2✔
3724
    auto table = realm->read_group().get_table("class_TopLevel");
2✔
3725
    Query query(table);
2✔
3726
    {
2✔
3727
        auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3728
        new_subs.insert_or_assign(query);
2✔
3729
        new_subs.commit();
2✔
3730
    }
2✔
3731
    interrupted.get();
2✔
3732
    th->join();
2✔
3733

1✔
3734
    // The user commit is the last one.
1✔
3735
    CHECK(user_commit_version == bootstrap_version + 1);
2!
3736
}
2✔
3737

3738
TEST_CASE("flx: open realm + register subscription callack while bootstrapping",
3739
          "[sync][flx][bootstrap][async open][baas]") {
12✔
3740
    FLXSyncTestHarness harness("flx_bootstrap_batching");
12✔
3741
    auto foo_obj_id = ObjectId::gen();
12✔
3742
    harness.load_initial_data([&](SharedRealm realm) {
12✔
3743
        CppContext c(realm);
12✔
3744
        Object::create(c, realm, "TopLevel",
12✔
3745
                       std::any(AnyDict{{"_id", foo_obj_id},
12✔
3746
                                        {"queryable_str_field", "foo"s},
12✔
3747
                                        {"queryable_int_field", static_cast<int64_t>(5)},
12✔
3748
                                        {"non_queryable_field", "created as initial data seed"s}}));
12✔
3749
    });
12✔
3750
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
12✔
3751

6✔
3752
    std::atomic<bool> subscription_invoked = false;
12✔
3753
    auto subscription_pf = util::make_promise_future<bool>();
12✔
3754
    // create a subscription to commit when realm is open for the first time or asked to rerun on open
6✔
3755
    auto init_subscription_callback_with_promise =
12✔
3756
        [&, promise_holder = util::CopyablePromiseHolder(std::move(subscription_pf.promise))](
12✔
3757
            std::shared_ptr<Realm> realm) mutable {
10✔
3758
            REQUIRE(realm);
8!
3759
            auto table = realm->read_group().get_table("class_TopLevel");
8✔
3760
            Query query(table);
8✔
3761
            auto subscription = realm->get_latest_subscription_set();
8✔
3762
            auto mutable_subscription = subscription.make_mutable_copy();
8✔
3763
            mutable_subscription.insert_or_assign(query);
8✔
3764
            auto promise = promise_holder.get_promise();
8✔
3765
            mutable_subscription.commit();
8✔
3766
            subscription_invoked = true;
8✔
3767
            promise.emplace_value(true);
8✔
3768
        };
8✔
3769
    // verify that the subscription has changed the database
6✔
3770
    auto verify_subscription = [](SharedRealm realm) {
12✔
3771
        REQUIRE(realm);
12!
3772
        auto table_ref = realm->read_group().get_table("class_TopLevel");
12✔
3773
        REQUIRE(table_ref);
12!
3774
        REQUIRE(table_ref->get_column_count() == 4);
12!
3775
        REQUIRE(table_ref->get_column_key("_id"));
12!
3776
        REQUIRE(table_ref->get_column_key("queryable_str_field"));
12!
3777
        REQUIRE(table_ref->get_column_key("queryable_int_field"));
12!
3778
        REQUIRE(table_ref->get_column_key("non_queryable_field"));
12!
3779
        REQUIRE(table_ref->size() == 1);
12!
3780
        auto str_col = table_ref->get_column_key("queryable_str_field");
12✔
3781
        REQUIRE(table_ref->get_object(0).get<String>(str_col) == "foo");
12!
3782
        return true;
12✔
3783
    };
12✔
3784

6✔
3785
    SECTION("Sync open") {
12✔
3786
        // sync open with subscription callback. Subscription will be run, since this is the first time that realm is
1✔
3787
        // opened
1✔
3788
        subscription_invoked = false;
2✔
3789
        config.sync_config->subscription_initializer = init_subscription_callback_with_promise;
2✔
3790
        auto realm = Realm::get_shared_realm(config);
2✔
3791
        REQUIRE(subscription_pf.future.get());
2!
3792
        auto sb = realm->get_latest_subscription_set();
2✔
3793
        auto future = sb.get_state_change_notification(realm::sync::SubscriptionSet::State::Complete);
2✔
3794
        auto state = future.get();
2✔
3795
        REQUIRE(state == realm::sync::SubscriptionSet::State::Complete);
2!
3796
        realm->refresh(); // refresh is needed otherwise table_ref->size() would be 0
2✔
3797
        REQUIRE(verify_subscription(realm));
2!
3798
    }
2✔
3799

6✔
3800
    SECTION("Sync Open + Async Open") {
12✔
3801
        {
2✔
3802
            subscription_invoked = false;
2✔
3803
            config.sync_config->subscription_initializer = init_subscription_callback_with_promise;
2✔
3804
            auto realm = Realm::get_shared_realm(config);
2✔
3805
            REQUIRE(subscription_pf.future.get());
2!
3806
            auto sb = realm->get_latest_subscription_set();
2✔
3807
            auto future = sb.get_state_change_notification(realm::sync::SubscriptionSet::State::Complete);
2✔
3808
            auto state = future.get();
2✔
3809
            REQUIRE(state == realm::sync::SubscriptionSet::State::Complete);
2!
3810
            realm->refresh(); // refresh is needed otherwise table_ref->size() would be 0
2✔
3811
            REQUIRE(verify_subscription(realm));
2!
3812
        }
2✔
3813
        {
2✔
3814
            auto subscription_pf_async = util::make_promise_future<bool>();
2✔
3815
            auto init_subscription_asyc_callback =
2✔
3816
                [promise_holder_async = util::CopyablePromiseHolder(std::move(subscription_pf_async.promise))](
2✔
3817
                    std::shared_ptr<Realm> realm) mutable {
2✔
3818
                    REQUIRE(realm);
2!
3819
                    auto table = realm->read_group().get_table("class_TopLevel");
2✔
3820
                    Query query(table);
2✔
3821
                    auto subscription = realm->get_latest_subscription_set();
2✔
3822
                    auto mutable_subscription = subscription.make_mutable_copy();
2✔
3823
                    mutable_subscription.insert_or_assign(query);
2✔
3824
                    auto promise = promise_holder_async.get_promise();
2✔
3825
                    mutable_subscription.commit();
2✔
3826
                    promise.emplace_value(true);
2✔
3827
                };
2✔
3828
            auto open_realm_pf = util::make_promise_future<bool>();
2✔
3829
            auto open_realm_completed_callback =
2✔
3830
                [&, promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
2✔
3831
                    ThreadSafeReference ref, std::exception_ptr err) mutable {
2✔
3832
                    auto promise = promise_holder.get_promise();
2✔
3833
                    if (err)
2✔
NEW
3834
                        promise.emplace_value(false);
×
3835
                    else
2✔
3836
                        promise.emplace_value(verify_subscription(Realm::get_shared_realm(std::move(ref))));
2✔
3837
                };
2✔
3838

1✔
3839
            config.sync_config->subscription_initializer = init_subscription_asyc_callback;
2✔
3840
            config.sync_config->rerun_init_subscription_on_open = true;
2✔
3841
            auto async_open = Realm::get_synchronized_realm(config);
2✔
3842
            async_open->start(open_realm_completed_callback);
2✔
3843
            REQUIRE(open_realm_pf.future.get());
2!
3844
            REQUIRE(subscription_pf_async.future.get());
2!
3845
            config.sync_config->rerun_init_subscription_on_open = false;
2✔
3846
            auto realm = Realm::get_shared_realm(config);
2✔
3847
            REQUIRE(realm->get_latest_subscription_set().version() == 2);
2!
3848
            REQUIRE(realm->get_active_subscription_set().version() == 2);
2!
3849
        }
2✔
3850
    }
2✔
3851

6✔
3852
    SECTION("Async open") {
12✔
3853
        SECTION("Initial async open with no rerun on open set") {
8✔
3854
            // subscription will be run since this is the first time we are opening the realm file.
2✔
3855
            auto open_realm_pf = util::make_promise_future<bool>();
4✔
3856
            auto open_realm_completed_callback =
4✔
3857
                [&, promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
4✔
3858
                    ThreadSafeReference ref, std::exception_ptr err) mutable {
4✔
3859
                    auto promise = promise_holder.get_promise();
4✔
3860
                    if (err)
4✔
3861
                        promise.emplace_value(false);
×
3862
                    else
4✔
3863
                        promise.emplace_value(verify_subscription(Realm::get_shared_realm(std::move(ref))));
4✔
3864
                };
4✔
3865

2✔
3866
            config.sync_config->subscription_initializer = init_subscription_callback_with_promise;
4✔
3867
            auto async_open = Realm::get_synchronized_realm(config);
4✔
3868
            async_open->start(open_realm_completed_callback);
4✔
3869
            REQUIRE(open_realm_pf.future.get());
4!
3870
            REQUIRE(subscription_pf.future.get());
4!
3871

2✔
3872
            SECTION("rerun on open = false. Subscription not run") {
4✔
3873
                subscription_invoked = false;
2✔
3874
                auto async_open = Realm::get_synchronized_realm(config);
2✔
3875
                auto open_realm_pf = util::make_promise_future<bool>();
2✔
3876
                auto open_realm_completed_callback =
2✔
3877
                    [promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
2✔
3878
                        ThreadSafeReference, std::exception_ptr) mutable {
2✔
3879
                        // no need to verify if the subscription has changed the db, since it has not run as we test
1✔
3880
                        // below
1✔
3881
                        promise_holder.get_promise().emplace_value(true);
2✔
3882
                    };
2✔
3883
                async_open->start(open_realm_completed_callback);
2✔
3884
                REQUIRE(open_realm_pf.future.get());
2!
3885
                REQUIRE_FALSE(subscription_invoked.load());
2!
3886
            }
2✔
3887

2✔
3888
            SECTION("rerun on open = true. Subscription not run cause realm already opened once") {
4✔
3889
                subscription_invoked = false;
2✔
3890
                auto realm = Realm::get_shared_realm(config);
2✔
3891
                auto init_subscription = [&subscription_invoked](std::shared_ptr<Realm> realm) mutable {
1✔
NEW
3892
                    REQUIRE(realm);
×
NEW
3893
                    auto table = realm->read_group().get_table("class_TopLevel");
×
NEW
3894
                    Query query(table);
×
NEW
3895
                    auto subscription = realm->get_latest_subscription_set();
×
NEW
3896
                    auto mutable_subscription = subscription.make_mutable_copy();
×
NEW
3897
                    mutable_subscription.insert_or_assign(query);
×
NEW
3898
                    mutable_subscription.commit();
×
NEW
3899
                    subscription_invoked.store(true);
×
NEW
3900
                };
×
3901
                config.sync_config->rerun_init_subscription_on_open = true;
2✔
3902
                config.sync_config->subscription_initializer = init_subscription;
2✔
3903
                auto async_open = Realm::get_synchronized_realm(config);
2✔
3904
                auto open_realm_pf = util::make_promise_future<bool>();
2✔
3905
                auto open_realm_completed_callback =
2✔
3906
                    [promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
2✔
3907
                        ThreadSafeReference, std::exception_ptr) mutable {
2✔
3908
                        // no need to verify if the subscription has changed the db, since it has not run as we test
1✔
3909
                        // below
1✔
3910
                        promise_holder.get_promise().emplace_value(true);
2✔
3911
                    };
2✔
3912
                async_open->start(open_realm_completed_callback);
2✔
3913
                REQUIRE(open_realm_pf.future.get());
2!
3914
                REQUIRE_FALSE(subscription_invoked.load());
2!
3915
                REQUIRE(realm->get_latest_subscription_set().version() == 1);
2!
3916
                REQUIRE(realm->get_active_subscription_set().version() == 1);
2!
3917
            }
2✔
3918
        }
4✔
3919

4✔
3920
        SECTION("rerun on open set for multiple async open tasks (subscription runs only once)") {
8✔
3921
            auto init_subscription = [](std::shared_ptr<Realm> realm) mutable {
8✔
3922
                REQUIRE(realm);
8!
3923
                auto table = realm->read_group().get_table("class_TopLevel");
8✔
3924
                Query query(table);
8✔
3925
                auto subscription = realm->get_latest_subscription_set();
8✔
3926
                auto mutable_subscription = subscription.make_mutable_copy();
8✔
3927
                mutable_subscription.insert_or_assign(query);
8✔
3928
                mutable_subscription.commit();
8✔
3929
            };
8✔
3930

2✔
3931
            auto open_task1_pf = util::make_promise_future<SharedRealm>();
4✔
3932
            auto open_task2_pf = util::make_promise_future<SharedRealm>();
4✔
3933
            auto open_callback1 = [promise_holder = util::CopyablePromiseHolder(std::move(open_task1_pf.promise))](
4✔
3934
                                      ThreadSafeReference ref, std::exception_ptr err) mutable {
4✔
3935
                REQUIRE_FALSE(err);
4!
3936
                auto realm = Realm::get_shared_realm(std::move(ref));
4✔
3937
                REQUIRE(realm);
4!
3938
                promise_holder.get_promise().emplace_value(realm);
4✔
3939
            };
4✔
3940
            auto open_callback2 = [promise_holder = util::CopyablePromiseHolder(std::move(open_task2_pf.promise))](
4✔
3941
                                      ThreadSafeReference ref, std::exception_ptr err) mutable {
4✔
3942
                REQUIRE_FALSE(err);
4!
3943
                auto realm = Realm::get_shared_realm(std::move(ref));
4✔
3944
                REQUIRE(realm);
4!
3945
                promise_holder.get_promise().emplace_value(realm);
4✔
3946
            };
4✔
3947

2✔
3948
            config.sync_config->rerun_init_subscription_on_open = true;
4✔
3949
            config.sync_config->subscription_initializer = init_subscription;
4✔
3950

2✔
3951
            SECTION("Realm was already created, but we want to rerun on first open using multiple tasks") {
4✔
3952
                {
2✔
3953
                    subscription_invoked = false;
2✔
3954
                    auto realm = Realm::get_shared_realm(config);
2✔
3955
                    auto sb = realm->get_latest_subscription_set();
2✔
3956
                    auto future = sb.get_state_change_notification(realm::sync::SubscriptionSet::State::Complete);
2✔
3957
                    auto state = future.get();
2✔
3958
                    REQUIRE(state == realm::sync::SubscriptionSet::State::Complete);
2!
3959
                    realm->refresh(); // refresh is needed otherwise table_ref->size() would be 0
2✔
3960
                    REQUIRE(verify_subscription(realm));
2!
3961
                    REQUIRE(realm->get_latest_subscription_set().version() == 1);
2!
3962
                    REQUIRE(realm->get_active_subscription_set().version() == 1);
2!
3963
                }
2✔
3964

1✔
3965
                auto async_open_task1 = Realm::get_synchronized_realm(config);
2✔
3966
                auto async_open_task2 = Realm::get_synchronized_realm(config);
2✔
3967
                async_open_task1->start(open_callback1);
2✔
3968
                async_open_task2->start(open_callback2);
2✔
3969

1✔
3970
                auto realm1 = open_task1_pf.future.get();
2✔
3971
                auto realm2 = open_task2_pf.future.get();
2✔
3972

1✔
3973
                const auto version_expected = 2;
2✔
3974
                auto r1_latest = realm1->get_latest_subscription_set().version();
2✔
3975
                auto r1_active = realm1->get_active_subscription_set().version();
2✔
3976
                REQUIRE(realm2->get_latest_subscription_set().version() == r1_latest);
2!
3977
                REQUIRE(realm2->get_active_subscription_set().version() == r1_active);
2!
3978
                REQUIRE(r1_latest == version_expected);
2!
3979
                REQUIRE(r1_active == version_expected);
2!
3980
            }
2✔
3981
            SECTION("First time realm is created but opened via open async. Both tasks could run the subscription") {
4✔
3982
                auto async_open_task1 = Realm::get_synchronized_realm(config);
2✔
3983
                auto async_open_task2 = Realm::get_synchronized_realm(config);
2✔
3984
                async_open_task1->start(open_callback1);
2✔
3985
                async_open_task2->start(open_callback2);
2✔
3986
                auto realm1 = open_task1_pf.future.get();
2✔
3987
                auto realm2 = open_task2_pf.future.get();
2✔
3988

1✔
3989
                auto r1_latest = realm1->get_latest_subscription_set().version();
2✔
3990
                auto r1_active = realm1->get_active_subscription_set().version();
2✔
3991
                REQUIRE(realm2->get_latest_subscription_set().version() == r1_latest);
2!
3992
                REQUIRE(realm2->get_active_subscription_set().version() == r1_active);
2!
3993
                // the callback may be run twice, if task1 is the first task to open realm
1✔
3994
                // but it is scheduled after tasks2, which have opened realm later but
1✔
3995
                // by the time it runs, subscription version is equal to 0 (realm creation).
1✔
3996
                // This can only happen the first time that realm is created. All the other times
1✔
3997
                // the init_sb callback is guaranteed to run once.
1✔
3998
                REQUIRE(r1_latest >= 1);
2!
3999
                REQUIRE(r1_latest <= 2);
2!
4000
                REQUIRE(r1_active >= 1);
2!
4001
                REQUIRE(r1_active <= 2);
2!
4002
            }
2✔
4003
        }
4✔
4004
    }
8✔
4005
}
12✔
4006
TEST_CASE("flx sync: Client reset during async open", "[sync][flx][client reset][async open][baas]") {
2✔
4007
    FLXSyncTestHarness harness("flx_bootstrap_batching");
2✔
4008
    auto foo_obj_id = ObjectId::gen();
2✔
4009
    std::atomic<bool> subscription_invoked = false;
2✔
4010
    harness.load_initial_data([&](SharedRealm realm) {
2✔
4011
        CppContext c(realm);
2✔
4012
        Object::create(c, realm, "TopLevel",
2✔
4013
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
4014
                                        {"queryable_str_field", "foo"s},
2✔
4015
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
4016
                                        {"non_queryable_field", "created as initial data seed"s}}));
2✔
4017
    });
2✔
4018
    SyncTestFile realm_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
4019

1✔
4020
    auto subscription_callback = [&](std::shared_ptr<Realm> realm) {
2✔
4021
        REQUIRE(realm);
2!
4022
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
4023
        Query query(table);
2✔
4024
        auto subscription = realm->get_latest_subscription_set();
2✔
4025
        auto mutable_subscription = subscription.make_mutable_copy();
2✔
4026
        mutable_subscription.insert_or_assign(query);
2✔
4027
        subscription_invoked = true;
2✔
4028
        mutable_subscription.commit();
2✔
4029
    };
2✔
4030

1✔
4031
    auto before_callback_called = util::make_promise_future<void>();
2✔
4032
    auto after_callback_called = util::make_promise_future<void>();
2✔
4033
    realm_config.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
4034
    realm_config.sync_config->subscription_initializer = subscription_callback;
2✔
4035

1✔
4036
    realm_config.sync_config->on_sync_client_event_hook =
2✔
4037
        [&, client_reset_triggered = false](std::weak_ptr<SyncSession> weak_sess,
2✔
4038
                                            const SyncClientHookData& event_data) mutable {
23✔
4039
            auto sess = weak_sess.lock();
23✔
4040
            if (!sess) {
23✔
4041
                return SyncClientHookAction::NoAction;
×
4042
            }
×
4043
            if (sess->path() != realm_config.path) {
23✔
4044
                return SyncClientHookAction::NoAction;
16✔
4045
            }
16✔
4046

4✔
4047
            if (event_data.event != SyncClientHookEvent::DownloadMessageReceived) {
7✔
4048
                return SyncClientHookAction::NoAction;
5✔
4049
            }
5✔
4050

1✔
4051
            if (client_reset_triggered) {
2✔
4052
                return SyncClientHookAction::NoAction;
×
4053
            }
×
4054
            client_reset_triggered = true;
2✔
4055
            reset_utils::trigger_client_reset(harness.session().app_session());
2✔
4056
            return SyncClientHookAction::EarlyReturn;
2✔
4057
        };
2✔
4058

1✔
4059
    realm_config.sync_config->notify_before_client_reset =
2✔
4060
        [promise = util::CopyablePromiseHolder(std::move(before_callback_called.promise))](
2✔
4061
            std::shared_ptr<Realm> realm) mutable {
2✔
4062
            CHECK(realm->schema_version() == 1);
2!
4063
            promise.get_promise().emplace_value();
2✔
4064
        };
2✔
4065

1✔
4066
    realm_config.sync_config->notify_after_client_reset =
2✔
4067
        [promise = util::CopyablePromiseHolder(std::move(after_callback_called.promise))](
2✔
4068
            std::shared_ptr<Realm> realm, ThreadSafeReference, bool) mutable {
2✔
4069
            CHECK(realm->schema_version() == 1);
2!
4070
            promise.get_promise().emplace_value();
2✔
4071
        };
2✔
4072

1✔
4073
    auto realm_task = Realm::get_synchronized_realm(realm_config);
2✔
4074
    auto realm_pf = util::make_promise_future<SharedRealm>();
2✔
4075
    realm_task->start([promise_holder = util::CopyablePromiseHolder(std::move(realm_pf.promise))](
2✔
4076
                          ThreadSafeReference ref, std::exception_ptr ex) mutable {
2✔
4077
        auto promise = promise_holder.get_promise();
2✔
4078
        if (ex) {
2✔
4079
            try {
×
4080
                std::rethrow_exception(ex);
×
4081
            }
×
4082
            catch (...) {
×
4083
                promise.set_error(exception_to_status());
×
4084
            }
×
4085
            return;
×
4086
        }
2✔
4087
        auto realm = Realm::get_shared_realm(std::move(ref));
2✔
4088
        if (!realm) {
2✔
4089
            promise.set_error({ErrorCodes::RuntimeError, "could not get realm from threadsaferef"});
×
4090
        }
×
4091
        promise.emplace_value(std::move(realm));
2✔
4092
    });
2✔
4093
    auto realm = realm_pf.future.get();
2✔
4094
    before_callback_called.future.get();
2✔
4095
    after_callback_called.future.get();
2✔
4096
    REQUIRE(subscription_invoked.load());
2!
4097
}
2✔
4098

4099
// Test that resending pending subscription sets does not cause any inconsistencies in the progress cursors.
4100
TEST_CASE("flx sync: resend pending subscriptions when reconnecting", "[sync][flx][baas]") {
2✔
4101
    FLXSyncTestHarness harness("flx_pending_subscriptions", {g_large_array_schema, {"queryable_int_field"}});
2✔
4102

1✔
4103
    std::vector<ObjectId> obj_ids_at_end = fill_large_array_schema(harness);
2✔
4104
    SyncTestFile interrupted_realm_config(harness.app()->current_user(), harness.schema(),
2✔
4105
                                          SyncConfig::FLXSyncEnabled{});
2✔
4106
    interrupted_realm_config.cache = false;
2✔
4107

1✔
4108
    {
2✔
4109
        auto pf = util::make_promise_future<void>();
2✔
4110
        Realm::Config config = interrupted_realm_config;
2✔
4111
        config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
4112
        config.sync_config->on_sync_client_event_hook =
2✔
4113
            [promise = util::CopyablePromiseHolder(std::move(pf.promise))](std::weak_ptr<SyncSession> weak_session,
2✔
4114
                                                                           const SyncClientHookData& data) mutable {
50✔
4115
                if (data.event != SyncClientHookEvent::BootstrapMessageProcessed &&
50✔
4116
                    data.event != SyncClientHookEvent::BootstrapProcessed) {
43✔
4117
                    return SyncClientHookAction::NoAction;
32✔
4118
                }
32✔
4119
                auto session = weak_session.lock();
18✔
4120
                if (!session) {
18✔
4121
                    return SyncClientHookAction::NoAction;
×
4122
                }
×
4123
                if (data.query_version != 1) {
18✔
4124
                    return SyncClientHookAction::NoAction;
4✔
4125
                }
4✔
4126

7✔
4127
                // Commit a subscriptions set whenever a bootstrap message is received for query version 1.
7✔
4128
                if (data.event == SyncClientHookEvent::BootstrapMessageProcessed) {
14✔
4129
                    auto latest_subs = session->get_flx_subscription_store()->get_latest().make_mutable_copy();
12✔
4130
                    latest_subs.commit();
12✔
4131
                    return SyncClientHookAction::NoAction;
12✔
4132
                }
12✔
4133
                // At least one subscription set was created.
1✔
4134
                CHECK(session->get_flx_subscription_store()->get_latest().version() > 1);
2!
4135
                promise.get_promise().emplace_value();
2✔
4136
                // Reconnect once query version 1 is bootstrapped.
1✔
4137
                return SyncClientHookAction::TriggerReconnect;
2✔
4138
            };
2✔
4139

1✔
4140
        auto realm = Realm::get_shared_realm(config);
2✔
4141
        {
2✔
4142
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
4143
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
4144
            mut_subs.insert_or_assign(Query(table));
2✔
4145
            mut_subs.commit();
2✔
4146
        }
2✔
4147
        pf.future.get();
2✔
4148
        realm->sync_session()->shutdown_and_wait();
2✔
4149
        realm->close();
2✔
4150
    }
2✔
4151

1✔
4152
    _impl::RealmCoordinator::assert_no_open_realms();
2✔
4153

1✔
4154
    // Check at least one subscription set needs to be resent.
1✔
4155
    {
2✔
4156
        DBOptions options;
2✔
4157
        options.encryption_key = test_util::crypt_key();
2✔
4158
        auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
4159
        auto sub_store = sync::SubscriptionStore::create(realm, [](int64_t) {});
1✔
4160
        auto version_info = sub_store->get_version_info();
2✔
4161
        REQUIRE(version_info.latest > version_info.active);
2!
4162
    }
2✔
4163

1✔
4164
    // Resend the pending subscriptions.
1✔
4165
    auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
4166
    wait_for_upload(*realm);
2✔
4167
    wait_for_download(*realm);
2✔
4168
}
2✔
4169

4170
} // namespace realm::app
4171

4172
#endif // REALM_ENABLE_AUTH_TESTS
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc