• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / jonathan.reams_2947

01 Dec 2023 08:08PM UTC coverage: 91.739% (+0.04%) from 91.695%
jonathan.reams_2947

Pull #7160

Evergreen

jbreams
allow handle_error to decide resumability
Pull Request #7160: Prevent resuming a session that has not been fully shut down

92428 of 169414 branches covered (0.0%)

315 of 349 new or added lines in 14 files covered. (90.26%)

80 existing lines in 14 files now uncovered.

232137 of 253041 relevant lines covered (91.74%)

6882826.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.34
/test/object-store/sync/flx_sync.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2021 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#if REALM_ENABLE_AUTH_TESTS
20

21
#include <catch2/catch_all.hpp>
22

23
#include <util/test_file.hpp>
24
#include <util/crypt_key.hpp>
25
#include <util/sync/flx_sync_harness.hpp>
26
#include <util/sync/sync_test_utils.hpp>
27

28
#include <realm/object_id.hpp>
29
#include <realm/query_expression.hpp>
30

31
#include <realm/object-store/binding_context.hpp>
32
#include <realm/object-store/impl/object_accessor_impl.hpp>
33
#include <realm/object-store/impl/realm_coordinator.hpp>
34
#include <realm/object-store/schema.hpp>
35
#include <realm/object-store/sync/generic_network_transport.hpp>
36
#include <realm/object-store/sync/mongo_client.hpp>
37
#include <realm/object-store/sync/mongo_database.hpp>
38
#include <realm/object-store/sync/mongo_collection.hpp>
39
#include <realm/object-store/sync/async_open_task.hpp>
40
#include <realm/util/bson/bson.hpp>
41
#include <realm/object-store/sync/sync_session.hpp>
42

43
#include <realm/sync/client_base.hpp>
44
#include <realm/sync/config.hpp>
45
#include <realm/sync/noinst/client_history_impl.hpp>
46
#include <realm/sync/noinst/client_reset.hpp>
47
#include <realm/sync/noinst/client_reset_operation.hpp>
48
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
49
#include <realm/sync/noinst/server/access_token.hpp>
50
#include <realm/sync/protocol.hpp>
51
#include <realm/sync/subscriptions.hpp>
52

53
#include <realm/util/future.hpp>
54
#include <realm/util/logger.hpp>
55

56
#include <catch2/catch_all.hpp>
57

58
#include <filesystem>
59
#include <iostream>
60
#include <stdexcept>
61

62
using namespace std::string_literals;
63

64
namespace realm {
65

66
class TestHelper {
67
public:
68
    static DBRef& get_db(SharedRealm const& shared_realm)
69
    {
70
        return Realm::Internal::get_db(*shared_realm);
71
    }
72
};
73

74
} // namespace realm
75

76
namespace realm::app {
77

78
namespace {
79
const Schema g_minimal_schema{
80
    {"TopLevel",
81
     {
82
         {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
83
     }},
84
};
85

86
const Schema g_large_array_schema{
87
    ObjectSchema("TopLevel",
88
                 {
89
                     {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
90
                     {"queryable_int_field", PropertyType::Int | PropertyType::Nullable},
91
                     {"list_of_strings", PropertyType::Array | PropertyType::String},
92
                 }),
93
};
94

95
const Schema g_simple_embedded_obj_schema{
96
    {"TopLevel",
97
     {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
98
      {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
99
      {"embedded_obj", PropertyType::Object | PropertyType::Nullable, "TopLevel_embedded_obj"}}},
100
    {"TopLevel_embedded_obj",
101
     ObjectSchema::ObjectType::Embedded,
102
     {
103
         {"str_field", PropertyType::String | PropertyType::Nullable},
104
     }},
105
};
106

107
// Populates a FLXSyncTestHarness with the g_large_array_schema with objects that are large enough that
108
// they are guaranteed to fill multiple bootstrap download messages. Currently this means generating 5
109
// objects each with 1024 array entries of 1024 bytes each.
110
//
111
// Returns a list of the _id values for the objects created.
112
std::vector<ObjectId> fill_large_array_schema(FLXSyncTestHarness& harness)
113
{
12✔
114
    std::vector<ObjectId> ret;
12✔
115
    REQUIRE(harness.schema() == g_large_array_schema);
12!
116
    harness.load_initial_data([&](SharedRealm realm) {
12✔
117
        CppContext c(realm);
12✔
118
        for (int i = 0; i < 5; ++i) {
72✔
119
            auto id = ObjectId::gen();
60✔
120
            auto obj = Object::create(c, realm, "TopLevel",
60✔
121
                                      std::any(AnyDict{{"_id", id},
60✔
122
                                                       {"list_of_strings", AnyVector{}},
60✔
123
                                                       {"queryable_int_field", static_cast<int64_t>(i * 5)}}));
60✔
124
            List str_list(obj, realm->schema().find("TopLevel")->property_for_name("list_of_strings"));
60✔
125
            for (int j = 0; j < 1024; ++j) {
61,500✔
126
                str_list.add(c, std::any(std::string(1024, 'a' + (j % 26))));
61,440✔
127
            }
61,440✔
128

30✔
129
            ret.push_back(id);
60✔
130
        }
60✔
131
    });
12✔
132
    return ret;
12✔
133
}
12✔
134

135
} // namespace
136

137
TEST_CASE("flx: connect to FLX-enabled app", "[sync][flx][baas]") {
2✔
138
    FLXSyncTestHarness harness("basic_flx_connect");
2✔
139

1✔
140
    auto foo_obj_id = ObjectId::gen();
2✔
141
    auto bar_obj_id = ObjectId::gen();
2✔
142
    harness.load_initial_data([&](SharedRealm realm) {
2✔
143
        CppContext c(realm);
2✔
144
        Object::create(c, realm, "TopLevel",
2✔
145
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
146
                                        {"queryable_str_field", "foo"s},
2✔
147
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
148
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
149
        Object::create(c, realm, "TopLevel",
2✔
150
                       std::any(AnyDict{{"_id", bar_obj_id},
2✔
151
                                        {"queryable_str_field", "bar"s},
2✔
152
                                        {"queryable_int_field", static_cast<int64_t>(10)},
2✔
153
                                        {"non_queryable_field", "non queryable 2"s}}));
2✔
154
    });
2✔
155

1✔
156

1✔
157
    harness.do_with_new_realm([&](SharedRealm realm) {
2✔
158
        {
2✔
159
            auto empty_subs = realm->get_latest_subscription_set();
2✔
160
            CHECK(empty_subs.size() == 0);
2!
161
            CHECK(empty_subs.version() == 0);
2!
162
            empty_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
163
        }
2✔
164

1✔
165
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
166
        auto col_key = table->get_column_key("queryable_str_field");
2✔
167
        Query query_foo(table);
2✔
168
        query_foo.equal(col_key, "foo");
2✔
169
        {
2✔
170
            auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
171
            new_subs.insert_or_assign(query_foo);
2✔
172
            auto subs = new_subs.commit();
2✔
173
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
174
        }
2✔
175

1✔
176
        {
2✔
177
            wait_for_advance(*realm);
2✔
178
            Results results(realm, table);
2✔
179
            CHECK(results.size() == 1);
2!
180
            auto obj = results.get<Obj>(0);
2✔
181
            CHECK(obj.is_valid());
2!
182
            CHECK(obj.get<ObjectId>("_id") == foo_obj_id);
2!
183
        }
2✔
184

1✔
185
        {
2✔
186
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
187
            Query new_query_bar(table);
2✔
188
            new_query_bar.equal(col_key, "bar");
2✔
189
            mut_subs.insert_or_assign(new_query_bar);
2✔
190
            auto subs = mut_subs.commit();
2✔
191
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
192
        }
2✔
193

1✔
194
        {
2✔
195
            wait_for_advance(*realm);
2✔
196
            Results results(realm, Query(table));
2✔
197
            CHECK(results.size() == 2);
2!
198
        }
2✔
199

1✔
200
        {
2✔
201
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
202
            CHECK(mut_subs.erase(query_foo));
2!
203
            Query new_query_bar(table);
2✔
204
            new_query_bar.equal(col_key, "bar");
2✔
205
            mut_subs.insert_or_assign(new_query_bar);
2✔
206
            auto subs = mut_subs.commit();
2✔
207
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
208
        }
2✔
209

1✔
210
        {
2✔
211
            wait_for_advance(*realm);
2✔
212
            Results results(realm, Query(table));
2✔
213
            CHECK(results.size() == 1);
2!
214
            auto obj = results.get<Obj>(0);
2✔
215
            CHECK(obj.is_valid());
2!
216
            CHECK(obj.get<ObjectId>("_id") == bar_obj_id);
2!
217
        }
2✔
218

1✔
219
        {
2✔
220
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
221
            mut_subs.clear();
2✔
222
            auto subs = mut_subs.commit();
2✔
223
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
224
        }
2✔
225

1✔
226
        {
2✔
227
            wait_for_advance(*realm);
2✔
228
            Results results(realm, table);
2✔
229
            CHECK(results.size() == 0);
2!
230
        }
2✔
231
    });
2✔
232
}
2✔
233

234
TEST_CASE("flx: test commands work", "[sync][flx][test command][baas]") {
2✔
235
    FLXSyncTestHarness harness("test_commands");
2✔
236
    harness.do_with_new_realm([&](const SharedRealm& realm) {
2✔
237
        wait_for_upload(*realm);
2✔
238
        nlohmann::json command_request = {
2✔
239
            {"command", "PAUSE_ROUTER_SESSION"},
2✔
240
        };
2✔
241
        auto resp_body =
2✔
242
            SyncSession::OnlyForTesting::send_test_command(*realm->sync_session(), command_request.dump()).get();
2✔
243
        REQUIRE(resp_body == "{}");
2!
244

1✔
245
        auto bad_status =
2✔
246
            SyncSession::OnlyForTesting::send_test_command(*realm->sync_session(), "foobar: }").get_no_throw();
2✔
247
        REQUIRE(bad_status.get_status() == ErrorCodes::LogicError);
2!
248
        REQUIRE_THAT(bad_status.get_status().reason(),
2✔
249
                     Catch::Matchers::ContainsSubstring("Invalid json input to send_test_command"));
2✔
250

1✔
251
        bad_status =
2✔
252
            SyncSession::OnlyForTesting::send_test_command(*realm->sync_session(), "{\"cmd\": \"\"}").get_no_throw();
2✔
253
        REQUIRE_FALSE(bad_status.is_ok());
2!
254
        REQUIRE(bad_status.get_status() == ErrorCodes::LogicError);
2!
255
        REQUIRE(bad_status.get_status().reason() ==
2!
256
                "Must supply command name in \"command\" field of test command json object");
2✔
257
    });
2✔
258
}
2✔
259

260

261
static auto make_error_handler()
262
{
42✔
263
    auto [error_promise, error_future] = util::make_promise_future<SyncError>();
42✔
264
    auto shared_promise = std::make_shared<decltype(error_promise)>(std::move(error_promise));
42✔
265
    auto fn = [error_promise = std::move(shared_promise)](std::shared_ptr<SyncSession>, SyncError err) {
42✔
266
        error_promise->emplace_value(std::move(err));
42✔
267
    };
42✔
268
    return std::make_pair(std::move(error_future), std::move(fn));
42✔
269
}
42✔
270

271
static auto install_error_handler(Realm::Config& config)
272
{
20✔
273
    auto&& [error_future, error_handler] = make_error_handler();
20✔
274
    config.sync_config->error_handler = std::move(error_handler);
20✔
275
    return std::move(error_future);
20✔
276
}
20✔
277

278
static auto make_client_reset_handler()
279
{
16✔
280
    auto [reset_promise, reset_future] = util::make_promise_future<ClientResyncMode>();
16✔
281
    auto shared_promise = std::make_shared<decltype(reset_promise)>(std::move(reset_promise));
16✔
282
    auto fn = [reset_promise = std::move(shared_promise)](SharedRealm, ThreadSafeReference, bool did_recover) {
16✔
283
        reset_promise->emplace_value(did_recover ? ClientResyncMode::Recover : ClientResyncMode::DiscardLocal);
12✔
284
    };
16✔
285
    return std::make_pair(std::move(reset_future), std::move(fn));
16✔
286
}
16✔
287

288

289
TEST_CASE("app: error handling integration test", "[sync][flx][baas]") {
24✔
290
    static std::optional<FLXSyncTestHarness> harness{"error_handling"};
24✔
291
    create_user_and_log_in(harness->app());
24✔
292
    SyncTestFile config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
24✔
293
    config.sync_config->client_resync_mode = ClientResyncMode::Manual;
24✔
294

12✔
295
    SECTION("handles unknown errors gracefully") {
24✔
296
        auto error_future = install_error_handler(config);
2✔
297
        auto r = Realm::get_shared_realm(config);
2✔
298
        wait_for_download(*r);
2✔
299
        nlohmann::json error_body = {
2✔
300
            {"tryAgain", false},         {"message", "fake error"},
2✔
301
            {"shouldClientReset", true}, {"isRecoveryModeDisabled", false},
2✔
302
            {"action", "ClientReset"},
2✔
303
        };
2✔
304
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
305
                                       {"args", nlohmann::json{{"errorCode", 299}, {"errorBody", error_body}}}};
2✔
306
        auto test_cmd_res =
2✔
307
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
308
                .get();
2✔
309
        REQUIRE(test_cmd_res == "{}");
2!
310
        auto error = wait_for_future(std::move(error_future)).get();
2✔
311
        REQUIRE(error.status == ErrorCodes::UnknownError);
2!
312
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset);
2!
313
        REQUIRE(error.is_fatal);
2!
314
        REQUIRE_THAT(error.status.reason(),
2✔
315
                     Catch::Matchers::ContainsSubstring("Unknown sync protocol error code 299"));
2✔
316
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
2✔
317

1✔
318
        // since the client resync mode is manual here we should have ended up in an inactive state.
1✔
319
        REQUIRE(r->sync_session()->state() == SyncSession::State::Inactive);
2!
320
    }
2✔
321

12✔
322
    SECTION("unknown errors that are warnings end up in an active state") {
24✔
323
        auto error_future = install_error_handler(config);
2✔
324
        auto r = Realm::get_shared_realm(config);
2✔
325
        wait_for_download(*r);
2✔
326
        nlohmann::json error_body = {
2✔
327
            {"tryAgain", false},          {"message", "fake error"},
2✔
328
            {"shouldClientReset", false}, {"isRecoveryModeDisabled", false},
2✔
329
            {"action", "Warning"},
2✔
330
        };
2✔
331
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
332
                                       {"args", nlohmann::json{{"errorCode", 299}, {"errorBody", error_body}}}};
2✔
333
        auto test_cmd_res =
2✔
334
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
335
                .get();
2✔
336
        REQUIRE(test_cmd_res == "{}");
2!
337
        auto error = wait_for_future(std::move(error_future)).get();
2✔
338
        REQUIRE(error.status == ErrorCodes::UnknownError);
2!
339
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::Warning);
2!
340
        REQUIRE(error.is_fatal);
2!
341
        REQUIRE_THAT(error.status.reason(),
2✔
342
                     Catch::Matchers::ContainsSubstring("Unknown sync protocol error code 299"));
2✔
343
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
2✔
344

1✔
345
        REQUIRE(r->sync_session()->state() == SyncSession::State::Active);
2!
346
    }
2✔
347

12✔
348
    SECTION("transient errors do not surface to error handler") {
24✔
349
        std::mutex error_mutex;
2✔
350
        std::condition_variable error_cv;
2✔
351
        std::vector<sync::ProtocolErrorInfo> errors;
2✔
352
        bool error_handler_called = false;
2✔
353
        config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession>,
2✔
354
                                                            const SyncClientHookData& data) {
16✔
355
            if (data.event != SyncClientHookEvent::SessionSuspended) {
16✔
356
                return SyncClientHookAction::NoAction;
14✔
357
            }
14✔
358

1✔
359
            std::lock_guard lock{error_mutex};
2✔
360
            errors.push_back(*data.error_info);
2✔
361
            error_cv.notify_one();
2✔
362
            return SyncClientHookAction::NoAction;
2✔
363
        };
2✔
364

1✔
365
        config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError) {
1✔
NEW
366
            std::lock_guard lock{error_mutex};
×
NEW
367
            error_handler_called = true;
×
NEW
368
            error_cv.notify_one();
×
NEW
369
        };
×
370

1✔
371
        auto r = Realm::get_shared_realm(config);
2✔
372
        wait_for_upload(*r);
2✔
373
        nlohmann::json error_body = {
2✔
374
            {"tryAgain", true},           {"message", "transient fake error"},
2✔
375
            {"shouldClientReset", false}, {"isRecoveryModeDisabled", false},
2✔
376
            {"action", "Transient"},
2✔
377
        };
2✔
378
        nlohmann::json test_command = {
2✔
379
            {"command", "ECHO_ERROR"},
2✔
380
            {"args", nlohmann::json{{"errorCode", static_cast<int>(sync::ProtocolError::initial_sync_not_completed)},
2✔
381
                                    {"errorBody", error_body}}}};
2✔
382
        auto test_cmd_res =
2✔
383
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
384
                .get();
2✔
385
        REQUIRE(test_cmd_res == "{}");
2!
386

1✔
387
        std::unique_lock lock{error_mutex};
2✔
388
        error_cv.wait(lock, [&] {
4✔
389
            return error_handler_called || !errors.empty();
4✔
390
        });
4✔
391

1✔
392
        REQUIRE(!errors.empty());
2!
393
        lock.unlock();
2✔
394
        wait_for_download(*r);
2✔
395
        lock.lock();
2✔
396
        REQUIRE(!error_handler_called);
2!
397
        const auto error = std::move(errors.back());
2✔
398
        lock.unlock();
2✔
399

1✔
400
        REQUIRE(!error.is_fatal);
2!
401
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::Transient);
2!
402
        REQUIRE(error.should_client_reset);
2!
403
        REQUIRE(!error.should_client_reset.value());
2!
404
        REQUIRE(static_cast<sync::ProtocolError>(error.raw_error_code) ==
2!
405
                sync::ProtocolError::initial_sync_not_completed);
2✔
406
    }
2✔
407

12✔
408
    SECTION("unknown errors without actions are application bugs") {
24✔
409
        auto error_future = install_error_handler(config);
2✔
410
        auto r = Realm::get_shared_realm(config);
2✔
411
        wait_for_download(*r);
2✔
412
        nlohmann::json error_body = {
2✔
413
            {"tryAgain", false},
2✔
414
            {"message", "fake error"},
2✔
415
            {"shouldClientReset", false},
2✔
416
            {"isRecoveryModeDisabled", false},
2✔
417
        };
2✔
418
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
419
                                       {"args", nlohmann::json{{"errorCode", 299}, {"errorBody", error_body}}}};
2✔
420
        auto test_cmd_res =
2✔
421
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
422
                .get();
2✔
423
        REQUIRE(test_cmd_res == "{}");
2!
424
        auto error = wait_for_future(std::move(error_future)).get();
2✔
425
        REQUIRE(error.status == ErrorCodes::UnknownError);
2!
426
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
427
        REQUIRE(error.is_fatal);
2!
428
        REQUIRE_THAT(error.status.reason(),
2✔
429
                     Catch::Matchers::ContainsSubstring("Unknown sync protocol error code 299"));
2✔
430
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
2✔
431
        REQUIRE(r->sync_session()->state() == SyncSession::State::Inactive);
2!
432
    }
2✔
433

12✔
434
    SECTION("cannot resume after fatal error") {
24✔
435
        enum class BarrierState { WaitingForSuspend, WaitingForResume, Done };
2✔
436
        std::mutex barrier_mutex;
2✔
437
        std::condition_variable barrier_cv;
2✔
438
        BarrierState barrier_state = BarrierState::WaitingForSuspend;
2✔
439
        config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession>,
2✔
440
                                                            const SyncClientHookData& data) {
16✔
441
            if (data.event != SyncClientHookEvent::SessionSuspended || !data.error_info->is_fatal) {
16✔
442
                return SyncClientHookAction::NoAction;
14✔
443
            }
14✔
444

1✔
445
            std::unique_lock lock{barrier_mutex};
2✔
446
            REALM_ASSERT(barrier_state == BarrierState::WaitingForSuspend);
2✔
447
            barrier_state = BarrierState::WaitingForResume;
2✔
448
            barrier_cv.notify_one();
2✔
449
            barrier_cv.wait(lock, [&] {
4✔
450
                return barrier_state == BarrierState::Done;
4✔
451
            });
4✔
452
            return SyncClientHookAction::NoAction;
2✔
453
        };
2✔
454
        auto error_future = install_error_handler(config);
2✔
455
        auto r = Realm::get_shared_realm(config);
2✔
456
        wait_for_upload(*r);
2✔
457
        nlohmann::json error_body = {
2✔
458
            {"tryAgain", false},
2✔
459
            {"message", "fake error"},
2✔
460
            {"shouldClientReset", false},
2✔
461
            {"isRecoveryModeDisabled", false},
2✔
462
        };
2✔
463
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
464
                                       {"args", nlohmann::json{{"errorCode", 299}, {"errorBody", error_body}}}};
2✔
465
        auto test_cmd_res =
2✔
466
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
467
                .get();
2✔
468
        REQUIRE(test_cmd_res == "{}");
2!
469

1✔
470
        // Resume the session while the error is being handled but before the session is marked inactive.
1✔
471
        {
2✔
472
            std::unique_lock lock{barrier_mutex};
2✔
473
            barrier_cv.wait(lock, [&] {
4✔
474
                return barrier_state == BarrierState::WaitingForResume;
4✔
475
            });
4✔
476
            r->sync_session()->handle_reconnect();
2✔
477
            barrier_state = BarrierState::Done;
2✔
478
            barrier_cv.notify_one();
2✔
479
        }
2✔
480

1✔
481
        auto error = wait_for_future(std::move(error_future)).get();
2✔
482
        REQUIRE(error.status == ErrorCodes::UnknownError);
2!
483
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
484
        REQUIRE(error.is_fatal);
2!
485
        REQUIRE_THAT(error.status.reason(),
2✔
486
                     Catch::Matchers::ContainsSubstring("Unknown sync protocol error code 299"));
2✔
487
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
2✔
488
        REQUIRE(r->sync_session()->state() == SyncSession::State::Inactive);
2!
489
    }
2✔
490

12✔
491

12✔
492
    SECTION("handles unknown actions gracefully") {
24✔
493
        auto error_future = install_error_handler(config);
2✔
494
        auto r = Realm::get_shared_realm(config);
2✔
495
        wait_for_download(*r);
2✔
496
        nlohmann::json error_body = {
2✔
497
            {"tryAgain", false},
2✔
498
            {"message", "fake error"},
2✔
499
            {"shouldClientReset", true},
2✔
500
            {"isRecoveryModeDisabled", false},
2✔
501
            {"action", "FakeActionThatWillNeverExist"},
2✔
502
        };
2✔
503
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
504
                                       {"args", nlohmann::json{{"errorCode", 201}, {"errorBody", error_body}}}};
2✔
505
        auto test_cmd_res =
2✔
506
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
507
                .get();
2✔
508
        REQUIRE(test_cmd_res == "{}");
2!
509
        auto error = wait_for_future(std::move(error_future)).get();
2✔
510
        REQUIRE(error.status == ErrorCodes::RuntimeError);
2!
511
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
512
        REQUIRE(error.is_fatal);
2!
513
        REQUIRE_THAT(error.status.reason(), !Catch::Matchers::ContainsSubstring("Unknown sync protocol error code"));
2✔
514
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
2✔
515
        REQUIRE(r->sync_session()->state() == SyncSession::State::Inactive);
2!
516
    }
2✔
517

12✔
518

12✔
519
    SECTION("unknown connection-level errors are still errors") {
24✔
520
        auto error_future = install_error_handler(config);
2✔
521
        auto r = Realm::get_shared_realm(config);
2✔
522
        wait_for_download(*r);
2✔
523
        nlohmann::json error_body = {{"tryAgain", false},
2✔
524
                                     {"message", "fake error"},
2✔
525
                                     {"shouldClientReset", false},
2✔
526
                                     {"isRecoveryModeDisabled", false},
2✔
527
                                     {"action", "ApplicationBug"}};
2✔
528
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
2✔
529
                                       {"args", nlohmann::json{{"errorCode", 199}, {"errorBody", error_body}}}};
2✔
530
        auto test_cmd_res =
2✔
531
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
532
                .get();
2✔
533
        REQUIRE(test_cmd_res == "{}");
2!
534
        auto error = wait_for_future(std::move(error_future)).get();
2✔
535
        REQUIRE(error.status == ErrorCodes::SyncProtocolInvariantFailed);
2!
536
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ProtocolViolation);
2!
537
        REQUIRE(error.is_fatal);
2!
538
        REQUIRE(r->sync_session()->state() == SyncSession::State::Inactive);
2!
539
    }
2✔
540

12✔
541
    SECTION("client reset errors") {
24✔
542
        auto error_future = install_error_handler(config);
6✔
543
        auto r = Realm::get_shared_realm(config);
6✔
544
        wait_for_download(*r);
6✔
545
        nlohmann::json error_body = {{"tryAgain", false},
6✔
546
                                     {"message", "fake error"},
6✔
547
                                     {"shouldClientReset", true},
6✔
548
                                     {"isRecoveryModeDisabled", false},
6✔
549
                                     {"action", "ClientReset"}};
6✔
550
        auto code = GENERATE(sync::ProtocolError::bad_client_file_ident, sync::ProtocolError::bad_server_version,
6✔
551
                             sync::ProtocolError::diverging_histories);
6✔
552
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
6✔
553
                                       {"args", nlohmann::json{{"errorCode", code}, {"errorBody", error_body}}}};
6✔
554
        auto test_cmd_res =
6✔
555
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
6✔
556
                .get();
6✔
557
        REQUIRE(test_cmd_res == "{}");
6!
558
        auto error = wait_for_future(std::move(error_future)).get();
6✔
559
        REQUIRE(error.status == ErrorCodes::SyncClientResetRequired);
6!
560
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset);
6!
561
        REQUIRE(error.is_client_reset_requested());
6!
562
        REQUIRE(error.is_fatal);
6!
563
        // since the client resync mode is manual here we should have ended up in an inactive state.
3✔
564
        REQUIRE(r->sync_session()->state() == SyncSession::State::Inactive);
6!
565
    }
6✔
566

12✔
567
    SECTION("log out user action") {
24✔
568
        auto error_future = install_error_handler(config);
2✔
569
        auto r = Realm::get_shared_realm(config);
2✔
570
        wait_for_download(*r);
2✔
571
        nlohmann::json error_body = {{"tryAgain", false},
2✔
572
                                     {"message", "fake error"},
2✔
573
                                     {"shouldClientReset", false},
2✔
574
                                     {"isRecoveryModeDisabled", false},
2✔
575
                                     {"action", "LogOutUser"}};
2✔
576
        nlohmann::json test_command = {
2✔
577
            {"command", "ECHO_ERROR"},
2✔
578
            {"args", nlohmann::json{{"errorCode", sync::ProtocolError::user_mismatch}, {"errorBody", error_body}}}};
2✔
579
        auto test_cmd_res =
2✔
580
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
2✔
581
                .get();
2✔
582
        REQUIRE(test_cmd_res == "{}");
2!
583
        auto error = wait_for_future(std::move(error_future)).get();
2✔
584
        REQUIRE(error.status == ErrorCodes::SyncUserMismatch);
2!
585
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::LogOutUser);
2!
586
        REQUIRE(!error.is_client_reset_requested());
2!
587
        REQUIRE(error.is_fatal);
2!
588
        REQUIRE(r->sync_session()->state() == SyncSession::State::Inactive);
2!
589
        REQUIRE(!r->sync_session()->user()->is_logged_in());
2!
590
    }
2✔
591

12✔
592
    SECTION("teardown") {
24✔
593
        harness.reset();
2✔
594
    }
2✔
595
}
24✔
596

597

598
TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {
42✔
599
    std::vector<ObjectSchema> schema{
42✔
600
        {"TopLevel",
42✔
601
         {
42✔
602
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
42✔
603
             {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
42✔
604
             {"queryable_int_field", PropertyType::Int | PropertyType::Nullable},
42✔
605
             {"non_queryable_field", PropertyType::String | PropertyType::Nullable},
42✔
606
             {"list_of_ints_field", PropertyType::Int | PropertyType::Array},
42✔
607
             {"sum_of_list_field", PropertyType::Int},
42✔
608
         }},
42✔
609
        {"TopLevel2",
42✔
610
         {
42✔
611
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
42✔
612
             {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
42✔
613
         }},
42✔
614
    };
42✔
615

21✔
616
    // some of these tests make additive schema changes which is only allowed in dev mode
21✔
617
    constexpr bool dev_mode = true;
42✔
618
    FLXSyncTestHarness harness("flx_client_reset",
42✔
619
                               {schema, {"queryable_str_field", "queryable_int_field"}, {}, dev_mode});
42✔
620

21✔
621
    auto add_object = [](SharedRealm realm, std::string str_field, int64_t int_field,
42✔
622
                         ObjectId oid = ObjectId::gen()) {
116✔
623
        CppContext c(realm);
116✔
624
        realm->begin_transaction();
116✔
625

58✔
626
        int64_t r1 = random_int();
116✔
627
        int64_t r2 = random_int();
116✔
628
        int64_t r3 = random_int();
116✔
629
        int64_t sum = uint64_t(r1) + r2 + r3;
116✔
630

58✔
631
        Object::create(c, realm, "TopLevel",
116✔
632
                       std::any(AnyDict{{"_id", oid},
116✔
633
                                        {"queryable_str_field", str_field},
116✔
634
                                        {"queryable_int_field", int_field},
116✔
635
                                        {"non_queryable_field", "non queryable 1"s},
116✔
636
                                        {"list_of_ints_field", std::vector<std::any>{r1, r2, r3}},
116✔
637
                                        {"sum_of_list_field", sum}}));
116✔
638
        realm->commit_transaction();
116✔
639
    };
116✔
640

21✔
641
    auto subscribe_to_and_add_objects = [&](SharedRealm realm, size_t num_objects) {
41✔
642
        auto table = realm->read_group().get_table("class_TopLevel");
40✔
643
        auto id_col = table->get_primary_key_column();
40✔
644
        auto sub_set = realm->get_latest_subscription_set();
40✔
645
        for (size_t i = 0; i < num_objects; ++i) {
130✔
646
            auto oid = ObjectId::gen();
90✔
647
            auto mut_sub = sub_set.make_mutable_copy();
90✔
648
            mut_sub.clear();
90✔
649
            mut_sub.insert_or_assign(Query(table).equal(id_col, oid));
90✔
650
            sub_set = mut_sub.commit();
90✔
651
            add_object(realm, util::format("added _id='%1'", oid), 0, oid);
90✔
652
        }
90✔
653
    };
40✔
654

21✔
655
    auto add_subscription_for_new_object = [&](SharedRealm realm, std::string str_field,
42✔
656
                                               int64_t int_field) -> sync::SubscriptionSet {
32✔
657
        auto table = realm->read_group().get_table("class_TopLevel");
22✔
658
        auto queryable_str_field = table->get_column_key("queryable_str_field");
22✔
659
        auto sub_set = realm->get_latest_subscription_set().make_mutable_copy();
22✔
660
        sub_set.insert_or_assign(Query(table).equal(queryable_str_field, StringData(str_field)));
22✔
661
        auto resulting_set = sub_set.commit();
22✔
662
        add_object(realm, str_field, int_field);
22✔
663
        return resulting_set;
22✔
664
    };
22✔
665

21✔
666
    auto add_invalid_subscription = [&](SharedRealm realm) -> sync::SubscriptionSet {
22✔
667
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
668
        auto queryable_str_field = table->get_column_key("non_queryable_field");
2✔
669
        auto sub_set = realm->get_latest_subscription_set().make_mutable_copy();
2✔
670
        sub_set.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
2✔
671
        auto resulting_set = sub_set.commit();
2✔
672
        return resulting_set;
2✔
673
    };
2✔
674

21✔
675
    auto count_queries_with_str = [](sync::SubscriptionSet subs, std::string_view str) {
24✔
676
        size_t count = 0;
6✔
677
        for (auto sub : subs) {
10✔
678
            if (sub.query_string.find(str) != std::string::npos) {
10✔
679
                ++count;
4✔
680
            }
4✔
681
        }
10✔
682
        return count;
6✔
683
    };
6✔
684
    create_user_and_log_in(harness.app());
42✔
685
    auto user1 = harness.app()->current_user();
42✔
686
    create_user_and_log_in(harness.app());
42✔
687
    auto user2 = harness.app()->current_user();
42✔
688
    SyncTestFile config_local(user1, harness.schema(), SyncConfig::FLXSyncEnabled{});
42✔
689
    config_local.path += ".local";
42✔
690
    SyncTestFile config_remote(user2, harness.schema(), SyncConfig::FLXSyncEnabled{});
42✔
691
    config_remote.path += ".remote";
42✔
692
    const std::string str_field_value = "foo";
42✔
693
    const int64_t local_added_int = 100;
42✔
694
    const int64_t remote_added_int = 200;
42✔
695
    size_t before_reset_count = 0;
42✔
696
    size_t after_reset_count = 0;
42✔
697
    config_local.sync_config->notify_before_client_reset = [&before_reset_count](SharedRealm) {
34✔
698
        ++before_reset_count;
26✔
699
    };
26✔
700
    config_local.sync_config->notify_after_client_reset = [&after_reset_count](SharedRealm, ThreadSafeReference,
42✔
701
                                                                               bool) {
21✔
702
        ++after_reset_count;
×
703
    };
×
704

21✔
705
    SECTION("Recover: offline writes and subscription (single subscription)") {
42✔
706
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
707
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
708
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
709
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
710
        test_reset
2✔
711
            ->populate_initial_object([&](SharedRealm realm) {
2✔
712
                auto pk_of_added_object = ObjectId::gen();
2✔
713
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
714
                auto table = realm->read_group().get_table(ObjectStore::table_name_for_object_type("TopLevel"));
2✔
715
                REALM_ASSERT(table);
2✔
716
                mut_subs.insert_or_assign(Query(table));
2✔
717
                mut_subs.commit();
2✔
718

1✔
719
                realm->begin_transaction();
2✔
720
                CppContext c(realm);
2✔
721
                int64_t r1 = random_int();
2✔
722
                int64_t r2 = random_int();
2✔
723
                int64_t r3 = random_int();
2✔
724
                int64_t sum = uint64_t(r1) + r2 + r3;
2✔
725

1✔
726
                Object::create(c, realm, "TopLevel",
2✔
727
                               std::any(AnyDict{{"_id"s, pk_of_added_object},
2✔
728
                                                {"queryable_str_field"s, "initial value"s},
2✔
729
                                                {"list_of_ints_field", std::vector<std::any>{r1, r2, r3}},
2✔
730
                                                {"sum_of_list_field", sum}}));
2✔
731

1✔
732
                realm->commit_transaction();
2✔
733
                wait_for_upload(*realm);
2✔
734
                return pk_of_added_object;
2✔
735
            })
2✔
736
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
737
                add_object(local_realm, str_field_value, local_added_int);
2✔
738
            })
2✔
739
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
740
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
741
                sync::SubscriptionSet::State actual =
2✔
742
                    remote_realm->get_latest_subscription_set()
2✔
743
                        .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
744
                        .get();
2✔
745
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
2!
746
            })
2✔
747
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) {
2✔
748
                wait_for_advance(*local_realm);
2✔
749
                ClientResyncMode mode = client_reset_future.get();
2✔
750
                REQUIRE(mode == ClientResyncMode::Recover);
2!
751
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
752
                auto str_col = table->get_column_key("queryable_str_field");
2✔
753
                auto int_col = table->get_column_key("queryable_int_field");
2✔
754
                auto tv = table->where().equal(str_col, StringData(str_field_value)).find_all();
2✔
755
                tv.sort(int_col);
2✔
756
                // the object we created while offline was recovered, and the remote object was downloaded
1✔
757
                REQUIRE(tv.size() == 2);
2!
758
                CHECK(tv.get_object(0).get<Int>(int_col) == local_added_int);
2!
759
                CHECK(tv.get_object(1).get<Int>(int_col) == remote_added_int);
2!
760
            })
2✔
761
            ->run();
2✔
762
    }
2✔
763

21✔
764
    SECTION("Recover: subscription and offline writes after client reset failure") {
42✔
765
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
766
        auto&& [error_future, error_handler] = make_error_handler();
2✔
767
        config_local.sync_config->error_handler = error_handler;
2✔
768

1✔
769
        std::string fresh_path = realm::_impl::client_reset::get_fresh_path_for(config_local.path);
2✔
770
        // create a non-empty directory that we'll fail to delete
1✔
771
        util::make_dir(fresh_path);
2✔
772
        util::File(util::File::resolve("file", fresh_path), util::File::mode_Write);
2✔
773

1✔
774
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
775
        test_reset
2✔
776
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
777
                auto mut_sub = local_realm->get_latest_subscription_set().make_mutable_copy();
2✔
778
                auto table = local_realm->read_group().get_table("class_TopLevel2");
2✔
779
                mut_sub.insert_or_assign(Query(table));
2✔
780
                mut_sub.commit();
2✔
781

1✔
782
                CppContext c(local_realm);
2✔
783
                local_realm->begin_transaction();
2✔
784
                Object::create(c, local_realm, "TopLevel2",
2✔
785
                               std::any(AnyDict{{"_id"s, ObjectId::gen()}, {"queryable_str_field"s, "foo"s}}));
2✔
786
                local_realm->commit_transaction();
2✔
787
            })
2✔
788
            ->on_post_reset([](SharedRealm local_realm) {
2✔
789
                // Verify offline subscription was not removed.
1✔
790
                auto subs = local_realm->get_latest_subscription_set();
2✔
791
                auto table = local_realm->read_group().get_table("class_TopLevel2");
2✔
792
                REQUIRE(subs.find(Query(table)));
2!
793
            })
2✔
794
            ->run();
2✔
795

1✔
796
        // Remove the folder preventing the completion of a client reset.
1✔
797
        util::try_remove_dir_recursive(fresh_path);
2✔
798

1✔
799
        RealmConfig config_copy = config_local;
2✔
800
        config_copy.sync_config = std::make_shared<SyncConfig>(*config_copy.sync_config);
2✔
801
        config_copy.sync_config->error_handler = nullptr;
2✔
802
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
803
        config_copy.sync_config->notify_after_client_reset = reset_handler;
2✔
804

1✔
805
        // Attempt to open the realm again.
1✔
806
        // This time the client reset succeeds and the offline subscription and writes are recovered.
1✔
807
        auto realm = Realm::get_shared_realm(config_copy);
2✔
808
        ClientResyncMode mode = reset_future.get();
2✔
809
        REQUIRE(mode == ClientResyncMode::Recover);
2!
810

1✔
811
        auto table = realm->read_group().get_table("class_TopLevel2");
2✔
812
        auto str_col = table->get_column_key("queryable_str_field");
2✔
813
        REQUIRE(table->size() == 1);
2!
814
        REQUIRE(table->get_object(0).get<String>(str_col) == "foo");
2!
815
    }
2✔
816

21✔
817
    SECTION("Recover: offline writes and subscriptions (multiple subscriptions)") {
42✔
818
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
819
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
820
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
821
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
822
        test_reset
2✔
823
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
824
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
2✔
825
            })
2✔
826
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
827
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
828
                sync::SubscriptionSet::State actual =
2✔
829
                    remote_realm->get_latest_subscription_set()
2✔
830
                        .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
831
                        .get();
2✔
832
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
2!
833
            })
2✔
834
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) {
2✔
835
                ClientResyncMode mode = client_reset_future.get();
2✔
836
                REQUIRE(mode == ClientResyncMode::Recover);
2!
837
                auto subs = local_realm->get_latest_subscription_set();
2✔
838
                subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
839
                // make sure that the subscription for "foo" survived the reset
1✔
840
                size_t count_of_foo = count_queries_with_str(subs, util::format("\"%1\"", str_field_value));
2✔
841
                REQUIRE(subs.state() == sync::SubscriptionSet::State::Complete);
2!
842
                REQUIRE(count_of_foo == 1);
2!
843
                local_realm->refresh();
2✔
844
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
845
                auto str_col = table->get_column_key("queryable_str_field");
2✔
846
                auto int_col = table->get_column_key("queryable_int_field");
2✔
847
                auto tv = table->where().equal(str_col, StringData(str_field_value)).find_all();
2✔
848
                tv.sort(int_col);
2✔
849
                // the object we created while offline was recovered, and the remote object was downloaded
1✔
850
                REQUIRE(tv.size() == 2);
2!
851
                CHECK(tv.get_object(0).get<Int>(int_col) == local_added_int);
2!
852
                CHECK(tv.get_object(1).get<Int>(int_col) == remote_added_int);
2!
853
            })
2✔
854
            ->run();
2✔
855
    }
2✔
856

21✔
857
    auto validate_integrity_of_arrays = [](TableRef table) -> size_t {
29✔
858
        auto sum_col = table->get_column_key("sum_of_list_field");
16✔
859
        auto array_col = table->get_column_key("list_of_ints_field");
16✔
860
        auto query = table->column<Lst<Int>>(array_col).sum() == table->column<Int>(sum_col) &&
16✔
861
                     table->column<Lst<Int>>(array_col).size() > 0;
16✔
862
        return query.count();
16✔
863
    };
16✔
864

21✔
865
    SECTION("Recover: offline writes with associated subscriptions in the correct order") {
42✔
866
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
867
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
868
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
869
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
870
        constexpr size_t num_objects_added = 20;
2✔
871
        constexpr size_t num_objects_added_by_harness = 1; // BaasFLXClientReset.run()
2✔
872
        constexpr size_t num_objects_added_by_remote = 1;  // make_remote_changes()
2✔
873
        test_reset
2✔
874
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
875
                subscribe_to_and_add_objects(local_realm, num_objects_added);
2✔
876
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
877
                REQUIRE(table->size() == num_objects_added + num_objects_added_by_harness);
2!
878
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
2✔
879
                REQUIRE(count_of_valid_array_data == num_objects_added);
2!
880
            })
2✔
881
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
882
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
883
                sync::SubscriptionSet::State actual =
2✔
884
                    remote_realm->get_latest_subscription_set()
2✔
885
                        .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
886
                        .get();
2✔
887
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
2!
888
            })
2✔
889
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) {
2✔
890
                ClientResyncMode mode = client_reset_future.get();
2✔
891
                REQUIRE(mode == ClientResyncMode::Recover);
2!
892
                local_realm->refresh();
2✔
893
                auto latest_subs = local_realm->get_latest_subscription_set();
2✔
894
                auto state = latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
895
                REQUIRE(state == sync::SubscriptionSet::State::Complete);
2!
896
                local_realm->refresh();
2✔
897
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
898
                if (table->size() != 1) {
2✔
899
                    table->to_json(std::cout, 1, {});
×
900
                }
×
901
                REQUIRE(table->size() == 1);
2!
902
                auto mut_sub = latest_subs.make_mutable_copy();
2✔
903
                mut_sub.clear();
2✔
904
                mut_sub.insert_or_assign(Query(table));
2✔
905
                latest_subs = mut_sub.commit();
2✔
906
                latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
907
                local_realm->refresh();
2✔
908
                REQUIRE(table->size() ==
2!
909
                        num_objects_added + num_objects_added_by_harness + num_objects_added_by_remote);
2✔
910
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
2✔
911
                REQUIRE(count_of_valid_array_data == num_objects_added + num_objects_added_by_remote);
2!
912
            })
2✔
913
            ->run();
2✔
914
    }
2✔
915

21✔
916
    SECTION("Recover: incompatible property changes are rejected") {
42✔
917
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
918
        auto&& [error_future, err_handler] = make_error_handler();
2✔
919
        config_local.sync_config->error_handler = err_handler;
2✔
920
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
921
        constexpr size_t num_objects_added_before = 2;
2✔
922
        constexpr size_t num_objects_added_after = 2;
2✔
923
        constexpr size_t num_objects_added_by_harness = 1; // BaasFLXClientReset.run()
2✔
924
        constexpr std::string_view added_property_name = "new_property";
2✔
925
        test_reset
2✔
926
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
927
                subscribe_to_and_add_objects(local_realm, num_objects_added_before);
2✔
928
                Schema local_update = schema;
2✔
929
                Schema::iterator it = local_update.find("TopLevel");
2✔
930
                REQUIRE(it != local_update.end());
2!
931
                it->persisted_properties.push_back(
2✔
932
                    {std::string(added_property_name), PropertyType::Float | PropertyType::Nullable});
2✔
933
                local_realm->update_schema(local_update);
2✔
934
                subscribe_to_and_add_objects(local_realm, num_objects_added_after);
2✔
935
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
936
                REQUIRE(table->size() ==
2!
937
                        num_objects_added_before + num_objects_added_after + num_objects_added_by_harness);
2✔
938
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
2✔
939
                REQUIRE(count_of_valid_array_data == num_objects_added_before + num_objects_added_after);
2!
940
            })
2✔
941
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
942
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
943
                Schema remote_update = schema;
2✔
944
                Schema::iterator it = remote_update.find("TopLevel");
2✔
945
                REQUIRE(it != remote_update.end());
2!
946
                it->persisted_properties.push_back(
2✔
947
                    {std::string(added_property_name), PropertyType::UUID | PropertyType::Nullable});
2✔
948
                remote_realm->update_schema(remote_update);
2✔
949
                sync::SubscriptionSet::State actual =
2✔
950
                    remote_realm->get_latest_subscription_set()
2✔
951
                        .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
952
                        .get();
2✔
953
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
2!
954
            })
2✔
955
            ->on_post_reset([&, err_future = std::move(error_future)](SharedRealm local_realm) mutable {
2✔
956
                auto sync_error = wait_for_future(std::move(err_future)).get();
2✔
957
                REQUIRE(before_reset_count == 1);
2!
958
                REQUIRE(after_reset_count == 0);
2!
959
                REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
2!
960
                REQUIRE(sync_error.is_client_reset_requested());
2!
961
                local_realm->refresh();
2✔
962
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
963
                // since schema validation happens in the first recovery commit, that whole commit is rolled back
1✔
964
                // and the final state here is "pre reset"
1✔
965
                REQUIRE(table->size() ==
2!
966
                        num_objects_added_before + num_objects_added_by_harness + num_objects_added_after);
2✔
967
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
2✔
968
                REQUIRE(count_of_valid_array_data == num_objects_added_before + num_objects_added_after);
2!
969
            })
2✔
970
            ->run();
2✔
971
    }
2✔
972

21✔
973
    SECTION("unsuccessful replay of local changes") {
42✔
974
        constexpr size_t num_objects_added_before = 2;
4✔
975
        constexpr size_t num_objects_added_after = 2;
4✔
976
        constexpr size_t num_objects_added_by_harness = 1; // BaasFLXClientReset.run()
4✔
977
        constexpr std::string_view added_property_name = "new_property";
4✔
978
        auto&& [error_future, err_handler] = make_error_handler();
4✔
979
        config_local.sync_config->error_handler = err_handler;
4✔
980

2✔
981
        // The local changes here are a bit contrived because removing a column is disallowed
2✔
982
        // at the object store layer for sync'd Realms. The only reason a recovery should fail in production
2✔
983
        // during the apply stage is due to programmer error or external factors such as out of disk space.
2✔
984
        // Any schema discrepancies are caught by the initial diff, so the way to make a recovery fail here is
2✔
985
        // to add and remove a column at the core level such that the schema diff passes, but instructions are
2✔
986
        // generated which will fail when applied.
2✔
987
        reset_utils::TestClientReset::Callback make_local_changes_that_will_fail = [&](SharedRealm local_realm) {
4✔
988
            subscribe_to_and_add_objects(local_realm, num_objects_added_before);
4✔
989
            auto table = local_realm->read_group().get_table("class_TopLevel");
4✔
990
            REQUIRE(table->size() == num_objects_added_before + num_objects_added_by_harness);
4!
991
            size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
4✔
992
            REQUIRE(count_of_valid_array_data == num_objects_added_before);
4!
993
            local_realm->begin_transaction();
4✔
994
            ColKey added = table->add_column(type_Int, added_property_name);
4✔
995
            table->remove_column(added);
4✔
996
            local_realm->commit_transaction();
4✔
997
            subscribe_to_and_add_objects(local_realm, num_objects_added_after); // these are lost!
4✔
998
        };
4✔
999

2✔
1000
        reset_utils::TestClientReset::Callback verify_post_reset_state = [&, err_future = std::move(error_future)](
4✔
1001
                                                                             SharedRealm local_realm) mutable {
4✔
1002
            auto sync_error = wait_for_future(std::move(err_future)).get();
4✔
1003
            REQUIRE(before_reset_count == 1);
4!
1004
            REQUIRE(after_reset_count == 0);
4!
1005
            REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
4!
1006
            REQUIRE(sync_error.is_client_reset_requested());
4!
1007
            local_realm->refresh();
4✔
1008
            auto table = local_realm->read_group().get_table("class_TopLevel");
4✔
1009
            ColKey added = table->get_column_key(added_property_name);
4✔
1010
            REQUIRE(!added); // partial recovery halted at remove_column() but rolled back everything in the change
4!
1011
            // table is missing num_objects_added_after and the last commit after the latest subscription
2✔
1012
            // this is due to how recovery batches together changesets up until a subscription
2✔
1013
            const size_t expected_added_objects = num_objects_added_before - 1;
4✔
1014
            REQUIRE(table->size() == expected_added_objects + num_objects_added_by_harness);
4!
1015
            size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
4✔
1016
            REQUIRE(count_of_valid_array_data == expected_added_objects);
4!
1017
        };
4✔
1018

2✔
1019
        SECTION("Recover: unsuccessful recovery leads to a manual reset") {
4✔
1020
            config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
1021
            auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
1022
            test_reset->make_local_changes(std::move(make_local_changes_that_will_fail))
2✔
1023
                ->on_post_reset(std::move(verify_post_reset_state))
2✔
1024
                ->run();
2✔
1025
            RealmConfig config_copy = config_local;
2✔
1026
            auto&& [error_future2, err_handler2] = make_error_handler();
2✔
1027
            config_copy.sync_config->error_handler = err_handler2;
2✔
1028
            auto realm_post_reset = Realm::get_shared_realm(config_copy);
2✔
1029
            auto sync_error = wait_for_future(std::move(error_future2)).get();
2✔
1030
            REQUIRE(before_reset_count == 2);
2!
1031
            REQUIRE(after_reset_count == 0);
2!
1032
            REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
2!
1033
            REQUIRE(sync_error.is_client_reset_requested());
2!
1034
        }
2✔
1035

2✔
1036
        SECTION("RecoverOrDiscard: unsuccessful reapply leads to discard") {
4✔
1037
            config_local.sync_config->client_resync_mode = ClientResyncMode::RecoverOrDiscard;
2✔
1038
            auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
1039
            test_reset->make_local_changes(std::move(make_local_changes_that_will_fail))
2✔
1040
                ->on_post_reset(std::move(verify_post_reset_state))
2✔
1041
                ->run();
2✔
1042

1✔
1043
            RealmConfig config_copy = config_local;
2✔
1044
            auto&& [client_reset_future, reset_handler] = make_client_reset_handler();
2✔
1045
            config_copy.sync_config->error_handler = [](std::shared_ptr<SyncSession>, SyncError err) {
1✔
1046
                REALM_ASSERT_EX(!err.is_fatal, err.status);
×
1047
                CHECK(err.server_requests_action == sync::ProtocolErrorInfo::Action::Transient);
×
1048
            };
×
1049
            config_copy.sync_config->notify_after_client_reset = reset_handler;
2✔
1050
            auto realm_post_reset = Realm::get_shared_realm(config_copy);
2✔
1051
            ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
2✔
1052
            REQUIRE(mode == ClientResyncMode::DiscardLocal);
2!
1053
            realm_post_reset->refresh();
2✔
1054
            auto table = realm_post_reset->read_group().get_table("class_TopLevel");
2✔
1055
            ColKey added = table->get_column_key(added_property_name);
2✔
1056
            REQUIRE(!added);                                        // reverted local changes
2!
1057
            REQUIRE(table->size() == num_objects_added_by_harness); // discarded all offline local changes
2!
1058
        }
2✔
1059
    }
4✔
1060

21✔
1061
    SECTION("DiscardLocal: offline writes and subscriptions are lost") {
42✔
1062
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
1063
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
1064
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
1065
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
1066
        test_reset
2✔
1067
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
1068
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
2✔
1069
            })
2✔
1070
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
1071
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
1072
            })
2✔
1073
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) mutable {
2✔
1074
                ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
2✔
1075
                REQUIRE(mode == ClientResyncMode::DiscardLocal);
2!
1076
                auto subs = local_realm->get_latest_subscription_set();
2✔
1077
                wait_for_future(subs.get_state_change_notification(sync::SubscriptionSet::State::Complete)).get();
2✔
1078
                local_realm->refresh();
2✔
1079
                auto table = local_realm->read_group().get_table("class_TopLevel");
2✔
1080
                auto queryable_str_field = table->get_column_key("queryable_str_field");
2✔
1081
                auto queryable_int_field = table->get_column_key("queryable_int_field");
2✔
1082
                auto tv = table->where().equal(queryable_str_field, StringData(str_field_value)).find_all();
2✔
1083
                // the object we created while offline was discarded, and the remote object was not downloaded
1✔
1084
                REQUIRE(tv.size() == 0);
2!
1085
                size_t count_of_foo = count_queries_with_str(subs, util::format("\"%1\"", str_field_value));
2✔
1086
                // make sure that the subscription for "foo" did not survive the reset
1✔
1087
                REQUIRE(count_of_foo == 0);
2!
1088
                REQUIRE(subs.state() == sync::SubscriptionSet::State::Complete);
2!
1089

1✔
1090
                // adding data and subscriptions to a reset Realm works as normal
1✔
1091
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
2✔
1092
                auto latest_subs = local_realm->get_latest_subscription_set();
2✔
1093
                REQUIRE(latest_subs.version() > subs.version());
2!
1094
                wait_for_future(latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete))
2✔
1095
                    .get();
2✔
1096
                local_realm->refresh();
2✔
1097
                count_of_foo = count_queries_with_str(latest_subs, util::format("\"%1\"", str_field_value));
2✔
1098
                REQUIRE(count_of_foo == 1);
2!
1099
                tv = table->where().equal(queryable_str_field, StringData(str_field_value)).find_all();
2✔
1100
                REQUIRE(tv.size() == 2);
2!
1101
                tv.sort(queryable_int_field);
2✔
1102
                REQUIRE(tv.get_object(0).get<int64_t>(queryable_int_field) == local_added_int);
2!
1103
                REQUIRE(tv.get_object(1).get<int64_t>(queryable_int_field) == remote_added_int);
2!
1104
            })
2✔
1105
            ->run();
2✔
1106
    }
2✔
1107

21✔
1108
    SECTION("DiscardLocal: an invalid subscription made while offline becomes superceeded") {
42✔
1109
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
1110
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
1111
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
1112
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
1113
        std::unique_ptr<sync::SubscriptionSet> invalid_sub;
2✔
1114
        test_reset
2✔
1115
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
1116
                invalid_sub = std::make_unique<sync::SubscriptionSet>(add_invalid_subscription(local_realm));
2✔
1117
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
2✔
1118
            })
2✔
1119
            ->make_remote_changes([&](SharedRealm remote_realm) {
2✔
1120
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
2✔
1121
            })
2✔
1122
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) {
2✔
1123
                local_realm->refresh();
2✔
1124
                sync::SubscriptionSet::State actual =
2✔
1125
                    invalid_sub->get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
1126
                REQUIRE(actual == sync::SubscriptionSet::State::Superseded);
2!
1127
                ClientResyncMode mode = client_reset_future.get();
2✔
1128
                REQUIRE(mode == ClientResyncMode::DiscardLocal);
2!
1129
            })
2✔
1130
            ->run();
2✔
1131
    }
2✔
1132

21✔
1133
    SECTION("DiscardLocal: an error is produced if a previously successful query becomes invalid due to "
42✔
1134
            "server changes across a reset") {
22✔
1135
        // Disable dev mode so non-queryable fields are not automatically added as queryable
1✔
1136
        const AppSession& app_session = harness.session().app_session();
2✔
1137
        app_session.admin_api.set_development_mode_to(app_session.server_app_id, false);
2✔
1138
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
1139
        auto&& [error_future, err_handler] = make_error_handler();
2✔
1140
        config_local.sync_config->error_handler = err_handler;
2✔
1141
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
1142
        test_reset
2✔
1143
            ->setup([&](SharedRealm realm) {
2✔
1144
                if (realm->sync_session()->path() == config_local.path) {
2✔
1145
                    auto added_sub = add_subscription_for_new_object(realm, str_field_value, 0);
2✔
1146
                    added_sub.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
1147
                }
2✔
1148
            })
2✔
1149
            ->make_local_changes([&](SharedRealm local_realm) {
2✔
1150
                add_object(local_realm, str_field_value, local_added_int);
2✔
1151
                // Make "queryable_str_field" not a valid query field.
1✔
1152
                // Pre-reset, the Realm had a successful query on it, but now when the client comes back online
1✔
1153
                // and tries to reset, the fresh Realm download will fail with a query error.
1✔
1154
                const AppSession& app_session = harness.session().app_session();
2✔
1155
                auto baas_sync_service = app_session.admin_api.get_sync_service(app_session.server_app_id);
2✔
1156
                auto baas_sync_config =
2✔
1157
                    app_session.admin_api.get_config(app_session.server_app_id, baas_sync_service);
2✔
1158
                REQUIRE(baas_sync_config.queryable_field_names->is_array());
2!
1159
                auto it = baas_sync_config.queryable_field_names->begin();
2✔
1160
                for (; it != baas_sync_config.queryable_field_names->end(); ++it) {
2✔
1161
                    if (*it == "queryable_str_field") {
2✔
1162
                        break;
2✔
1163
                    }
2✔
1164
                }
2✔
1165
                REQUIRE(it != baas_sync_config.queryable_field_names->end());
2!
1166
                baas_sync_config.queryable_field_names->erase(it);
2✔
1167
                app_session.admin_api.enable_sync(app_session.server_app_id, baas_sync_service.id, baas_sync_config);
2✔
1168
            })
2✔
1169
            ->on_post_reset([&, err_future = std::move(error_future)](SharedRealm) mutable {
2✔
1170
                auto sync_error = wait_for_future(std::move(err_future)).get();
2✔
1171
                INFO(sync_error.status);
2✔
1172
                CHECK(sync_error.status == ErrorCodes::AutoClientResetFailed);
2!
1173
            })
2✔
1174
            ->run();
2✔
1175
    }
2✔
1176

21✔
1177
    SECTION("DiscardLocal: completion callbacks fire after client reset even when there is no data to download") {
42✔
1178
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
1179
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
2✔
1180
        config_local.sync_config->notify_after_client_reset = reset_handler;
2✔
1181
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
1182
        test_reset
2✔
1183
            ->on_post_local_changes([&](SharedRealm realm) {
2✔
1184
                wait_for_upload(*realm);
2✔
1185
                wait_for_download(*realm);
2✔
1186
            })
2✔
1187
            ->run();
2✔
1188
    }
2✔
1189

21✔
1190
    SECTION("DiscardLocal: open realm after client reset failure") {
42✔
1191
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
1192
        auto&& [error_future, error_handler] = make_error_handler();
2✔
1193
        config_local.sync_config->error_handler = error_handler;
2✔
1194

1✔
1195
        std::string fresh_path = realm::_impl::client_reset::get_fresh_path_for(config_local.path);
2✔
1196
        // create a non-empty directory that we'll fail to delete
1✔
1197
        util::make_dir(fresh_path);
2✔
1198
        util::File(util::File::resolve("file", fresh_path), util::File::mode_Write);
2✔
1199

1✔
1200
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
2✔
1201
        test_reset->run();
2✔
1202

1✔
1203
        // Client reset fails due to sync client not being able to create the fresh realm.
1✔
1204
        auto sync_error = wait_for_future(std::move(error_future)).get();
2✔
1205
        REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
2!
1206

1✔
1207
        // Open the realm again. This should not crash.
1✔
1208
        auto&& [err_future, err_handler] = make_error_handler();
2✔
1209
        config_local.sync_config->error_handler = std::move(err_handler);
2✔
1210

1✔
1211
        auto realm_post_reset = Realm::get_shared_realm(config_local);
2✔
1212
        sync_error = wait_for_future(std::move(err_future)).get();
2✔
1213
        REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
2!
1214
    }
2✔
1215

21✔
1216
    enum class ResetMode { NoReset, InitiateClientReset };
42✔
1217
    auto seed_realm = [&harness, &subscribe_to_and_add_objects](RealmConfig config, ResetMode reset_mode) {
34✔
1218
        config.sync_config->error_handler = [path = config.path](std::shared_ptr<SyncSession>, SyncError err) {
13✔
1219
            // ignore spurious failures on this instance
1220
            util::format(std::cout, "spurious error while seeding a Realm at '%1': %2\n", path, err.status);
×
1221
        };
×
1222
        SharedRealm realm = Realm::get_shared_realm(config);
26✔
1223
        subscribe_to_and_add_objects(realm, 1);
26✔
1224
        auto subs = realm->get_latest_subscription_set();
26✔
1225
        auto result = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
26✔
1226
        CHECK(result == sync::SubscriptionSet::State::Complete);
26!
1227
        if (reset_mode == ResetMode::InitiateClientReset) {
26✔
1228
            reset_utils::trigger_client_reset(harness.session().app_session(), realm);
18✔
1229
        }
18✔
1230
        realm->close();
26✔
1231
    };
26✔
1232

21✔
1233
    auto setup_reset_handlers_for_schema_validation =
42✔
1234
        [&before_reset_count, &after_reset_count](RealmConfig& config, Schema expected_schema) {
28✔
1235
            auto& sync_config = *config.sync_config;
14✔
1236
            sync_config.error_handler = [](std::shared_ptr<SyncSession>, SyncError err) {
7✔
1237
                FAIL(err.status);
×
1238
            };
×
1239
            sync_config.notify_before_client_reset = [&before_reset_count,
14✔
1240
                                                      expected = expected_schema](SharedRealm frozen_before) {
14✔
1241
                ++before_reset_count;
14✔
1242
                REQUIRE(frozen_before->schema().size() > 0);
14!
1243
                REQUIRE(frozen_before->schema_version() != ObjectStore::NotVersioned);
14!
1244
                REQUIRE(frozen_before->schema() == expected);
14!
1245
            };
14✔
1246

7✔
1247
            auto [promise, future] = util::make_promise_future<void>();
14✔
1248
            sync_config.notify_after_client_reset =
14✔
1249
                [&after_reset_count, promise = util::CopyablePromiseHolder<void>(std::move(promise)), expected_schema,
14✔
1250
                 reset_mode = config.sync_config->client_resync_mode, has_schema = config.schema.has_value()](
14✔
1251
                    SharedRealm frozen_before, ThreadSafeReference after_ref, bool did_recover) mutable {
14✔
1252
                    ++after_reset_count;
14✔
1253
                    REQUIRE(frozen_before->schema().size() > 0);
14!
1254
                    REQUIRE(frozen_before->schema_version() != ObjectStore::NotVersioned);
14!
1255
                    REQUIRE(frozen_before->schema() == expected_schema);
14!
1256
                    SharedRealm after = Realm::get_shared_realm(std::move(after_ref), util::Scheduler::make_dummy());
14✔
1257
                    if (!has_schema) {
14✔
1258
                        after->set_schema_subset(expected_schema);
4✔
1259
                    }
4✔
1260
                    REQUIRE(after);
14!
1261
                    REQUIRE(after->schema() == expected_schema);
14!
1262
                    // the above check is sufficient unless operator==() is changed to not care about ordering
7✔
1263
                    // so future proof that by explicitly checking the order of properties here as well
7✔
1264
                    REQUIRE(after->schema().size() == frozen_before->schema().size());
14!
1265
                    auto after_it = after->schema().find("TopLevel");
14✔
1266
                    auto before_it = frozen_before->schema().find("TopLevel");
14✔
1267
                    REQUIRE(after_it != after->schema().end());
14!
1268
                    REQUIRE(before_it != frozen_before->schema().end());
14!
1269
                    REQUIRE(after_it->name == before_it->name);
14!
1270
                    REQUIRE(after_it->persisted_properties.size() == before_it->persisted_properties.size());
14!
1271
                    REQUIRE(after_it->persisted_properties[1].name == "queryable_int_field");
14!
1272
                    REQUIRE(after_it->persisted_properties[2].name == "queryable_str_field");
14!
1273
                    REQUIRE(before_it->persisted_properties[1].name == "queryable_int_field");
14!
1274
                    REQUIRE(before_it->persisted_properties[2].name == "queryable_str_field");
14!
1275
                    REQUIRE(did_recover == (reset_mode == ClientResyncMode::Recover));
14!
1276
                    promise.get_promise().emplace_value();
14✔
1277
                };
14✔
1278
            return std::move(future); // move is not redundant here because of how destructing works
14✔
1279
        };
14✔
1280

21✔
1281
    SECTION("Recover: schema indexes match in before and after states") {
42✔
1282
        seed_realm(config_local, ResetMode::InitiateClientReset);
2✔
1283
        // reorder a property such that it does not match the on disk property order
1✔
1284
        std::vector<ObjectSchema> local_schema = schema;
2✔
1285
        std::swap(local_schema[0].persisted_properties[1], local_schema[0].persisted_properties[2]);
2✔
1286
        local_schema[0].persisted_properties.push_back(
2✔
1287
            {"queryable_oid_field", PropertyType::ObjectId | PropertyType::Nullable});
2✔
1288
        config_local.schema = local_schema;
2✔
1289
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
1290
        auto future = setup_reset_handlers_for_schema_validation(config_local, local_schema);
2✔
1291
        SharedRealm realm = Realm::get_shared_realm(config_local);
2✔
1292
        future.get();
2✔
1293
        CHECK(before_reset_count == 1);
2!
1294
        CHECK(after_reset_count == 1);
2!
1295
    }
2✔
1296

21✔
1297
    SECTION("Adding a local property matching a server addition is allowed") {
42✔
1298
        auto mode = GENERATE(ClientResyncMode::DiscardLocal, ClientResyncMode::Recover);
4✔
1299
        config_local.sync_config->client_resync_mode = mode;
4✔
1300
        seed_realm(config_local, ResetMode::InitiateClientReset);
4✔
1301
        std::vector<ObjectSchema> changed_schema = schema;
4✔
1302
        changed_schema[0].persisted_properties.push_back(
4✔
1303
            {"queryable_oid_field", PropertyType::ObjectId | PropertyType::Nullable});
4✔
1304
        // In a separate Realm, make the property addition.
2✔
1305
        // Since this is dev mode, it will be added to the server's schema.
2✔
1306
        config_remote.schema = changed_schema;
4✔
1307
        seed_realm(config_remote, ResetMode::NoReset);
4✔
1308
        std::swap(changed_schema[0].persisted_properties[1], changed_schema[0].persisted_properties[2]);
4✔
1309
        config_local.schema = changed_schema;
4✔
1310
        auto future = setup_reset_handlers_for_schema_validation(config_local, changed_schema);
4✔
1311

2✔
1312
        async_open_realm(config_local,
4✔
1313
                         [&, fut = std::move(future)](ThreadSafeReference&& ref, std::exception_ptr error) {
4✔
1314
                             REQUIRE(ref);
4!
1315
                             REQUIRE_FALSE(error);
4!
1316
                             auto realm = Realm::get_shared_realm(std::move(ref));
4✔
1317
                             fut.get();
4✔
1318
                             CHECK(before_reset_count == 1);
4!
1319
                             CHECK(after_reset_count == 1);
4!
1320
                         });
4✔
1321
    }
4✔
1322

21✔
1323
    SECTION("Adding a local property matching a server addition inside the before reset callback is allowed") {
42✔
1324
        auto mode = GENERATE(ClientResyncMode::DiscardLocal, ClientResyncMode::Recover);
4✔
1325
        config_local.sync_config->client_resync_mode = mode;
4✔
1326
        seed_realm(config_local, ResetMode::InitiateClientReset);
4✔
1327
        std::vector<ObjectSchema> changed_schema = schema;
4✔
1328
        changed_schema[0].persisted_properties.push_back(
4✔
1329
            {"queryable_oid_field", PropertyType::ObjectId | PropertyType::Nullable});
4✔
1330
        // In a separate Realm, make the property addition.
2✔
1331
        // Since this is dev mode, it will be added to the server's schema.
2✔
1332
        config_remote.schema = changed_schema;
4✔
1333
        seed_realm(config_remote, ResetMode::NoReset);
4✔
1334
        std::swap(changed_schema[0].persisted_properties[1], changed_schema[0].persisted_properties[2]);
4✔
1335
        config_local.schema.reset();
4✔
1336
        config_local.sync_config->freeze_before_reset_realm = false;
4✔
1337
        auto future = setup_reset_handlers_for_schema_validation(config_local, changed_schema);
4✔
1338

2✔
1339
        auto notify_before = std::move(config_local.sync_config->notify_before_client_reset);
4✔
1340
        config_local.sync_config->notify_before_client_reset = [=](std::shared_ptr<Realm> realm) {
4✔
1341
            realm->update_schema(changed_schema);
4✔
1342
            notify_before(realm);
4✔
1343
        };
4✔
1344

2✔
1345
        auto notify_after = std::move(config_local.sync_config->notify_after_client_reset);
4✔
1346
        config_local.sync_config->notify_after_client_reset = [=](std::shared_ptr<Realm> before,
4✔
1347
                                                                  ThreadSafeReference after, bool did_recover) {
4✔
1348
            before->set_schema_subset(changed_schema);
4✔
1349
            notify_after(before, std::move(after), did_recover);
4✔
1350
        };
4✔
1351

2✔
1352
        async_open_realm(config_local,
4✔
1353
                         [&, fut = std::move(future)](ThreadSafeReference&& ref, std::exception_ptr error) {
4✔
1354
                             REQUIRE(ref);
4!
1355
                             REQUIRE_FALSE(error);
4!
1356
                             auto realm = Realm::get_shared_realm(std::move(ref));
4✔
1357
                             fut.get();
4✔
1358
                             CHECK(before_reset_count == 1);
4!
1359
                             CHECK(after_reset_count == 1);
4!
1360
                         });
4✔
1361
    }
4✔
1362

21✔
1363
    auto make_additive_changes = [](std::vector<ObjectSchema> schema) {
24✔
1364
        schema[0].persisted_properties.push_back(
6✔
1365
            {"added_oid_field", PropertyType::ObjectId | PropertyType::Nullable});
6✔
1366
        std::swap(schema[0].persisted_properties[1], schema[0].persisted_properties[2]);
6✔
1367
        schema.push_back({"AddedClass",
6✔
1368
                          {
6✔
1369
                              {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
6✔
1370
                              {"str_field", PropertyType::String | PropertyType::Nullable},
6✔
1371
                          }});
6✔
1372
        return schema;
6✔
1373
    };
6✔
1374
    SECTION("Recover: additive schema changes are recovered in dev mode") {
42✔
1375
        const AppSession& app_session = harness.session().app_session();
2✔
1376
        app_session.admin_api.set_development_mode_to(app_session.server_app_id, true);
2✔
1377
        seed_realm(config_local, ResetMode::InitiateClientReset);
2✔
1378
        std::vector<ObjectSchema> changed_schema = make_additive_changes(schema);
2✔
1379
        config_local.schema = changed_schema;
2✔
1380
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
1381
        ThreadSafeReference ref_async;
2✔
1382
        auto future = setup_reset_handlers_for_schema_validation(config_local, changed_schema);
2✔
1383
        async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
2✔
1384
            REQUIRE(ref);
2!
1385
            REQUIRE_FALSE(error);
2!
1386
            ref_async = std::move(ref);
2✔
1387
        });
2✔
1388
        future.get();
2✔
1389
        CHECK(before_reset_count == 1);
2!
1390
        CHECK(after_reset_count == 1);
2!
1391
        {
2✔
1392
            auto realm = Realm::get_shared_realm(std::move(ref_async));
2✔
1393
            // make changes to the newly added property
1✔
1394
            realm->begin_transaction();
2✔
1395
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
1396
            ColKey new_col = table->get_column_key("added_oid_field");
2✔
1397
            REQUIRE(new_col);
2!
1398
            for (auto it = table->begin(); it != table->end(); ++it) {
4✔
1399
                it->set(new_col, ObjectId::gen());
2✔
1400
            }
2✔
1401
            realm->commit_transaction();
2✔
1402
            // subscribe to the new Class and add an object
1✔
1403
            auto new_table = realm->read_group().get_table("class_AddedClass");
2✔
1404
            auto sub_set = realm->get_latest_subscription_set();
2✔
1405
            auto mut_sub = sub_set.make_mutable_copy();
2✔
1406
            mut_sub.insert_or_assign(Query(new_table));
2✔
1407
            mut_sub.commit();
2✔
1408
            realm->begin_transaction();
2✔
1409
            REQUIRE(new_table);
2!
1410
            new_table->create_object_with_primary_key(ObjectId::gen());
2✔
1411
            realm->commit_transaction();
2✔
1412
            auto result = realm->get_latest_subscription_set()
2✔
1413
                              .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
1414
                              .get();
2✔
1415
            CHECK(result == sync::SubscriptionSet::State::Complete);
2!
1416
            wait_for_advance(*realm);
2✔
1417
            realm->close();
2✔
1418
        }
2✔
1419
        {
2✔
1420
            // ensure that an additional schema change after the successful reset is also accepted by the server
1✔
1421
            changed_schema[0].persisted_properties.push_back(
2✔
1422
                {"added_oid_field_second", PropertyType::ObjectId | PropertyType::Nullable});
2✔
1423
            changed_schema.push_back({"AddedClassSecond",
2✔
1424
                                      {
2✔
1425
                                          {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
1426
                                          {"str_field_2", PropertyType::String | PropertyType::Nullable},
2✔
1427
                                      }});
2✔
1428
            config_local.schema = changed_schema;
2✔
1429
            async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
2✔
1430
                REQUIRE(ref);
2!
1431
                REQUIRE_FALSE(error);
2!
1432
                auto realm = Realm::get_shared_realm(std::move(ref));
2✔
1433
                auto table = realm->read_group().get_table("class_AddedClassSecond");
2✔
1434
                ColKey new_col = table->get_column_key("str_field_2");
2✔
1435
                REQUIRE(new_col);
2!
1436
                auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
1437
                new_subs.insert_or_assign(Query(table).equal(new_col, "hello"));
2✔
1438
                auto subs = new_subs.commit();
2✔
1439
                realm->begin_transaction();
2✔
1440
                table->create_object_with_primary_key(Mixed{ObjectId::gen()}, {{new_col, "hello"}});
2✔
1441
                realm->commit_transaction();
2✔
1442
                subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
1443
                wait_for_advance(*realm);
2✔
1444
                REQUIRE(table->size() == 1);
2!
1445
            });
2✔
1446
        }
2✔
1447
    }
2✔
1448

21✔
1449
    SECTION("DiscardLocal: additive schema changes not allowed") {
42✔
1450
        seed_realm(config_local, ResetMode::InitiateClientReset);
2✔
1451
        std::vector<ObjectSchema> changed_schema = make_additive_changes(schema);
2✔
1452
        config_local.schema = changed_schema;
2✔
1453
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
2✔
1454
        auto&& [error_future, err_handler] = make_error_handler();
2✔
1455
        config_local.sync_config->error_handler = err_handler;
2✔
1456
        async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
2✔
1457
            REQUIRE(!ref);
2!
1458
            REQUIRE(error);
2!
1459
            REQUIRE_THROWS_CONTAINING(std::rethrow_exception(error),
2✔
1460
                                      "A fatal error occurred during client reset: 'Client reset cannot recover when "
2✔
1461
                                      "classes have been removed: {AddedClass}'");
2✔
1462
        });
2✔
1463
        error_future.get();
2✔
1464
        CHECK(before_reset_count == 1);
2!
1465
        CHECK(after_reset_count == 0);
2!
1466
    }
2✔
1467

21✔
1468
    SECTION("Recover: incompatible schema changes on async open are an error") {
42✔
1469
        seed_realm(config_local, ResetMode::InitiateClientReset);
2✔
1470
        std::vector<ObjectSchema> changed_schema = schema;
2✔
1471
        changed_schema[0].persisted_properties[0].type = PropertyType::UUID; // incompatible type change
2✔
1472
        config_local.schema = changed_schema;
2✔
1473
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
1474
        auto&& [error_future, err_handler] = make_error_handler();
2✔
1475
        config_local.sync_config->error_handler = err_handler;
2✔
1476
        async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
2✔
1477
            REQUIRE(!ref);
2!
1478
            REQUIRE(error);
2!
1479
            REQUIRE_THROWS_CONTAINING(
2✔
1480
                std::rethrow_exception(error),
2✔
1481
                "A fatal error occurred during client reset: 'The following changes cannot be "
2✔
1482
                "made in additive-only schema mode:\n"
2✔
1483
                "- Property 'TopLevel._id' has been changed from 'object id' to 'uuid'.\nIf your app is running in "
2✔
1484
                "development mode, you can delete the realm and restart the app to update your schema.'");
2✔
1485
        });
2✔
1486
        error_future.get();
2✔
1487
        CHECK(before_reset_count == 0); // we didn't even get this far because opening the frozen realm fails
2!
1488
        CHECK(after_reset_count == 0);
2!
1489
    }
2✔
1490

21✔
1491
    SECTION("Recover: additive schema changes without dev mode produce an error after client reset") {
42✔
1492
        const AppSession& app_session = harness.session().app_session();
2✔
1493
        app_session.admin_api.set_development_mode_to(app_session.server_app_id, true);
2✔
1494
        {
2✔
1495
            seed_realm(config_local, ResetMode::InitiateClientReset);
2✔
1496
            // Disable dev mode so that schema changes are not allowed
1✔
1497
            app_session.admin_api.set_development_mode_to(app_session.server_app_id, false);
2✔
1498
            std::vector<ObjectSchema> changed_schema = make_additive_changes(schema);
2✔
1499
            config_local.schema = changed_schema;
2✔
1500
            config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
1501
            (void)setup_reset_handlers_for_schema_validation(config_local, changed_schema);
2✔
1502
            auto&& [error_future, err_handler] = make_error_handler();
2✔
1503
            config_local.sync_config->error_handler = err_handler;
2✔
1504
            async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
2✔
1505
                REQUIRE(ref);
2!
1506
                REQUIRE_FALSE(error);
2!
1507
                auto realm = Realm::get_shared_realm(std::move(ref));
2✔
1508
                // make changes to the new property
1✔
1509
                realm->begin_transaction();
2✔
1510
                auto table = realm->read_group().get_table("class_TopLevel");
2✔
1511
                ColKey new_col = table->get_column_key("added_oid_field");
2✔
1512
                REQUIRE(new_col);
2!
1513
                for (auto it = table->begin(); it != table->end(); ++it) {
4✔
1514
                    it->set(new_col, ObjectId::gen());
2✔
1515
                }
2✔
1516
                realm->commit_transaction();
2✔
1517
            });
2✔
1518
            auto realm = Realm::get_shared_realm(config_local);
2✔
1519
            auto err = error_future.get();
2✔
1520
            std::string property_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding "
2✔
1521
                                       "\"ObjectID\" column at field \"added_oid_field\" in schema \"TopLevel\", "
2✔
1522
                                       "schema changes from clients are restricted when developer mode is disabled";
2✔
1523
            std::string class_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding schema "
2✔
1524
                                    "for Realm table \"AddedClass\", schema changes from clients are restricted when "
2✔
1525
                                    "developer mode is disabled";
2✔
1526
            REQUIRE_THAT(err.status.reason(), Catch::Matchers::ContainsSubstring(property_err) ||
2✔
1527
                                                  Catch::Matchers::ContainsSubstring(class_err));
2✔
1528
            CHECK(before_reset_count == 1);
2!
1529
            CHECK(after_reset_count == 1);
2!
1530
        }
2✔
1531
    }
2✔
1532

21✔
1533
    // the previous section turns off dev mode, undo that now for later tests
21✔
1534
    const AppSession& app_session = harness.session().app_session();
42✔
1535
    app_session.admin_api.set_development_mode_to(app_session.server_app_id, true);
42✔
1536
}
42✔
1537

1538
TEST_CASE("flx: creating an object on a class with no subscription throws", "[sync][flx][subscription][baas]") {
2✔
1539
    FLXSyncTestHarness harness("flx_bad_query", {g_simple_embedded_obj_schema, {"queryable_str_field"}});
2✔
1540
    harness.do_with_new_user([&](auto user) {
2✔
1541
        SyncTestFile config(user, harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
1542
        auto [error_promise, error_future] = util::make_promise_future<SyncError>();
2✔
1543
        auto shared_promise = std::make_shared<decltype(error_promise)>(std::move(error_promise));
2✔
1544
        config.sync_config->error_handler = [error_promise = std::move(shared_promise)](std::shared_ptr<SyncSession>,
2✔
1545
                                                                                        SyncError err) {
1✔
1546
            CHECK(err.server_requests_action == sync::ProtocolErrorInfo::Action::Transient);
×
1547
            error_promise->emplace_value(std::move(err));
×
1548
        };
×
1549

1✔
1550
        auto realm = Realm::get_shared_realm(config);
2✔
1551
        CppContext c(realm);
2✔
1552
        realm->begin_transaction();
2✔
1553
        REQUIRE_THROWS_AS(
2✔
1554
            Object::create(c, realm, "TopLevel",
2✔
1555
                           std::any(AnyDict{{"_id", ObjectId::gen()}, {"queryable_str_field", "foo"s}})),
2✔
1556
            NoSubscriptionForWrite);
2✔
1557
        realm->cancel_transaction();
2✔
1558

1✔
1559
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
1560

1✔
1561
        REQUIRE(table->is_empty());
2!
1562
        auto col_key = table->get_column_key("queryable_str_field");
2✔
1563
        {
2✔
1564
            auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
1565
            new_subs.insert_or_assign(Query(table).equal(col_key, "foo"));
2✔
1566
            auto subs = new_subs.commit();
2✔
1567
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
1568
        }
2✔
1569

1✔
1570
        realm->begin_transaction();
2✔
1571
        auto obj = Object::create(c, realm, "TopLevel",
2✔
1572
                                  std::any(AnyDict{{"_id", ObjectId::gen()},
2✔
1573
                                                   {"queryable_str_field", "foo"s},
2✔
1574
                                                   {"embedded_obj", AnyDict{{"str_field", "bar"s}}}}));
2✔
1575
        realm->commit_transaction();
2✔
1576

1✔
1577
        realm->begin_transaction();
2✔
1578
        auto embedded_obj = util::any_cast<Object&&>(obj.get_property_value<std::any>(c, "embedded_obj"));
2✔
1579
        embedded_obj.set_property_value(c, "str_field", std::any{"baz"s});
2✔
1580
        realm->commit_transaction();
2✔
1581

1✔
1582
        wait_for_upload(*realm);
2✔
1583
        wait_for_download(*realm);
2✔
1584
    });
2✔
1585
}
2✔
1586

1587
TEST_CASE("flx: uploading an object that is out-of-view results in compensating write",
1588
          "[sync][flx][compensating write][baas]") {
16✔
1589
    static std::optional<FLXSyncTestHarness> harness;
16✔
1590
    if (!harness) {
16✔
1591
        Schema schema{{"TopLevel",
2✔
1592
                       {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
1593
                        {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
1594
                        {"embedded_obj", PropertyType::Object | PropertyType::Nullable, "TopLevel_embedded_obj"}}},
2✔
1595
                      {"TopLevel_embedded_obj",
2✔
1596
                       ObjectSchema::ObjectType::Embedded,
2✔
1597
                       {{"str_field", PropertyType::String | PropertyType::Nullable}}},
2✔
1598
                      {"Int PK",
2✔
1599
                       {
2✔
1600
                           {"_id", PropertyType::Int, Property::IsPrimary{true}},
2✔
1601
                           {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
1602
                       }},
2✔
1603
                      {"String PK",
2✔
1604
                       {
2✔
1605
                           {"_id", PropertyType::String, Property::IsPrimary{true}},
2✔
1606
                           {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
1607
                       }},
2✔
1608
                      {"UUID PK",
2✔
1609
                       {
2✔
1610
                           {"_id", PropertyType::UUID, Property::IsPrimary{true}},
2✔
1611
                           {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
1612
                       }}};
2✔
1613

1✔
1614
        AppCreateConfig::ServiceRole role;
2✔
1615
        role.name = "compensating_write_perms";
2✔
1616

1✔
1617
        AppCreateConfig::ServiceRoleDocumentFilters doc_filters;
2✔
1618
        doc_filters.read = true;
2✔
1619
        doc_filters.write = {{"queryable_str_field", {{"$in", nlohmann::json::array({"foo", "bar"})}}}};
2✔
1620
        role.document_filters = doc_filters;
2✔
1621

1✔
1622
        role.insert_filter = true;
2✔
1623
        role.delete_filter = true;
2✔
1624
        role.read = true;
2✔
1625
        role.write = true;
2✔
1626
        FLXSyncTestHarness::ServerSchema server_schema{schema, {"queryable_str_field"}, {role}};
2✔
1627
        harness.emplace("flx_bad_query", server_schema);
2✔
1628
    }
2✔
1629

8✔
1630
    create_user_and_log_in(harness->app());
16✔
1631
    auto user = harness->app()->current_user();
16✔
1632

8✔
1633
    auto make_error_handler = [] {
16✔
1634
        auto [error_promise, error_future] = util::make_promise_future<SyncError>();
16✔
1635
        auto shared_promise = std::make_shared<decltype(error_promise)>(std::move(error_promise));
16✔
1636
        auto fn = [error_promise = std::move(shared_promise)](std::shared_ptr<SyncSession>, SyncError err) mutable {
15✔
1637
            if (!error_promise) {
14✔
1638
                util::format(std::cerr,
×
1639
                             "An unexpected sync error was caught by the default SyncTestFile handler: '%1'\n",
×
1640
                             err.status);
×
1641
                abort();
×
1642
            }
×
1643
            error_promise->emplace_value(std::move(err));
14✔
1644
            error_promise.reset();
14✔
1645
        };
14✔
1646

8✔
1647
        return std::make_pair(std::move(error_future), std::move(fn));
16✔
1648
    };
16✔
1649

8✔
1650
    auto validate_sync_error = [&](const SyncError& sync_error, Mixed expected_pk, const char* expected_object_name,
16✔
1651
                                   const std::string& error_msg_fragment) {
15✔
1652
        CHECK(sync_error.status == ErrorCodes::SyncCompensatingWrite);
14!
1653
        CHECK(!sync_error.is_client_reset_requested());
14!
1654
        CHECK(sync_error.compensating_writes_info.size() == 1);
14!
1655
        CHECK(sync_error.server_requests_action == sync::ProtocolErrorInfo::Action::Warning);
14!
1656
        auto write_info = sync_error.compensating_writes_info[0];
14✔
1657
        CHECK(write_info.primary_key == expected_pk);
14!
1658
        CHECK(write_info.object_name == expected_object_name);
14!
1659
        CHECK_THAT(write_info.reason, Catch::Matchers::ContainsSubstring(error_msg_fragment));
14✔
1660
    };
14✔
1661

8✔
1662
    SyncTestFile config(user, harness->schema(), SyncConfig::FLXSyncEnabled{});
16✔
1663
    auto&& [error_future, err_handler] = make_error_handler();
16✔
1664
    config.sync_config->error_handler = err_handler;
16✔
1665
    auto realm = Realm::get_shared_realm(config);
16✔
1666
    auto table = realm->read_group().get_table("class_TopLevel");
16✔
1667

8✔
1668
    auto create_subscription = [&](StringData table_name, auto make_query) {
15✔
1669
        auto table = realm->read_group().get_table(table_name);
14✔
1670
        auto queryable_str_field = table->get_column_key("queryable_str_field");
14✔
1671
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
14✔
1672
        new_query.insert_or_assign(make_query(Query(table), queryable_str_field));
14✔
1673
        new_query.commit();
14✔
1674
    };
14✔
1675

8✔
1676
    SECTION("compensating write because of permission violation") {
16✔
1677
        create_subscription("class_TopLevel", [](auto q, auto col) {
2✔
1678
            return q.equal(col, "bizz");
2✔
1679
        });
2✔
1680

1✔
1681
        CppContext c(realm);
2✔
1682
        realm->begin_transaction();
2✔
1683
        auto invalid_obj = ObjectId::gen();
2✔
1684
        Object::create(c, realm, "TopLevel",
2✔
1685
                       std::any(AnyDict{{"_id", invalid_obj}, {"queryable_str_field", "bizz"s}}));
2✔
1686
        realm->commit_transaction();
2✔
1687

1✔
1688
        validate_sync_error(
2✔
1689
            std::move(error_future).get(), invalid_obj, "TopLevel",
2✔
1690
            util::format("write to ObjectID(\"%1\") in table \"TopLevel\" not allowed", invalid_obj.to_string()));
2✔
1691

1✔
1692
        wait_for_advance(*realm);
2✔
1693

1✔
1694
        auto top_level_table = realm->read_group().get_table("class_TopLevel");
2✔
1695
        REQUIRE(top_level_table->is_empty());
2!
1696
    }
2✔
1697

8✔
1698
    SECTION("compensating write because of permission violation with write on embedded object") {
16✔
1699
        create_subscription("class_TopLevel", [](auto q, auto col) {
2✔
1700
            return q.equal(col, "bizz").Or().equal(col, "foo");
2✔
1701
        });
2✔
1702

1✔
1703
        CppContext c(realm);
2✔
1704
        realm->begin_transaction();
2✔
1705
        auto invalid_obj = ObjectId::gen();
2✔
1706
        auto obj = Object::create(c, realm, "TopLevel",
2✔
1707
                                  std::any(AnyDict{{"_id", invalid_obj},
2✔
1708
                                                   {"queryable_str_field", "foo"s},
2✔
1709
                                                   {"embedded_obj", AnyDict{{"str_field", "bar"s}}}}));
2✔
1710
        realm->commit_transaction();
2✔
1711
        realm->begin_transaction();
2✔
1712
        obj.set_property_value(c, "queryable_str_field", std::any{"bizz"s});
2✔
1713
        realm->commit_transaction();
2✔
1714
        realm->begin_transaction();
2✔
1715
        auto embedded_obj = util::any_cast<Object&&>(obj.get_property_value<std::any>(c, "embedded_obj"));
2✔
1716
        embedded_obj.set_property_value(c, "str_field", std::any{"baz"s});
2✔
1717
        realm->commit_transaction();
2✔
1718

1✔
1719
        validate_sync_error(
2✔
1720
            std::move(error_future).get(), invalid_obj, "TopLevel",
2✔
1721
            util::format("write to ObjectID(\"%1\") in table \"TopLevel\" not allowed", invalid_obj.to_string()));
2✔
1722

1✔
1723
        wait_for_advance(*realm);
2✔
1724

1✔
1725
        obj = Object::get_for_primary_key(c, realm, "TopLevel", std::any(invalid_obj));
2✔
1726
        embedded_obj = util::any_cast<Object&&>(obj.get_property_value<std::any>(c, "embedded_obj"));
2✔
1727
        REQUIRE(util::any_cast<std::string&&>(obj.get_property_value<std::any>(c, "queryable_str_field")) == "foo");
2!
1728
        REQUIRE(util::any_cast<std::string&&>(embedded_obj.get_property_value<std::any>(c, "str_field")) == "bar");
2!
1729

1✔
1730
        realm->begin_transaction();
2✔
1731
        embedded_obj.set_property_value(c, "str_field", std::any{"baz"s});
2✔
1732
        realm->commit_transaction();
2✔
1733

1✔
1734
        wait_for_upload(*realm);
2✔
1735
        wait_for_download(*realm);
2✔
1736

1✔
1737
        wait_for_advance(*realm);
2✔
1738
        obj = Object::get_for_primary_key(c, realm, "TopLevel", std::any(invalid_obj));
2✔
1739
        embedded_obj = util::any_cast<Object&&>(obj.get_property_value<std::any>(c, "embedded_obj"));
2✔
1740
        REQUIRE(embedded_obj.get_column_value<StringData>("str_field") == "baz");
2!
1741
    }
2✔
1742

8✔
1743
    SECTION("compensating write for writing a top-level object that is out-of-view") {
16✔
1744
        create_subscription("class_TopLevel", [](auto q, auto col) {
2✔
1745
            return q.equal(col, "foo");
2✔
1746
        });
2✔
1747

1✔
1748
        CppContext c(realm);
2✔
1749
        realm->begin_transaction();
2✔
1750
        auto valid_obj = ObjectId::gen();
2✔
1751
        auto invalid_obj = ObjectId::gen();
2✔
1752
        Object::create(c, realm, "TopLevel",
2✔
1753
                       std::any(AnyDict{
2✔
1754
                           {"_id", valid_obj},
2✔
1755
                           {"queryable_str_field", "foo"s},
2✔
1756
                       }));
2✔
1757
        Object::create(c, realm, "TopLevel",
2✔
1758
                       std::any(AnyDict{
2✔
1759
                           {"_id", invalid_obj},
2✔
1760
                           {"queryable_str_field", "bar"s},
2✔
1761
                       }));
2✔
1762
        realm->commit_transaction();
2✔
1763

1✔
1764
        validate_sync_error(std::move(error_future).get(), invalid_obj, "TopLevel",
2✔
1765
                            "object is outside of the current query view");
2✔
1766

1✔
1767
        wait_for_advance(*realm);
2✔
1768

1✔
1769
        auto top_level_table = realm->read_group().get_table("class_TopLevel");
2✔
1770
        REQUIRE(top_level_table->size() == 1);
2!
1771
        REQUIRE(top_level_table->get_object_with_primary_key(valid_obj));
2!
1772

1✔
1773
        // Verify that a valid object afterwards does not produce an error
1✔
1774
        realm->begin_transaction();
2✔
1775
        Object::create(c, realm, "TopLevel",
2✔
1776
                       std::any(AnyDict{
2✔
1777
                           {"_id", ObjectId::gen()},
2✔
1778
                           {"queryable_str_field", "foo"s},
2✔
1779
                       }));
2✔
1780
        realm->commit_transaction();
2✔
1781

1✔
1782
        wait_for_upload(*realm);
2✔
1783
        wait_for_download(*realm);
2✔
1784
    }
2✔
1785

8✔
1786
    SECTION("compensating writes for each primary key type") {
16✔
1787
        SECTION("int") {
8✔
1788
            create_subscription("class_Int PK", [](auto q, auto col) {
2✔
1789
                return q.equal(col, "foo");
2✔
1790
            });
2✔
1791
            realm->begin_transaction();
2✔
1792
            realm->read_group().get_table("class_Int PK")->create_object_with_primary_key(123456);
2✔
1793
            realm->commit_transaction();
2✔
1794

1✔
1795
            validate_sync_error(std::move(error_future).get(), 123456, "Int PK",
2✔
1796
                                "write to 123456 in table \"Int PK\" not allowed");
2✔
1797
        }
2✔
1798

4✔
1799
        SECTION("short string") {
8✔
1800
            create_subscription("class_String PK", [](auto q, auto col) {
2✔
1801
                return q.equal(col, "foo");
2✔
1802
            });
2✔
1803
            realm->begin_transaction();
2✔
1804
            realm->read_group().get_table("class_String PK")->create_object_with_primary_key("short");
2✔
1805
            realm->commit_transaction();
2✔
1806

1✔
1807
            validate_sync_error(std::move(error_future).get(), "short", "String PK",
2✔
1808
                                "write to \"short\" in table \"String PK\" not allowed");
2✔
1809
        }
2✔
1810

4✔
1811
        SECTION("long string") {
8✔
1812
            create_subscription("class_String PK", [](auto q, auto col) {
2✔
1813
                return q.equal(col, "foo");
2✔
1814
            });
2✔
1815
            realm->begin_transaction();
2✔
1816
            const char* pk = "long string which won't fit in the SSO buffer";
2✔
1817
            realm->read_group().get_table("class_String PK")->create_object_with_primary_key(pk);
2✔
1818
            realm->commit_transaction();
2✔
1819

1✔
1820
            validate_sync_error(std::move(error_future).get(), pk, "String PK",
2✔
1821
                                util::format("write to \"%1\" in table \"String PK\" not allowed", pk));
2✔
1822
        }
2✔
1823

4✔
1824
        SECTION("uuid") {
8✔
1825
            create_subscription("class_UUID PK", [](auto q, auto col) {
2✔
1826
                return q.equal(col, "foo");
2✔
1827
            });
2✔
1828
            realm->begin_transaction();
2✔
1829
            UUID pk("01234567-9abc-4def-9012-3456789abcde");
2✔
1830
            realm->read_group().get_table("class_UUID PK")->create_object_with_primary_key(pk);
2✔
1831
            realm->commit_transaction();
2✔
1832

1✔
1833
            validate_sync_error(std::move(error_future).get(), pk, "UUID PK",
2✔
1834
                                util::format("write to UUID(%1) in table \"UUID PK\" not allowed", pk));
2✔
1835
        }
2✔
1836
    }
8✔
1837

8✔
1838
    // Clear the Realm afterwards as we're reusing an app
8✔
1839
    realm->begin_transaction();
16✔
1840
    table->clear();
16✔
1841
    realm->commit_transaction();
16✔
1842
    wait_for_upload(*realm);
16✔
1843
    realm.reset();
16✔
1844

8✔
1845
    // Add new sections before this
8✔
1846
    SECTION("teardown") {
16✔
1847
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
2✔
1848
        harness.reset();
2✔
1849
    }
2✔
1850
}
16✔
1851

1852
TEST_CASE("flx: query on non-queryable field results in query error message", "[sync][flx][query][baas]") {
6✔
1853
    static std::optional<FLXSyncTestHarness> harness;
6✔
1854
    if (!harness) {
6✔
1855
        harness.emplace("flx_bad_query");
2✔
1856
    }
2✔
1857

3✔
1858
    auto create_subscription = [](SharedRealm realm, StringData table_name, StringData column_name, auto make_query) {
8✔
1859
        auto table = realm->read_group().get_table(table_name);
8✔
1860
        auto queryable_field = table->get_column_key(column_name);
8✔
1861
        auto new_query = realm->get_active_subscription_set().make_mutable_copy();
8✔
1862
        new_query.insert_or_assign(make_query(Query(table), queryable_field));
8✔
1863
        return new_query.commit();
8✔
1864
    };
8✔
1865

3✔
1866
    auto check_status = [](auto status) {
6✔
1867
        CHECK(!status.is_ok());
6!
1868
        std::string reason = status.get_status().reason();
6✔
1869
        // Depending on the version of baas used, it may return 'Invalid query:' or
3✔
1870
        // 'Client provided query with bad syntax:'
3✔
1871
        if ((reason.find("Invalid query:") == std::string::npos &&
6✔
1872
             reason.find("Client provided query with bad syntax:") == std::string::npos) ||
3!
1873
            reason.find("\"TopLevel\": key \"non_queryable_field\" is not a queryable field") == std::string::npos) {
6✔
1874
            FAIL(reason);
×
1875
        }
×
1876
    };
6✔
1877

3✔
1878
    SECTION("Good query after bad query") {
6✔
1879
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
1880
            auto subs = create_subscription(realm, "class_TopLevel", "non_queryable_field", [](auto q, auto c) {
2✔
1881
                return q.equal(c, "bar");
2✔
1882
            });
2✔
1883
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
2✔
1884
            check_status(sub_res);
2✔
1885

1✔
1886
            CHECK(realm->get_active_subscription_set().version() == 0);
2!
1887
            CHECK(realm->get_latest_subscription_set().version() == 1);
2!
1888

1✔
1889
            subs = create_subscription(realm, "class_TopLevel", "queryable_str_field", [](auto q, auto c) {
2✔
1890
                return q.equal(c, "foo");
2✔
1891
            });
2✔
1892
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
1893

1✔
1894
            CHECK(realm->get_active_subscription_set().version() == 2);
2!
1895
            CHECK(realm->get_latest_subscription_set().version() == 2);
2!
1896
        });
2✔
1897
    }
2✔
1898

3✔
1899
    SECTION("Bad query after bad query") {
6✔
1900
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
1901
            auto sync_session = realm->sync_session();
2✔
1902
            sync_session->pause();
2✔
1903

1✔
1904
            auto subs = create_subscription(realm, "class_TopLevel", "non_queryable_field", [](auto q, auto c) {
2✔
1905
                return q.equal(c, "bar");
2✔
1906
            });
2✔
1907
            auto subs2 = create_subscription(realm, "class_TopLevel", "non_queryable_field", [](auto q, auto c) {
2✔
1908
                return q.equal(c, "bar");
2✔
1909
            });
2✔
1910

1✔
1911
            sync_session->resume();
2✔
1912

1✔
1913
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
2✔
1914
            auto sub_res2 =
2✔
1915
                subs2.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
2✔
1916

1✔
1917
            check_status(sub_res);
2✔
1918
            check_status(sub_res2);
2✔
1919

1✔
1920
            CHECK(realm->get_active_subscription_set().version() == 0);
2!
1921
            CHECK(realm->get_latest_subscription_set().version() == 2);
2!
1922
        });
2✔
1923
    }
2✔
1924

3✔
1925
    // Add new sections before this
3✔
1926
    SECTION("teardown") {
6✔
1927
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
2✔
1928
        harness.reset();
2✔
1929
    }
2✔
1930
}
6✔
1931

1932
#if REALM_ENABLE_GEOSPATIAL
1933
TEST_CASE("flx: geospatial", "[sync][flx][geospatial][baas]") {
6✔
1934
    static std::optional<FLXSyncTestHarness> harness;
6✔
1935
    if (!harness) {
6✔
1936
        Schema schema{
2✔
1937
            {"restaurant",
2✔
1938
             {
2✔
1939
                 {"_id", PropertyType::Int, Property::IsPrimary{true}},
2✔
1940
                 {"queryable_str_field", PropertyType::String},
2✔
1941
                 {"location", PropertyType::Object | PropertyType::Nullable, "geoPointType"},
2✔
1942
                 {"array", PropertyType::Object | PropertyType::Array, "geoPointType"},
2✔
1943
             }},
2✔
1944
            {"geoPointType",
2✔
1945
             ObjectSchema::ObjectType::Embedded,
2✔
1946
             {
2✔
1947
                 {"type", PropertyType::String},
2✔
1948
                 {"coordinates", PropertyType::Double | PropertyType::Array},
2✔
1949
             }},
2✔
1950
        };
2✔
1951
        FLXSyncTestHarness::ServerSchema server_schema{schema, {"queryable_str_field", "location"}};
2✔
1952
        harness.emplace("flx_geospatial", server_schema);
2✔
1953
    }
2✔
1954

3✔
1955
    auto create_subscription = [](SharedRealm realm, StringData table_name, StringData column_name, auto make_query) {
18✔
1956
        auto table = realm->read_group().get_table(table_name);
18✔
1957
        auto queryable_field = table->get_column_key(column_name);
18✔
1958
        auto new_query = realm->get_active_subscription_set().make_mutable_copy();
18✔
1959
        new_query.insert_or_assign(make_query(Query(table), queryable_field));
18✔
1960
        return new_query.commit();
18✔
1961
    };
18✔
1962

3✔
1963
    SECTION("Server supports a basic geowithin FLX query") {
6✔
1964
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
1965
            const realm::AppSession& app_session = harness->session().app_session();
2✔
1966
            auto sync_service = app_session.admin_api.get_sync_service(app_session.server_app_id);
2✔
1967

1✔
1968
            AdminAPISession::ServiceConfig config =
2✔
1969
                app_session.admin_api.get_config(app_session.server_app_id, sync_service);
2✔
1970
            auto subs = create_subscription(realm, "class_restaurant", "location", [](Query q, ColKey c) {
2✔
1971
                GeoBox area{GeoPoint{0.2, 0.2}, GeoPoint{0.7, 0.7}};
2✔
1972
                Query query = q.get_table()->column<Link>(c).geo_within(area);
2✔
1973
                std::string ser = query.get_description();
2✔
1974
                return query;
2✔
1975
            });
2✔
1976
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
2✔
1977
            CHECK(sub_res.is_ok());
2!
1978
            CHECK(realm->get_active_subscription_set().version() == 1);
2!
1979
            CHECK(realm->get_latest_subscription_set().version() == 1);
2!
1980
        });
2✔
1981
    }
2✔
1982

3✔
1983
    SECTION("geospatial query consistency: local/server/FLX") {
6✔
1984
        harness->do_with_new_user([&](std::shared_ptr<SyncUser> user) {
2✔
1985
            SyncTestFile config(user, harness->schema(), SyncConfig::FLXSyncEnabled{});
2✔
1986
            auto error_pf = util::make_promise_future<SyncError>();
2✔
1987
            config.sync_config->error_handler =
2✔
1988
                [promise = std::make_shared<util::Promise<SyncError>>(std::move(error_pf.promise))](
2✔
1989
                    std::shared_ptr<SyncSession>, SyncError error) {
2✔
1990
                    promise->emplace_value(std::move(error));
2✔
1991
                };
2✔
1992

1✔
1993
            auto realm = Realm::get_shared_realm(config);
2✔
1994

1✔
1995
            auto subs = create_subscription(realm, "class_restaurant", "queryable_str_field", [](Query q, ColKey c) {
2✔
1996
                return q.equal(c, "synced");
2✔
1997
            });
2✔
1998
            auto make_polygon_filter = [&](const GeoPolygon& polygon) -> bson::BsonDocument {
20✔
1999
                bson::BsonArray inner{};
20✔
2000
                REALM_ASSERT_3(polygon.points.size(), ==, 1);
20✔
2001
                for (auto& point : polygon.points[0]) {
94✔
2002
                    inner.push_back(bson::BsonArray{point.longitude, point.latitude});
94✔
2003
                }
94✔
2004
                bson::BsonArray coords;
20✔
2005
                coords.push_back(inner);
20✔
2006
                bson::BsonDocument geo_bson{{{"type", "Polygon"}, {"coordinates", coords}}};
20✔
2007
                bson::BsonDocument filter{
20✔
2008
                    {"location", bson::BsonDocument{{"$geoWithin", bson::BsonDocument{{"$geometry", geo_bson}}}}}};
20✔
2009
                return filter;
20✔
2010
            };
20✔
2011
            auto make_circle_filter = [&](const GeoCircle& circle) -> bson::BsonDocument {
6✔
2012
                bson::BsonArray coords{circle.center.longitude, circle.center.latitude};
6✔
2013
                bson::BsonArray inner;
6✔
2014
                inner.push_back(coords);
6✔
2015
                inner.push_back(circle.radius_radians);
6✔
2016
                bson::BsonDocument filter{
6✔
2017
                    {"location", bson::BsonDocument{{"$geoWithin", bson::BsonDocument{{"$centerSphere", inner}}}}}};
6✔
2018
                return filter;
6✔
2019
            };
6✔
2020
            auto run_query_on_server = [&](const bson::BsonDocument& filter,
2✔
2021
                                           std::optional<std::string> expected_error = {}) -> size_t {
26✔
2022
                auto remote_client = harness->app()->current_user()->mongo_client("BackingDB");
26✔
2023
                auto db = remote_client.db(harness->session().app_session().config.mongo_dbname);
26✔
2024
                auto restaurant_collection = db["restaurant"];
26✔
2025
                bool processed = false;
26✔
2026
                constexpr int64_t limit = 1000;
26✔
2027
                size_t matches = 0;
26✔
2028
                restaurant_collection.count(filter, limit, [&](uint64_t count, util::Optional<AppError> error) {
26✔
2029
                    processed = true;
26✔
2030
                    if (error) {
26✔
2031
                        if (!expected_error) {
12✔
2032
                            util::format(std::cout, "query error: %1\n", error->reason());
×
2033
                            FAIL(error);
×
2034
                        }
×
2035
                        else {
12✔
2036
                            std::string reason = std::string(error->reason());
12✔
2037
                            std::transform(reason.begin(), reason.end(), reason.begin(), toLowerAscii);
12✔
2038
                            std::transform(expected_error->begin(), expected_error->end(), expected_error->begin(),
12✔
2039
                                           toLowerAscii);
12✔
2040
                            auto pos = reason.find(*expected_error);
12✔
2041
                            if (pos == std::string::npos) {
12✔
2042
                                util::format(std::cout, "mismatch error: '%1' and '%2'\n", reason, *expected_error);
×
2043
                                FAIL(reason);
×
2044
                            }
×
2045
                        }
12✔
2046
                    }
12✔
2047
                    matches = size_t(count);
26✔
2048
                });
26✔
2049
                REQUIRE(processed);
26!
2050
                return matches;
26✔
2051
            };
26✔
2052
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
2✔
2053
            CHECK(sub_res.is_ok());
2!
2054
            CHECK(realm->get_active_subscription_set().version() == 1);
2!
2055
            CHECK(realm->get_latest_subscription_set().version() == 1);
2!
2056

1✔
2057
            CppContext c(realm);
2✔
2058
            int64_t pk = 0;
2✔
2059
            auto add_point = [&](GeoPoint p) {
16✔
2060
                Object::create(
16✔
2061
                    c, realm, "restaurant",
16✔
2062
                    std::any(AnyDict{
16✔
2063
                        {"_id", ++pk},
16✔
2064
                        {"queryable_str_field", "synced"s},
16✔
2065
                        {"location", AnyDict{{"type", "Point"s},
16✔
2066
                                             {"coordinates", std::vector<std::any>{p.longitude, p.latitude}}}}}));
16✔
2067
            };
16✔
2068
            std::vector<GeoPoint> points = {
2✔
2069
                GeoPoint{-74.006, 40.712800000000001},            // New York city
2✔
2070
                GeoPoint{12.568300000000001, 55.676099999999998}, // Copenhagen
2✔
2071
                GeoPoint{12.082599999999999, 55.628},             // ragnarok, Roskilde
2✔
2072
                GeoPoint{-180.1, -90.1},                          // invalid
2✔
2073
                GeoPoint{0, 90},                                  // north pole
2✔
2074
                GeoPoint{-82.68193, 84.74653},                    // northern point that falls within a box later
2✔
2075
                GeoPoint{82.55243, 84.54981}, // another northern point, but on the other side of the pole
2✔
2076
                GeoPoint{2129, 89},           // invalid
2✔
2077
            };
2✔
2078
            constexpr size_t invalids_to_be_compensated = 2; // 4, 8
2✔
2079
            realm->begin_transaction();
2✔
2080
            for (auto& point : points) {
16✔
2081
                add_point(point);
16✔
2082
            }
16✔
2083
            realm->commit_transaction();
2✔
2084
            const auto& error = error_pf.future.get();
2✔
2085
            REQUIRE(!error.is_fatal);
2!
2086
            REQUIRE(error.status == ErrorCodes::SyncCompensatingWrite);
2!
2087
            REQUIRE(error.compensating_writes_info.size() == invalids_to_be_compensated);
2!
2088
            REQUIRE_THAT(error.compensating_writes_info[0].reason,
2✔
2089
                         Catch::Matchers::ContainsSubstring("in table \"restaurant\" will corrupt geojson data"));
2✔
2090
            REQUIRE_THAT(error.compensating_writes_info[1].reason,
2✔
2091
                         Catch::Matchers::ContainsSubstring("in table \"restaurant\" will corrupt geojson data"));
2✔
2092

1✔
2093
            {
2✔
2094
                auto table = realm->read_group().get_table("class_restaurant");
2✔
2095
                CHECK(table->size() == points.size());
2!
2096
                Obj obj = table->get_object_with_primary_key(Mixed{1});
2✔
2097
                REQUIRE(obj);
2!
2098
                Geospatial geo = obj.get<Geospatial>("location");
2✔
2099
                REQUIRE(geo.get_type_string() == "Point");
2!
2100
                REQUIRE(geo.get_type() == Geospatial::Type::Point);
2!
2101
                GeoPoint point = geo.get<GeoPoint>();
2✔
2102
                REQUIRE(point.longitude == points[0].longitude);
2!
2103
                REQUIRE(point.latitude == points[0].latitude);
2!
2104
                REQUIRE(!point.get_altitude());
2!
2105
                ColKey location_col = table->get_column_key("location");
2✔
2106
                auto run_query_locally = [&table, &location_col](Geospatial bounds) -> size_t {
26✔
2107
                    Query query = table->column<Link>(location_col).geo_within(Geospatial(bounds));
26✔
2108
                    return query.find_all().size();
26✔
2109
                };
26✔
2110
                auto run_query_as_flx = [&](Geospatial bounds) -> size_t {
14✔
2111
                    size_t num_objects = 0;
14✔
2112
                    harness->do_with_new_realm([&](SharedRealm realm) {
14✔
2113
                        auto subs =
14✔
2114
                            create_subscription(realm, "class_restaurant", "location", [&](Query q, ColKey c) {
14✔
2115
                                return q.get_table()->column<Link>(c).geo_within(Geospatial(bounds));
14✔
2116
                            });
14✔
2117
                        auto sub_res =
14✔
2118
                            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
14✔
2119
                        CHECK(sub_res.is_ok());
14!
2120
                        CHECK(realm->get_active_subscription_set().version() == 1);
14!
2121
                        realm->refresh();
14✔
2122
                        num_objects = realm->get_class("restaurant").num_objects();
14✔
2123
                    });
14✔
2124
                    return num_objects;
14✔
2125
                };
14✔
2126

1✔
2127
                reset_utils::wait_for_num_objects_in_atlas(harness->app()->current_user(),
2✔
2128
                                                           harness->session().app_session(), "restaurant",
2✔
2129
                                                           points.size() - invalids_to_be_compensated);
2✔
2130

1✔
2131
                {
2✔
2132
                    GeoPolygon bounds{
2✔
2133
                        {{GeoPoint{-80, 40.7128}, GeoPoint{20, 60}, GeoPoint{20, 20}, GeoPoint{-80, 40.7128}}}};
2✔
2134
                    size_t local_matches = run_query_locally(bounds);
2✔
2135
                    size_t server_results = run_query_on_server(make_polygon_filter(bounds));
2✔
2136
                    size_t flx_results = run_query_as_flx(bounds);
2✔
2137
                    CHECK(flx_results == local_matches);
2!
2138
                    CHECK(server_results == local_matches);
2!
2139
                }
2✔
2140
                {
2✔
2141
                    GeoCircle circle{.5, GeoPoint{0, 90}};
2✔
2142
                    size_t local_matches = run_query_locally(circle);
2✔
2143
                    size_t server_results = run_query_on_server(make_circle_filter(circle));
2✔
2144
                    size_t flx_results = run_query_as_flx(circle);
2✔
2145
                    CHECK(server_results == local_matches);
2!
2146
                    CHECK(flx_results == local_matches);
2!
2147
                }
2✔
2148
                { // a ring with 3 points without a matching begin/end is an error
2✔
2149
                    GeoPolygon open_bounds{{{GeoPoint{-80, 40.7128}, GeoPoint{20, 60}, GeoPoint{20, 20}}}};
2✔
2150
                    CHECK_THROWS_WITH(run_query_locally(open_bounds),
2✔
2151
                                      "Invalid region in GEOWITHIN query for parameter 'GeoPolygon({[-80, 40.7128], "
2✔
2152
                                      "[20, 60], [20, 20]})': 'Ring is not closed, first vertex 'GeoPoint([-80, "
2✔
2153
                                      "40.7128])' does not equal last vertex 'GeoPoint([20, 20])''");
2✔
2154
                    run_query_on_server(make_polygon_filter(open_bounds), "(BadValue) Loop is not closed");
2✔
2155
                }
2✔
2156
                {
2✔
2157
                    GeoCircle circle = GeoCircle::from_kms(10, GeoPoint{-180.1, -90.1});
2✔
2158
                    CHECK_THROWS_WITH(run_query_locally(circle),
2✔
2159
                                      "Invalid region in GEOWITHIN query for parameter 'GeoCircle([-180.1, -90.1], "
2✔
2160
                                      "0.00156787)': 'Longitude/latitude is out of bounds, lng: -180.1 lat: -90.1'");
2✔
2161
                    run_query_on_server(make_circle_filter(circle), "(BadValue) longitude/latitude is out of bounds");
2✔
2162
                }
2✔
2163
                {
2✔
2164
                    GeoCircle circle = GeoCircle::from_kms(-1, GeoPoint{0, 0});
2✔
2165
                    CHECK_THROWS_WITH(run_query_locally(circle),
2✔
2166
                                      "Invalid region in GEOWITHIN query for parameter 'GeoCircle([0, 0], "
2✔
2167
                                      "-0.000156787)': 'The radius of a circle must be a non-negative number'");
2✔
2168
                    run_query_on_server(make_circle_filter(circle),
2✔
2169
                                        "(BadValue) radius must be a non-negative number");
2✔
2170
                }
2✔
2171
                {
2✔
2172
                    // This box is from Gershøj to CPH airport. It includes CPH and Ragnarok but not NYC.
1✔
2173
                    std::vector<Geospatial> valid_box_variations = {
2✔
2174
                        GeoBox{GeoPoint{11.97575, 55.71601},
2✔
2175
                               GeoPoint{12.64773, 55.61211}}, // Gershøj, CPH Airport (Top Left, Bottom Right)
2✔
2176
                        GeoBox{GeoPoint{12.64773, 55.61211},
2✔
2177
                               GeoPoint{11.97575, 55.71601}}, // CPH Airport, Gershøj (Bottom Right, Top Left)
2✔
2178
                        GeoBox{GeoPoint{12.64773, 55.71601},
2✔
2179
                               GeoPoint{11.97575, 55.61211}}, // Upper Right, Bottom Left
2✔
2180
                        GeoBox{GeoPoint{11.97575, 55.61211},
2✔
2181
                               GeoPoint{12.64773, 55.71601}}, // Bottom Left, Upper Right
2✔
2182
                    };
2✔
2183
                    constexpr size_t expected_results = 2;
2✔
2184
                    for (auto& geo : valid_box_variations) {
8✔
2185
                        size_t local_matches = run_query_locally(geo);
8✔
2186
                        size_t server_matches =
8✔
2187
                            run_query_on_server(make_polygon_filter(geo.get<GeoBox>().to_polygon()));
8✔
2188
                        size_t flx_matches = run_query_as_flx(geo);
8✔
2189
                        CHECK(local_matches == expected_results);
8!
2190
                        CHECK(server_matches == expected_results);
8!
2191
                        CHECK(flx_matches == expected_results);
8!
2192
                    }
8✔
2193
                    std::vector<Geospatial> invalid_boxes = {
2✔
2194
                        GeoBox{GeoPoint{11.97575, 55.71601}, GeoPoint{11.97575, 55.71601}}, // same point twice
2✔
2195
                        GeoBox{GeoPoint{11.97575, 55.71601},
2✔
2196
                               GeoPoint{11.97575, 57.0}}, // two points on the same longitude
2✔
2197
                        GeoBox{GeoPoint{11.97575, 55.71601},
2✔
2198
                               GeoPoint{12, 55.71601}}, // two points on the same latitude
2✔
2199
                    };
2✔
2200
                    for (auto& geo : invalid_boxes) {
6✔
2201
                        REQUIRE_THROWS_CONTAINING(run_query_locally(geo),
6✔
2202
                                                  "Invalid region in GEOWITHIN query for parameter 'GeoPolygon");
6✔
2203
                        run_query_on_server(make_polygon_filter(geo.get<GeoBox>().to_polygon()),
6✔
2204
                                            "(BadValue) Loop must have at least 3 different vertices");
6✔
2205
                    }
6✔
2206
                }
2✔
2207
                { // a box region that wraps the north pole. It contains the north pole point
2✔
2208
                    // and two others, one each on distinct sides of the globe.
1✔
2209
                    constexpr double lat = 82.83799;
2✔
2210
                    Geospatial north_pole_box =
2✔
2211
                        GeoPolygon{{{GeoPoint{-78.33951, lat}, GeoPoint{-90.33951, lat}, GeoPoint{90.33951, lat},
2✔
2212
                                     GeoPoint{78.33951, lat}, GeoPoint{-78.33951, lat}}}};
2✔
2213
                    constexpr size_t num_matching_points = 3;
2✔
2214
                    size_t local_matches = run_query_locally(north_pole_box);
2✔
2215
                    size_t server_matches =
2✔
2216
                        run_query_on_server(make_polygon_filter(north_pole_box.get<GeoPolygon>()));
2✔
2217
                    size_t flx_matches = run_query_as_flx(north_pole_box);
2✔
2218
                    CHECK(local_matches == num_matching_points);
2!
2219
                    CHECK(server_matches == num_matching_points);
2!
2220
                    CHECK(flx_matches == num_matching_points);
2!
2221
                }
2✔
2222
            }
2✔
2223
        });
2✔
2224
    }
2✔
2225

3✔
2226
    // Add new sections before this
3✔
2227
    SECTION("teardown") {
6✔
2228
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
2✔
2229
        harness.reset();
2✔
2230
    }
2✔
2231
}
6✔
2232
#endif // REALM_ENABLE_GEOSPATIAL
2233

2234
TEST_CASE("flx: interrupted bootstrap restarts/recovers on reconnect", "[sync][flx][bootstrap][baas]") {
2✔
2235
    FLXSyncTestHarness harness("flx_bootstrap_reconnect", {g_large_array_schema, {"queryable_int_field"}});
2✔
2236

1✔
2237
    std::vector<ObjectId> obj_ids_at_end = fill_large_array_schema(harness);
2✔
2238
    SyncTestFile interrupted_realm_config(harness.app()->current_user(), harness.schema(),
2✔
2239
                                          SyncConfig::FLXSyncEnabled{});
2✔
2240
    interrupted_realm_config.cache = false;
2✔
2241

1✔
2242
    {
2✔
2243
        auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
2244
        Realm::Config config = interrupted_realm_config;
2✔
2245
        config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
2246
        auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
2247
        config.sync_config->on_sync_client_event_hook =
2✔
2248
            [promise = std::move(shared_promise), seen_version_one = false](std::weak_ptr<SyncSession> weak_session,
2✔
2249
                                                                            const SyncClientHookData& data) mutable {
18✔
2250
                if (data.event != SyncClientHookEvent::DownloadMessageReceived) {
18✔
2251
                    return SyncClientHookAction::NoAction;
10✔
2252
                }
10✔
2253

4✔
2254
                auto session = weak_session.lock();
8✔
2255
                if (!session) {
8✔
2256
                    return SyncClientHookAction::NoAction;
×
2257
                }
×
2258

4✔
2259
                // If we haven't seen at least one download message for query version 1, then do nothing yet.
4✔
2260
                if (data.query_version == 0 || (data.query_version == 1 && !std::exchange(seen_version_one, true))) {
8✔
2261
                    return SyncClientHookAction::NoAction;
6✔
2262
                }
6✔
2263

1✔
2264
                REQUIRE(data.query_version == 1);
2!
2265
                REQUIRE(data.batch_state == sync::DownloadBatchState::MoreToCome);
2!
2266
                auto latest_subs = session->get_flx_subscription_store()->get_latest();
2✔
2267
                REQUIRE(latest_subs.version() == 1);
2!
2268
                REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
2!
2269

1✔
2270
                session->close();
2✔
2271
                promise->emplace_value();
2✔
2272

1✔
2273
                return SyncClientHookAction::TriggerReconnect;
2✔
2274
            };
2✔
2275

1✔
2276
        auto realm = Realm::get_shared_realm(config);
2✔
2277
        {
2✔
2278
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2279
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
2280
            mut_subs.insert_or_assign(Query(table));
2✔
2281
            mut_subs.commit();
2✔
2282
        }
2✔
2283

1✔
2284
        interrupted.get();
2✔
2285
        realm->sync_session()->shutdown_and_wait();
2✔
2286
    }
2✔
2287

1✔
2288
    _impl::RealmCoordinator::assert_no_open_realms();
2✔
2289

1✔
2290
    {
2✔
2291
        DBOptions options;
2✔
2292
        options.encryption_key = test_util::crypt_key();
2✔
2293
        auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
2294
        auto sub_store = sync::SubscriptionStore::create(realm);
2✔
2295
        auto version_info = sub_store->get_version_info();
2✔
2296
        REQUIRE(version_info.active == 0);
2!
2297
        REQUIRE(version_info.latest == 1);
2!
2298
        auto latest_subs = sub_store->get_latest();
2✔
2299
        REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
2!
2300
        REQUIRE(latest_subs.size() == 1);
2!
2301
        REQUIRE(latest_subs.at(0).object_class_name == "TopLevel");
2!
2302
    }
2✔
2303

1✔
2304
    auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
2305
    auto table = realm->read_group().get_table("class_TopLevel");
2✔
2306
    realm->get_latest_subscription_set().get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
2307
    wait_for_advance(*realm);
2✔
2308
    REQUIRE(table->size() == obj_ids_at_end.size());
2!
2309
    for (auto& id : obj_ids_at_end) {
10✔
2310
        REQUIRE(table->find_primary_key(Mixed{id}));
10!
2311
    }
10✔
2312

1✔
2313
    auto active_subs = realm->get_active_subscription_set();
2✔
2314
    auto latest_subs = realm->get_latest_subscription_set();
2✔
2315
    REQUIRE(active_subs.version() == latest_subs.version());
2!
2316
    REQUIRE(active_subs.version() == int64_t(1));
2!
2317
}
2✔
2318

2319
TEST_CASE("flx: dev mode uploads schema before query change", "[sync][flx][query][baas]") {
2✔
2320
    FLXSyncTestHarness::ServerSchema server_schema;
2✔
2321
    auto default_schema = FLXSyncTestHarness::default_server_schema();
2✔
2322
    server_schema.queryable_fields = default_schema.queryable_fields;
2✔
2323
    server_schema.dev_mode_enabled = true;
2✔
2324
    server_schema.schema = Schema{};
2✔
2325

1✔
2326
    FLXSyncTestHarness harness("flx_dev_mode", server_schema);
2✔
2327
    auto foo_obj_id = ObjectId::gen();
2✔
2328
    auto bar_obj_id = ObjectId::gen();
2✔
2329
    harness.do_with_new_realm(
2✔
2330
        [&](SharedRealm realm) {
2✔
2331
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
2332
            // auto queryable_str_field = table->get_column_key("queryable_str_field");
1✔
2333
            // auto queryable_int_field = table->get_column_key("queryable_int_field");
1✔
2334
            auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2335
            new_query.insert_or_assign(Query(table));
2✔
2336
            new_query.commit();
2✔
2337

1✔
2338
            CppContext c(realm);
2✔
2339
            realm->begin_transaction();
2✔
2340
            Object::create(c, realm, "TopLevel",
2✔
2341
                           std::any(AnyDict{{"_id", foo_obj_id},
2✔
2342
                                            {"queryable_str_field", "foo"s},
2✔
2343
                                            {"queryable_int_field", static_cast<int64_t>(5)},
2✔
2344
                                            {"non_queryable_field", "non queryable 1"s}}));
2✔
2345
            Object::create(c, realm, "TopLevel",
2✔
2346
                           std::any(AnyDict{{"_id", bar_obj_id},
2✔
2347
                                            {"queryable_str_field", "bar"s},
2✔
2348
                                            {"queryable_int_field", static_cast<int64_t>(10)},
2✔
2349
                                            {"non_queryable_field", "non queryable 2"s}}));
2✔
2350
            realm->commit_transaction();
2✔
2351

1✔
2352
            wait_for_upload(*realm);
2✔
2353
        },
2✔
2354
        default_schema.schema);
2✔
2355

1✔
2356
    harness.do_with_new_realm(
2✔
2357
        [&](SharedRealm realm) {
2✔
2358
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
2359
            auto queryable_int_field = table->get_column_key("queryable_int_field");
2✔
2360
            auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2361
            new_query.insert_or_assign(Query(table).greater_equal(queryable_int_field, int64_t(5)));
2✔
2362
            auto subs = new_query.commit();
2✔
2363
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
2364
            wait_for_download(*realm);
2✔
2365
            Results results(realm, table);
2✔
2366

1✔
2367
            realm->refresh();
2✔
2368
            CHECK(results.size() == 2);
2!
2369
            CHECK(table->get_object_with_primary_key({foo_obj_id}).is_valid());
2!
2370
            CHECK(table->get_object_with_primary_key({bar_obj_id}).is_valid());
2!
2371
        },
2✔
2372
        default_schema.schema);
2✔
2373
}
2✔
2374

2375
// This is a test case for the server's fix for RCORE-969
2376
TEST_CASE("flx: change-of-query history divergence", "[sync][flx][query][baas]") {
2✔
2377
    FLXSyncTestHarness harness("flx_coq_divergence");
2✔
2378

1✔
2379
    // first we create an object on the server and upload it.
1✔
2380
    auto foo_obj_id = ObjectId::gen();
2✔
2381
    harness.load_initial_data([&](SharedRealm realm) {
2✔
2382
        CppContext c(realm);
2✔
2383
        Object::create(c, realm, "TopLevel",
2✔
2384
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
2385
                                        {"queryable_str_field", "foo"s},
2✔
2386
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
2387
                                        {"non_queryable_field", "created as initial data seed"s}}));
2✔
2388
    });
2✔
2389

1✔
2390
    // Now create another realm and wait for it to be fully synchronized with bootstrap version zero. i.e.
1✔
2391
    // our progress counters should be past the history entry containing the object created above.
1✔
2392
    auto test_file_config = harness.make_test_file();
2✔
2393
    auto realm = Realm::get_shared_realm(test_file_config);
2✔
2394
    auto table = realm->read_group().get_table("class_TopLevel");
2✔
2395
    auto queryable_str_field = table->get_column_key("queryable_str_field");
2✔
2396

1✔
2397
    realm->get_latest_subscription_set().get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
2398
    wait_for_upload(*realm);
2✔
2399
    wait_for_download(*realm);
2✔
2400

1✔
2401
    // Now disconnect the sync session
1✔
2402
    realm->sync_session()->pause();
2✔
2403

1✔
2404
    // And move the "foo" object created above into view and create a different diverging copy of it locally.
1✔
2405
    auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2406
    mut_subs.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
2✔
2407
    auto subs = mut_subs.commit();
2✔
2408

1✔
2409
    realm->begin_transaction();
2✔
2410
    CppContext c(realm);
2✔
2411
    Object::create(c, realm, "TopLevel",
2✔
2412
                   std::any(AnyDict{{"_id", foo_obj_id},
2✔
2413
                                    {"queryable_str_field", "foo"s},
2✔
2414
                                    {"queryable_int_field", static_cast<int64_t>(10)},
2✔
2415
                                    {"non_queryable_field", "created locally"s}}));
2✔
2416
    realm->commit_transaction();
2✔
2417

1✔
2418
    // Reconnect the sync session and wait for the subscription that moved "foo" into view to be fully synchronized.
1✔
2419
    realm->sync_session()->resume();
2✔
2420
    wait_for_upload(*realm);
2✔
2421
    wait_for_download(*realm);
2✔
2422
    subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
2423

1✔
2424
    wait_for_advance(*realm);
2✔
2425

1✔
2426
    // The bootstrap should have erase/re-created our object and we should have the version from the server
1✔
2427
    // locally.
1✔
2428
    auto obj = Object::get_for_primary_key(c, realm, "TopLevel", std::any{foo_obj_id});
2✔
2429
    REQUIRE(obj.get_obj().get<int64_t>("queryable_int_field") == 5);
2!
2430
    REQUIRE(obj.get_obj().get<StringData>("non_queryable_field") == "created as initial data seed");
2!
2431

1✔
2432
    // Likewise, if we create a new realm and download all the objects, we should see the initial server version
1✔
2433
    // in the new realm rather than the "created locally" one.
1✔
2434
    harness.load_initial_data([&](SharedRealm realm) {
2✔
2435
        CppContext c(realm);
2✔
2436

1✔
2437
        auto obj = Object::get_for_primary_key(c, realm, "TopLevel", std::any{foo_obj_id});
2✔
2438
        REQUIRE(obj.get_obj().get<int64_t>("queryable_int_field") == 5);
2!
2439
        REQUIRE(obj.get_obj().get<StringData>("non_queryable_field") == "created as initial data seed");
2!
2440
    });
2✔
2441
}
2✔
2442

2443
TEST_CASE("flx: writes work offline", "[sync][flx][baas]") {
2✔
2444
    FLXSyncTestHarness harness("flx_offline_writes");
2✔
2445

1✔
2446
    harness.do_with_new_realm([&](SharedRealm realm) {
2✔
2447
        auto sync_session = realm->sync_session();
2✔
2448
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
2449
        auto queryable_str_field = table->get_column_key("queryable_str_field");
2✔
2450
        auto queryable_int_field = table->get_column_key("queryable_int_field");
2✔
2451
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2452
        new_query.insert_or_assign(Query(table));
2✔
2453
        new_query.commit();
2✔
2454

1✔
2455
        auto foo_obj_id = ObjectId::gen();
2✔
2456
        auto bar_obj_id = ObjectId::gen();
2✔
2457

1✔
2458
        CppContext c(realm);
2✔
2459
        realm->begin_transaction();
2✔
2460
        Object::create(c, realm, "TopLevel",
2✔
2461
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
2462
                                        {"queryable_str_field", "foo"s},
2✔
2463
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
2464
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
2465
        Object::create(c, realm, "TopLevel",
2✔
2466
                       std::any(AnyDict{{"_id", bar_obj_id},
2✔
2467
                                        {"queryable_str_field", "bar"s},
2✔
2468
                                        {"queryable_int_field", static_cast<int64_t>(10)},
2✔
2469
                                        {"non_queryable_field", "non queryable 2"s}}));
2✔
2470
        realm->commit_transaction();
2✔
2471

1✔
2472
        wait_for_upload(*realm);
2✔
2473
        wait_for_download(*realm);
2✔
2474
        sync_session->pause();
2✔
2475

1✔
2476
        // Make it so the subscriptions only match the "foo" object
1✔
2477
        {
2✔
2478
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2479
            mut_subs.clear();
2✔
2480
            mut_subs.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
2✔
2481
            mut_subs.commit();
2✔
2482
        }
2✔
2483

1✔
2484
        // Make foo so that it will match the next subscription update. This checks whether you can do
1✔
2485
        // multiple subscription set updates offline and that the last one eventually takes effect when
1✔
2486
        // you come back online and fully synchronize.
1✔
2487
        {
2✔
2488
            Results results(realm, table);
2✔
2489
            realm->begin_transaction();
2✔
2490
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
2✔
2491
            foo_obj.set<int64_t>(queryable_int_field, 15);
2✔
2492
            realm->commit_transaction();
2✔
2493
        }
2✔
2494

1✔
2495
        // Update our subscriptions so that both foo/bar will be included
1✔
2496
        {
2✔
2497
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2498
            mut_subs.clear();
2✔
2499
            mut_subs.insert_or_assign(Query(table).greater_equal(queryable_int_field, static_cast<int64_t>(10)));
2✔
2500
            mut_subs.commit();
2✔
2501
        }
2✔
2502

1✔
2503
        // Make foo out of view for the current subscription.
1✔
2504
        {
2✔
2505
            Results results(realm, table);
2✔
2506
            realm->begin_transaction();
2✔
2507
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
2✔
2508
            foo_obj.set<int64_t>(queryable_int_field, 0);
2✔
2509
            realm->commit_transaction();
2✔
2510
        }
2✔
2511

1✔
2512
        sync_session->resume();
2✔
2513
        wait_for_upload(*realm);
2✔
2514
        wait_for_download(*realm);
2✔
2515

1✔
2516
        realm->refresh();
2✔
2517
        Results results(realm, table);
2✔
2518
        CHECK(results.size() == 1);
2!
2519
        CHECK(table->get_object_with_primary_key({bar_obj_id}).is_valid());
2!
2520
    });
2✔
2521
}
2✔
2522

2523
TEST_CASE("flx: writes work without waiting for sync", "[sync][flx][baas]") {
2✔
2524
    FLXSyncTestHarness harness("flx_offline_writes");
2✔
2525

1✔
2526
    harness.do_with_new_realm([&](SharedRealm realm) {
2✔
2527
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
2528
        auto queryable_str_field = table->get_column_key("queryable_str_field");
2✔
2529
        auto queryable_int_field = table->get_column_key("queryable_int_field");
2✔
2530
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2531
        new_query.insert_or_assign(Query(table));
2✔
2532
        new_query.commit();
2✔
2533

1✔
2534
        auto foo_obj_id = ObjectId::gen();
2✔
2535
        auto bar_obj_id = ObjectId::gen();
2✔
2536

1✔
2537
        CppContext c(realm);
2✔
2538
        realm->begin_transaction();
2✔
2539
        Object::create(c, realm, "TopLevel",
2✔
2540
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
2541
                                        {"queryable_str_field", "foo"s},
2✔
2542
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
2543
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
2544
        Object::create(c, realm, "TopLevel",
2✔
2545
                       std::any(AnyDict{{"_id", bar_obj_id},
2✔
2546
                                        {"queryable_str_field", "bar"s},
2✔
2547
                                        {"queryable_int_field", static_cast<int64_t>(10)},
2✔
2548
                                        {"non_queryable_field", "non queryable 2"s}}));
2✔
2549
        realm->commit_transaction();
2✔
2550

1✔
2551
        wait_for_upload(*realm);
2✔
2552

1✔
2553
        // Make it so the subscriptions only match the "foo" object
1✔
2554
        {
2✔
2555
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2556
            mut_subs.clear();
2✔
2557
            mut_subs.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
2✔
2558
            mut_subs.commit();
2✔
2559
        }
2✔
2560

1✔
2561
        // Make foo so that it will match the next subscription update. This checks whether you can do
1✔
2562
        // multiple subscription set updates without waiting and that the last one eventually takes effect when
1✔
2563
        // you fully synchronize.
1✔
2564
        {
2✔
2565
            Results results(realm, table);
2✔
2566
            realm->begin_transaction();
2✔
2567
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
2✔
2568
            foo_obj.set<int64_t>(queryable_int_field, 15);
2✔
2569
            realm->commit_transaction();
2✔
2570
        }
2✔
2571

1✔
2572
        // Update our subscriptions so that both foo/bar will be included
1✔
2573
        {
2✔
2574
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2575
            mut_subs.clear();
2✔
2576
            mut_subs.insert_or_assign(Query(table).greater_equal(queryable_int_field, static_cast<int64_t>(10)));
2✔
2577
            mut_subs.commit();
2✔
2578
        }
2✔
2579

1✔
2580
        // Make foo out-of-view for the current subscription.
1✔
2581
        {
2✔
2582
            Results results(realm, table);
2✔
2583
            realm->begin_transaction();
2✔
2584
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
2✔
2585
            foo_obj.set<int64_t>(queryable_int_field, 0);
2✔
2586
            realm->commit_transaction();
2✔
2587
        }
2✔
2588

1✔
2589
        wait_for_upload(*realm);
2✔
2590
        wait_for_download(*realm);
2✔
2591

1✔
2592
        realm->refresh();
2✔
2593
        Results results(realm, table);
2✔
2594
        CHECK(results.size() == 1);
2!
2595
        Obj obj = results.get(0);
2✔
2596
        CHECK(obj.get_primary_key().get_object_id() == bar_obj_id);
2!
2597
        CHECK(table->get_object_with_primary_key({bar_obj_id}).is_valid());
2!
2598
    });
2✔
2599
}
2✔
2600

2601
TEST_CASE("flx: verify websocket protocol number and prefixes", "[sync][protocol]") {
2✔
2602
    // Update the expected value whenever the protocol version is updated - this ensures
1✔
2603
    // that the current protocol version does not change unexpectedly.
1✔
2604
    REQUIRE(10 == sync::get_current_protocol_version());
2✔
2605
    // This was updated in Protocol V8 to use '#' instead of '/' to support the Web SDK
1✔
2606
    REQUIRE("com.mongodb.realm-sync#" == sync::get_pbs_websocket_protocol_prefix());
2✔
2607
    REQUIRE("com.mongodb.realm-query-sync#" == sync::get_flx_websocket_protocol_prefix());
2✔
2608
}
2✔
2609

2610
// TODO: remote-baas: This test fails consistently with Windows remote baas server - to be fixed in RCORE-1674
2611
#ifndef _WIN32
2612
TEST_CASE("flx: subscriptions persist after closing/reopening", "[sync][flx][baas]") {
2✔
2613
    FLXSyncTestHarness harness("flx_bad_query");
2✔
2614
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
2615

1✔
2616
    {
2✔
2617
        auto orig_realm = Realm::get_shared_realm(config);
2✔
2618
        auto mut_subs = orig_realm->get_latest_subscription_set().make_mutable_copy();
2✔
2619
        mut_subs.insert_or_assign(Query(orig_realm->read_group().get_table("class_TopLevel")));
2✔
2620
        mut_subs.commit();
2✔
2621
        orig_realm->close();
2✔
2622
    }
2✔
2623

1✔
2624
    {
2✔
2625
        auto new_realm = Realm::get_shared_realm(config);
2✔
2626
        auto latest_subs = new_realm->get_latest_subscription_set();
2✔
2627
        CHECK(latest_subs.size() == 1);
2!
2628
        latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
2629
    }
2✔
2630
}
2✔
2631
#endif
2632

2633
TEST_CASE("flx: no subscription store created for PBS app", "[sync][flx][baas]") {
2✔
2634
    auto server_app_config = minimal_app_config("flx_connect_as_pbs", g_minimal_schema);
2✔
2635
    TestAppSession session(create_app(server_app_config));
2✔
2636
    SyncTestFile config(session.app(), bson::Bson{}, g_minimal_schema);
2✔
2637

1✔
2638
    auto realm = Realm::get_shared_realm(config);
2✔
2639
    CHECK(!wait_for_download(*realm));
2!
2640
    CHECK(!wait_for_upload(*realm));
2!
2641

1✔
2642
    CHECK(!realm->sync_session()->get_flx_subscription_store());
2!
2643

1✔
2644
    CHECK_THROWS_AS(realm->get_active_subscription_set(), IllegalOperation);
2✔
2645
    CHECK_THROWS_AS(realm->get_latest_subscription_set(), IllegalOperation);
2✔
2646
}
2✔
2647

2648
TEST_CASE("flx: connect to FLX as PBS returns an error", "[sync][flx][baas]") {
2✔
2649
    FLXSyncTestHarness harness("connect_to_flx_as_pbs");
2✔
2650
    SyncTestFile config(harness.app(), bson::Bson{}, harness.schema());
2✔
2651
    std::mutex sync_error_mutex;
2✔
2652
    util::Optional<SyncError> sync_error;
2✔
2653
    config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError error) mutable {
2✔
2654
        std::lock_guard<std::mutex> lk(sync_error_mutex);
2✔
2655
        sync_error = std::move(error);
2✔
2656
    };
2✔
2657
    auto realm = Realm::get_shared_realm(config);
2✔
2658
    timed_wait_for([&] {
3,205✔
2659
        std::lock_guard<std::mutex> lk(sync_error_mutex);
3,205✔
2660
        return static_cast<bool>(sync_error);
3,205✔
2661
    });
3,205✔
2662

1✔
2663
    CHECK(sync_error->status == ErrorCodes::WrongSyncType);
2!
2664
    CHECK(sync_error->server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
2665
}
2✔
2666

2667
TEST_CASE("flx: connect to FLX with partition value returns an error", "[sync][flx][protocol][baas]") {
2✔
2668
    FLXSyncTestHarness harness("connect_to_flx_as_pbs");
2✔
2669
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
2670
    config.sync_config->partition_value = "\"foobar\"";
2✔
2671

1✔
2672
    REQUIRE_EXCEPTION(Realm::get_shared_realm(config), IllegalCombination,
2✔
2673
                      "Cannot specify a partition value when flexible sync is enabled");
2✔
2674
}
2✔
2675

2676
TEST_CASE("flx: connect to PBS as FLX returns an error", "[sync][flx][protocol][baas]") {
2✔
2677
    auto server_app_config = minimal_app_config("flx_connect_as_pbs", g_minimal_schema);
2✔
2678
    TestAppSession session(create_app(server_app_config));
2✔
2679
    auto app = session.app();
2✔
2680
    auto user = app->current_user();
2✔
2681

1✔
2682
    SyncTestFile config(user, g_minimal_schema, SyncConfig::FLXSyncEnabled{});
2✔
2683

1✔
2684
    std::mutex sync_error_mutex;
2✔
2685
    util::Optional<SyncError> sync_error;
2✔
2686
    config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError error) mutable {
2✔
2687
        std::lock_guard lk(sync_error_mutex);
2✔
2688
        sync_error = std::move(error);
2✔
2689
    };
2✔
2690
    auto realm = Realm::get_shared_realm(config);
2✔
2691
    timed_wait_for([&] {
10,611✔
2692
        std::lock_guard lk(sync_error_mutex);
10,611✔
2693
        return static_cast<bool>(sync_error);
10,611✔
2694
    });
10,611✔
2695

1✔
2696
    CHECK(sync_error->status == ErrorCodes::WrongSyncType);
2!
2697
    CHECK(sync_error->server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
2!
2698
}
2✔
2699

2700
TEST_CASE("flx: commit subscription while refreshing the access token", "[sync][flx][token][baas]") {
2✔
2701
    class HookedTransport : public SynchronousTestTransport {
2✔
2702
    public:
2✔
2703
        void send_request_to_server(const Request& request,
2✔
2704
                                    util::UniqueFunction<void(const Response&)>&& completion) override
2✔
2705
        {
10✔
2706
            if (request_hook) {
10✔
2707
                request_hook(request);
2✔
2708
            }
2✔
2709
            SynchronousTestTransport::send_request_to_server(request, std::move(completion));
10✔
2710
        }
10✔
2711
        util::UniqueFunction<void(const Request&)> request_hook;
2✔
2712
    };
2✔
2713

1✔
2714
    auto transport = std::make_shared<HookedTransport>();
2✔
2715
    FLXSyncTestHarness harness("flx_wait_access_token2", FLXSyncTestHarness::default_server_schema(), transport);
2✔
2716
    auto app = harness.app();
2✔
2717
    std::shared_ptr<SyncUser> user = app->current_user();
2✔
2718
    REQUIRE(user);
2!
2719
    REQUIRE(!user->access_token_refresh_required());
2!
2720
    // Set a bad access token, with an expired time. This will trigger a refresh initiated by the client.
1✔
2721
    std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
2✔
2722
    using namespace std::chrono_literals;
2✔
2723
    auto expires = std::chrono::system_clock::to_time_t(now - 30s);
2✔
2724
    user->update_access_token(encode_fake_jwt("fake_access_token", expires));
2✔
2725
    REQUIRE(user->access_token_refresh_required());
2!
2726

1✔
2727
    bool seen_waiting_for_access_token = false;
2✔
2728
    // Commit a subcription set while there is no sync session.
1✔
2729
    // A session is created when the access token is refreshed.
1✔
2730
    transport->request_hook = [&](const Request&) {
2✔
2731
        auto user = app->current_user();
2✔
2732
        REQUIRE(user);
2!
2733
        for (auto& session : user->all_sessions()) {
2✔
2734
            if (session->state() == SyncSession::State::WaitingForAccessToken) {
2✔
2735
                REQUIRE(!seen_waiting_for_access_token);
2!
2736
                seen_waiting_for_access_token = true;
2✔
2737

1✔
2738
                auto store = session->get_flx_subscription_store();
2✔
2739
                REQUIRE(store);
2!
2740
                auto mut_subs = store->get_latest().make_mutable_copy();
2✔
2741
                mut_subs.commit();
2✔
2742
            }
2✔
2743
        }
2✔
2744
    };
2✔
2745
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
2746
    // This triggers the token refresh.
1✔
2747
    auto r = Realm::get_shared_realm(config);
2✔
2748
    REQUIRE(seen_waiting_for_access_token);
2!
2749
}
2✔
2750

2751
TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][bootstrap][baas]") {
6✔
2752
    FLXSyncTestHarness harness("flx_bootstrap_batching", {g_large_array_schema, {"queryable_int_field"}});
6✔
2753

3✔
2754
    std::vector<ObjectId> obj_ids_at_end = fill_large_array_schema(harness);
6✔
2755
    SyncTestFile interrupted_realm_config(harness.app()->current_user(), harness.schema(),
6✔
2756
                                          SyncConfig::FLXSyncEnabled{});
6✔
2757
    interrupted_realm_config.cache = false;
6✔
2758

3✔
2759
    auto check_interrupted_state = [&](const DBRef& realm) {
6✔
2760
        auto tr = realm->start_read();
6✔
2761
        auto top_level = tr->get_table("class_TopLevel");
6✔
2762
        REQUIRE(top_level);
6!
2763
        REQUIRE(top_level->is_empty());
6!
2764

3✔
2765
        auto sub_store = sync::SubscriptionStore::create(realm);
6✔
2766
        auto version_info = sub_store->get_version_info();
6✔
2767
        REQUIRE(version_info.latest == 1);
6!
2768
        REQUIRE(version_info.active == 0);
6!
2769
        auto latest_subs = sub_store->get_latest();
6✔
2770
        REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
6!
2771
        REQUIRE(latest_subs.size() == 1);
6!
2772
        REQUIRE(latest_subs.at(0).object_class_name == "TopLevel");
6!
2773
    };
6✔
2774

3✔
2775
    auto mutate_realm = [&] {
5✔
2776
        harness.load_initial_data([&](SharedRealm realm) {
4✔
2777
            auto table = realm->read_group().get_table("class_TopLevel");
4✔
2778
            Results res(realm, Query(table).greater(table->get_column_key("queryable_int_field"), int64_t(10)));
4✔
2779
            REQUIRE(res.size() == 2);
4!
2780
            res.clear();
4✔
2781
        });
4✔
2782
    };
4✔
2783

3✔
2784
    SECTION("exception occurs during bootstrap application") {
6✔
2785
        {
2✔
2786
            auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
2787
            Realm::Config config = interrupted_realm_config;
2✔
2788
            config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
2789
            auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
2790
            config.sync_config->on_sync_client_event_hook =
2✔
2791
                [promise = std::move(shared_promise)](std::weak_ptr<SyncSession> weak_session,
2✔
2792
                                                      const SyncClientHookData& data) mutable {
36✔
2793
                    if (data.event != SyncClientHookEvent::BootstrapMessageProcessed) {
36✔
2794
                        return SyncClientHookAction::NoAction;
22✔
2795
                    }
22✔
2796
                    auto session = weak_session.lock();
14✔
2797
                    if (!session) {
14✔
2798
                        return SyncClientHookAction::NoAction;
×
2799
                    }
×
2800

7✔
2801
                    if (data.query_version == 1 && data.batch_state == sync::DownloadBatchState::LastInBatch) {
14✔
2802
                        session->close();
2✔
2803
                        promise->emplace_value();
2✔
2804
                        return SyncClientHookAction::EarlyReturn;
2✔
2805
                    }
2✔
2806
                    return SyncClientHookAction::NoAction;
12✔
2807
                };
12✔
2808
            auto realm = Realm::get_shared_realm(config);
2✔
2809
            {
2✔
2810
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2811
                auto table = realm->read_group().get_table("class_TopLevel");
2✔
2812
                mut_subs.insert_or_assign(Query(table));
2✔
2813
                mut_subs.commit();
2✔
2814
            }
2✔
2815

1✔
2816
            interrupted.get();
2✔
2817
            realm->sync_session()->shutdown_and_wait();
2✔
2818
            realm->close();
2✔
2819
        }
2✔
2820

1✔
2821
        _impl::RealmCoordinator::assert_no_open_realms();
2✔
2822

1✔
2823
        // Open up the realm without the sync client attached and verify that the realm got interrupted in the state
1✔
2824
        // we expected it to be in.
1✔
2825
        {
2✔
2826
            DBOptions options;
2✔
2827
            options.encryption_key = test_util::crypt_key();
2✔
2828
            auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
2829
            auto logger = util::Logger::get_default_logger();
2✔
2830
            sync::PendingBootstrapStore bootstrap_store(realm, *logger);
2✔
2831
            REQUIRE(bootstrap_store.has_pending());
2!
2832
            auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16);
2✔
2833
            REQUIRE(pending_batch.query_version == 1);
2!
2834
            REQUIRE(pending_batch.progress);
2!
2835

1✔
2836
            check_interrupted_state(realm);
2✔
2837
        }
2✔
2838

1✔
2839
        interrupted_realm_config.sync_config->simulate_integration_error = true;
2✔
2840
        auto error_pf = util::make_promise_future<SyncError>();
2✔
2841
        interrupted_realm_config.sync_config->error_handler =
2✔
2842
            [promise = std::make_shared<util::Promise<SyncError>>(std::move(error_pf.promise))](
2✔
2843
                std::shared_ptr<SyncSession>, SyncError error) {
2✔
2844
                promise->emplace_value(std::move(error));
2✔
2845
            };
2✔
2846

1✔
2847
        auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
2848
        const auto& error = error_pf.future.get();
2✔
2849
        REQUIRE(error.is_fatal);
2!
2850
        REQUIRE(error.status == ErrorCodes::BadChangeset);
2!
2851
    }
2✔
2852

3✔
2853
    SECTION("interrupted before final bootstrap message") {
6✔
2854
        {
2✔
2855
            auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
2856
            Realm::Config config = interrupted_realm_config;
2✔
2857
            config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
2858
            auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
2859
            config.sync_config->on_sync_client_event_hook =
2✔
2860
                [promise = std::move(shared_promise)](std::weak_ptr<SyncSession> weak_session,
2✔
2861
                                                      const SyncClientHookData& data) mutable {
16✔
2862
                    if (data.event != SyncClientHookEvent::BootstrapMessageProcessed) {
16✔
2863
                        return SyncClientHookAction::NoAction;
12✔
2864
                    }
12✔
2865
                    auto session = weak_session.lock();
4✔
2866
                    if (!session) {
4✔
2867
                        return SyncClientHookAction::NoAction;
×
2868
                    }
×
2869

2✔
2870
                    if (data.query_version == 1 && data.batch_state == sync::DownloadBatchState::MoreToCome) {
4✔
2871
                        session->force_close();
2✔
2872
                        promise->emplace_value();
2✔
2873
                        return SyncClientHookAction::TriggerReconnect;
2✔
2874
                    }
2✔
2875
                    return SyncClientHookAction::NoAction;
2✔
2876
                };
2✔
2877
            auto realm = Realm::get_shared_realm(config);
2✔
2878
            {
2✔
2879
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2880
                auto table = realm->read_group().get_table("class_TopLevel");
2✔
2881
                mut_subs.insert_or_assign(Query(table));
2✔
2882
                mut_subs.commit();
2✔
2883
            }
2✔
2884

1✔
2885
            interrupted.get();
2✔
2886
            realm->sync_session()->shutdown_and_wait();
2✔
2887
            realm->close();
2✔
2888
        }
2✔
2889

1✔
2890
        _impl::RealmCoordinator::assert_no_open_realms();
2✔
2891

1✔
2892
        // Open up the realm without the sync client attached and verify that the realm got interrupted in the state
1✔
2893
        // we expected it to be in.
1✔
2894
        {
2✔
2895
            DBOptions options;
2✔
2896
            options.encryption_key = test_util::crypt_key();
2✔
2897
            auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
2898
            auto logger = util::Logger::get_default_logger();
2✔
2899
            sync::PendingBootstrapStore bootstrap_store(realm, *logger);
2✔
2900
            REQUIRE(bootstrap_store.has_pending());
2!
2901
            auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16);
2✔
2902
            REQUIRE(pending_batch.query_version == 1);
2!
2903
            REQUIRE(!pending_batch.progress);
2!
2904
            REQUIRE(pending_batch.remaining_changesets == 0);
2!
2905
            REQUIRE(pending_batch.changesets.size() == 1);
2!
2906

1✔
2907
            check_interrupted_state(realm);
2✔
2908
        }
2✔
2909

1✔
2910
        // Now we'll open a different realm and make some changes that would leave orphan objects on the client
1✔
2911
        // if the bootstrap batches weren't being cached until lastInBatch were true.
1✔
2912
        mutate_realm();
2✔
2913

1✔
2914
        // Finally re-open the realm whose bootstrap we interrupted and just wait for it to finish downloading.
1✔
2915
        auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
2916
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
2917
        realm->get_latest_subscription_set()
2✔
2918
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
2919
            .get();
2✔
2920

1✔
2921
        wait_for_advance(*realm);
2✔
2922
        auto expected_obj_ids = util::Span<ObjectId>(obj_ids_at_end).sub_span(0, 3);
2✔
2923

1✔
2924
        REQUIRE(table->size() == expected_obj_ids.size());
2!
2925
        for (auto& id : expected_obj_ids) {
6✔
2926
            REQUIRE(table->find_primary_key(Mixed{id}));
6!
2927
        }
6✔
2928
    }
2✔
2929

3✔
2930
    SECTION("interrupted after final bootstrap message before processing") {
6✔
2931
        {
2✔
2932
            auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
2933
            Realm::Config config = interrupted_realm_config;
2✔
2934
            config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
2935
            auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
2936
            config.sync_config->on_sync_client_event_hook =
2✔
2937
                [promise = std::move(shared_promise)](std::weak_ptr<SyncSession> weak_session,
2✔
2938
                                                      const SyncClientHookData& data) mutable {
36✔
2939
                    if (data.event != SyncClientHookEvent::BootstrapMessageProcessed) {
36✔
2940
                        return SyncClientHookAction::NoAction;
22✔
2941
                    }
22✔
2942
                    auto session = weak_session.lock();
14✔
2943
                    if (!session) {
14✔
2944
                        return SyncClientHookAction::NoAction;
×
2945
                    }
×
2946

7✔
2947
                    if (data.query_version == 1 && data.batch_state == sync::DownloadBatchState::LastInBatch) {
14✔
2948
                        session->force_close();
2✔
2949
                        promise->emplace_value();
2✔
2950
                        return SyncClientHookAction::TriggerReconnect;
2✔
2951
                    }
2✔
2952
                    return SyncClientHookAction::NoAction;
12✔
2953
                };
12✔
2954
            auto realm = Realm::get_shared_realm(config);
2✔
2955
            {
2✔
2956
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
2957
                auto table = realm->read_group().get_table("class_TopLevel");
2✔
2958
                mut_subs.insert_or_assign(Query(table));
2✔
2959
                mut_subs.commit();
2✔
2960
            }
2✔
2961

1✔
2962
            interrupted.get();
2✔
2963
            realm->sync_session()->shutdown_and_wait();
2✔
2964
            realm->close();
2✔
2965
        }
2✔
2966

1✔
2967
        _impl::RealmCoordinator::assert_no_open_realms();
2✔
2968

1✔
2969
        // Open up the realm without the sync client attached and verify that the realm got interrupted in the state
1✔
2970
        // we expected it to be in.
1✔
2971
        {
2✔
2972
            DBOptions options;
2✔
2973
            options.encryption_key = test_util::crypt_key();
2✔
2974
            auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
2975
            auto logger = util::Logger::get_default_logger();
2✔
2976
            sync::PendingBootstrapStore bootstrap_store(realm, *logger);
2✔
2977
            REQUIRE(bootstrap_store.has_pending());
2!
2978
            auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16);
2✔
2979
            REQUIRE(pending_batch.query_version == 1);
2!
2980
            REQUIRE(static_cast<bool>(pending_batch.progress));
2!
2981
            REQUIRE(pending_batch.remaining_changesets == 0);
2!
2982
            REQUIRE(pending_batch.changesets.size() == 6);
2!
2983

1✔
2984
            check_interrupted_state(realm);
2✔
2985
        }
2✔
2986

1✔
2987
        // Now we'll open a different realm and make some changes that would leave orphan objects on the client
1✔
2988
        // if the bootstrap batches weren't being cached until lastInBatch were true.
1✔
2989
        mutate_realm();
2✔
2990

1✔
2991
        auto [saw_valid_state_promise, saw_valid_state_future] = util::make_promise_future<void>();
2✔
2992
        auto shared_saw_valid_state_promise =
2✔
2993
            std::make_shared<decltype(saw_valid_state_promise)>(std::move(saw_valid_state_promise));
2✔
2994
        // This hook will let us check what the state of the realm is before it's integrated any new download
1✔
2995
        // messages from the server. This should be the full 5 object bootstrap that was received before we
1✔
2996
        // called mutate_realm().
1✔
2997
        interrupted_realm_config.sync_config->on_sync_client_event_hook =
2✔
2998
            [&, promise = std::move(shared_saw_valid_state_promise)](std::weak_ptr<SyncSession> weak_session,
2✔
2999
                                                                     const SyncClientHookData& data) {
18✔
3000
                if (data.event != SyncClientHookEvent::DownloadMessageReceived) {
18✔
3001
                    return SyncClientHookAction::NoAction;
16✔
3002
                }
16✔
3003
                auto session = weak_session.lock();
2✔
3004
                if (!session) {
2✔
3005
                    return SyncClientHookAction::NoAction;
×
3006
                }
×
3007

1✔
3008
                if (data.query_version != 1 || data.batch_state == sync::DownloadBatchState::MoreToCome) {
2✔
3009
                    return SyncClientHookAction::NoAction;
×
3010
                }
×
3011

1✔
3012
                auto latest_sub_set = session->get_flx_subscription_store()->get_latest();
2✔
3013
                auto active_sub_set = session->get_flx_subscription_store()->get_active();
2✔
3014
                auto version_info = session->get_flx_subscription_store()->get_version_info();
2✔
3015
                REQUIRE(version_info.pending_mark == active_sub_set.version());
2!
3016
                REQUIRE(version_info.active == active_sub_set.version());
2!
3017
                REQUIRE(version_info.latest == latest_sub_set.version());
2!
3018
                REQUIRE(latest_sub_set.version() == active_sub_set.version());
2!
3019
                REQUIRE(active_sub_set.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3020

1✔
3021
                auto db = SyncSession::OnlyForTesting::get_db(*session);
2✔
3022
                auto tr = db->start_read();
2✔
3023

1✔
3024
                auto table = tr->get_table("class_TopLevel");
2✔
3025
                REQUIRE(table->size() == obj_ids_at_end.size());
2!
3026
                for (auto& id : obj_ids_at_end) {
10✔
3027
                    REQUIRE(table->find_primary_key(Mixed{id}));
10!
3028
                }
10✔
3029

1✔
3030
                promise->emplace_value();
2✔
3031
                return SyncClientHookAction::NoAction;
2✔
3032
            };
2✔
3033

1✔
3034
        // Finally re-open the realm whose bootstrap we interrupted and just wait for it to finish downloading.
1✔
3035
        auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
3036
        saw_valid_state_future.get();
2✔
3037
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
3038
        realm->get_latest_subscription_set()
2✔
3039
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
2✔
3040
            .get();
2✔
3041

1✔
3042
        wait_for_advance(*realm);
2✔
3043
        auto expected_obj_ids = util::Span<ObjectId>(obj_ids_at_end).sub_span(0, 3);
2✔
3044

1✔
3045
        // After we've downloaded all the mutations there should only by 3 objects left.
1✔
3046
        REQUIRE(table->size() == expected_obj_ids.size());
2!
3047
        for (auto& id : expected_obj_ids) {
6✔
3048
            REQUIRE(table->find_primary_key(Mixed{id}));
6!
3049
        }
6✔
3050
    }
2✔
3051
}
6✔
3052

3053
// Check that a document with the given id is present and has the expected fields
3054
static void check_document(const std::vector<bson::BsonDocument>& documents, ObjectId id,
3055
                           std::initializer_list<std::pair<const char*, bson::Bson>> fields)
3056
{
428✔
3057
    auto it = std::find_if(documents.begin(), documents.end(), [&](auto&& doc) {
43,098✔
3058
        auto it = doc.entries().find("_id");
43,098✔
3059
        REQUIRE(it != doc.entries().end());
43,098!
3060
        return it->second == id;
43,098✔
3061
    });
43,098✔
3062
    REQUIRE(it != documents.end());
428!
3063
    auto& doc = it->entries();
428✔
3064
    for (auto& [name, expected_value] : fields) {
434✔
3065
        auto it = doc.find(name);
434✔
3066
        REQUIRE(it != doc.end());
434!
3067

217✔
3068
        // bson documents are ordered  but Realm dictionaries aren't, so the
217✔
3069
        // document might validly be in a different order than we expected and
217✔
3070
        // we need to do a comparison that doesn't check order.
217✔
3071
        if (expected_value.type() == bson::Bson::Type::Document) {
434✔
3072
            REQUIRE(static_cast<const bson::BsonDocument&>(it->second).entries() ==
8!
3073
                    static_cast<const bson::BsonDocument&>(expected_value).entries());
8✔
3074
        }
8✔
3075
        else {
426✔
3076
            REQUIRE(it->second == expected_value);
426!
3077
        }
426✔
3078
    }
434✔
3079
}
428✔
3080

3081
TEST_CASE("flx: data ingest", "[sync][flx][data ingest][baas]") {
22✔
3082
    using namespace ::realm::bson;
22✔
3083

11✔
3084
    static auto server_schema = [] {
12✔
3085
        FLXSyncTestHarness::ServerSchema server_schema;
2✔
3086
        server_schema.queryable_fields = {"queryable_str_field"};
2✔
3087
        server_schema.schema = {
2✔
3088
            {"Asymmetric",
2✔
3089
             ObjectSchema::ObjectType::TopLevelAsymmetric,
2✔
3090
             {
2✔
3091
                 {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
3092
                 {"location", PropertyType::String | PropertyType::Nullable},
2✔
3093
                 {"embedded obj", PropertyType::Object | PropertyType::Nullable, "Embedded"},
2✔
3094
                 {"embedded list", PropertyType::Object | PropertyType::Array, "Embedded"},
2✔
3095
                 {"embedded dictionary", PropertyType::Object | PropertyType::Nullable | PropertyType::Dictionary,
2✔
3096
                  "Embedded"},
2✔
3097
                 {"link obj", PropertyType::Object | PropertyType::Nullable, "TopLevel"},
2✔
3098
                 {"link list", PropertyType::Object | PropertyType::Array, "TopLevel"},
2✔
3099
                 {"link dictionary", PropertyType::Object | PropertyType::Nullable | PropertyType::Dictionary,
2✔
3100
                  "TopLevel"},
2✔
3101
             }},
2✔
3102
            {"Embedded", ObjectSchema::ObjectType::Embedded, {{"value", PropertyType::String}}},
2✔
3103
            {"TopLevel",
2✔
3104
             {
2✔
3105
                 {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
3106
                 {"value", PropertyType::Int},
2✔
3107
             }},
2✔
3108
        };
2✔
3109
        return server_schema;
2✔
3110
    }();
2✔
3111
    static auto harness = std::make_unique<FLXSyncTestHarness>("asymmetric_sync", server_schema);
22✔
3112

11✔
3113
    // We reuse a single app for each section, so tests will see the documents
11✔
3114
    // created by previous tests and we need to add those documents to the count
11✔
3115
    // we're waiting for
11✔
3116
    static std::unordered_map<std::string, size_t> previous_count;
22✔
3117
    auto get_documents = [&](const char* name, size_t expected_count) {
20✔
3118
        auto& count = previous_count[name];
18✔
3119
        auto documents =
18✔
3120
            harness->session().get_documents(*harness->app()->current_user(), name, count + expected_count);
18✔
3121
        count = documents.size();
18✔
3122
        return documents;
18✔
3123
    };
18✔
3124

11✔
3125
    SECTION("basic object construction") {
22✔
3126
        auto foo_obj_id = ObjectId::gen();
2✔
3127
        auto bar_obj_id = ObjectId::gen();
2✔
3128
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3129
            realm->begin_transaction();
2✔
3130
            CppContext c(realm);
2✔
3131
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "foo"s}}));
2✔
3132
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", bar_obj_id}, {"location", "bar"s}}));
2✔
3133
            realm->commit_transaction();
2✔
3134

1✔
3135
            auto documents = get_documents("Asymmetric", 2);
2✔
3136
            check_document(documents, foo_obj_id, {{"location", "foo"}});
2✔
3137
            check_document(documents, bar_obj_id, {{"location", "bar"}});
2✔
3138
        });
2✔
3139

1✔
3140
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3141
            wait_for_download(*realm);
2✔
3142

1✔
3143
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
3144
            REQUIRE(table->size() == 0);
2!
3145
            // Cannot query asymmetric tables.
1✔
3146
            CHECK_THROWS_AS(Query(table), LogicError);
2✔
3147
        });
2✔
3148
    }
2✔
3149

11✔
3150
    SECTION("do not allow objects with same key within the same transaction") {
22✔
3151
        auto foo_obj_id = ObjectId::gen();
2✔
3152
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3153
            realm->begin_transaction();
2✔
3154
            CppContext c(realm);
2✔
3155
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "foo"s}}));
2✔
3156
            CHECK_THROWS_WITH(
2✔
3157
                Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "bar"s}})),
2✔
3158
                "Attempting to create an object of type 'Asymmetric' with an existing primary key value 'not "
2✔
3159
                "implemented'");
2✔
3160
            realm->commit_transaction();
2✔
3161

1✔
3162
            auto documents = get_documents("Asymmetric", 1);
2✔
3163
            check_document(documents, foo_obj_id, {{"location", "foo"}});
2✔
3164
        });
2✔
3165
    }
2✔
3166

11✔
3167
    SECTION("create multiple objects - separate commits") {
22✔
3168
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3169
            CppContext c(realm);
2✔
3170
            std::vector<ObjectId> obj_ids;
2✔
3171
            for (int i = 0; i < 100; ++i) {
202✔
3172
                realm->begin_transaction();
200✔
3173
                obj_ids.push_back(ObjectId::gen());
200✔
3174
                Object::create(c, realm, "Asymmetric",
200✔
3175
                               std::any(AnyDict{
200✔
3176
                                   {"_id", obj_ids.back()},
200✔
3177
                                   {"location", util::format("foo_%1", i)},
200✔
3178
                               }));
200✔
3179
                realm->commit_transaction();
200✔
3180
            }
200✔
3181

1✔
3182
            wait_for_upload(*realm);
2✔
3183
            wait_for_download(*realm);
2✔
3184

1✔
3185
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
3186
            REQUIRE(table->size() == 0);
2!
3187

1✔
3188
            auto documents = get_documents("Asymmetric", 100);
2✔
3189
            for (int i = 0; i < 100; ++i) {
202✔
3190
                check_document(documents, obj_ids[i], {{"location", util::format("foo_%1", i)}});
200✔
3191
            }
200✔
3192
        });
2✔
3193
    }
2✔
3194

11✔
3195
    SECTION("create multiple objects - same commit") {
22✔
3196
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3197
            CppContext c(realm);
2✔
3198
            realm->begin_transaction();
2✔
3199
            std::vector<ObjectId> obj_ids;
2✔
3200
            for (int i = 0; i < 100; ++i) {
202✔
3201
                obj_ids.push_back(ObjectId::gen());
200✔
3202
                Object::create(c, realm, "Asymmetric",
200✔
3203
                               std::any(AnyDict{
200✔
3204
                                   {"_id", obj_ids.back()},
200✔
3205
                                   {"location", util::format("bar_%1", i)},
200✔
3206
                               }));
200✔
3207
            }
200✔
3208
            realm->commit_transaction();
2✔
3209

1✔
3210
            wait_for_upload(*realm);
2✔
3211
            wait_for_download(*realm);
2✔
3212

1✔
3213
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
3214
            REQUIRE(table->size() == 0);
2!
3215

1✔
3216
            auto documents = get_documents("Asymmetric", 100);
2✔
3217
            for (int i = 0; i < 100; ++i) {
202✔
3218
                check_document(documents, obj_ids[i], {{"location", util::format("bar_%1", i)}});
200✔
3219
            }
200✔
3220
        });
2✔
3221
    }
2✔
3222

11✔
3223
    SECTION("open with schema mismatch on IsAsymmetric") {
22✔
3224
        auto schema = server_schema.schema;
2✔
3225
        schema.find("Asymmetric")->table_type = ObjectSchema::ObjectType::TopLevel;
2✔
3226

1✔
3227
        harness->do_with_new_user([&](std::shared_ptr<SyncUser> user) {
2✔
3228
            SyncTestFile config(user, schema, SyncConfig::FLXSyncEnabled{});
2✔
3229
            auto [error_promise, error_future] = util::make_promise_future<SyncError>();
2✔
3230
            auto error_count = 0;
2✔
3231
            auto err_handler = [promise = util::CopyablePromiseHolder(std::move(error_promise)),
2✔
3232
                                &error_count](std::shared_ptr<SyncSession>, SyncError err) mutable {
4✔
3233
                ++error_count;
4✔
3234
                if (error_count == 1) {
4✔
3235
                    // Bad changeset detected by the client.
1✔
3236
                    CHECK(err.status == ErrorCodes::BadChangeset);
2!
3237
                }
2✔
3238
                else if (error_count == 2) {
2✔
3239
                    // Server asking for a client reset.
1✔
3240
                    CHECK(err.status == ErrorCodes::SyncClientResetRequired);
2!
3241
                    CHECK(err.is_client_reset_requested());
2!
3242
                    promise.get_promise().emplace_value(std::move(err));
2✔
3243
                }
2✔
3244
            };
4✔
3245

1✔
3246
            config.sync_config->error_handler = err_handler;
2✔
3247
            auto realm = Realm::get_shared_realm(config);
2✔
3248

1✔
3249
            auto err = error_future.get();
2✔
3250
            CHECK(error_count == 2);
2!
3251
        });
2✔
3252
    }
2✔
3253

11✔
3254
    SECTION("basic embedded object construction") {
22✔
3255
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3256
            auto obj_id = ObjectId::gen();
2✔
3257
            realm->begin_transaction();
2✔
3258
            CppContext c(realm);
2✔
3259
            Object::create(c, realm, "Asymmetric",
2✔
3260
                           std::any(AnyDict{
2✔
3261
                               {"_id", obj_id},
2✔
3262
                               {"embedded obj", AnyDict{{"value", "foo"s}}},
2✔
3263
                           }));
2✔
3264
            realm->commit_transaction();
2✔
3265
            wait_for_upload(*realm);
2✔
3266

1✔
3267
            auto documents = get_documents("Asymmetric", 1);
2✔
3268
            check_document(documents, obj_id, {{"embedded obj", BsonDocument{{"value", "foo"}}}});
2✔
3269
        });
2✔
3270

1✔
3271
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3272
            wait_for_download(*realm);
2✔
3273

1✔
3274
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
3275
            REQUIRE(table->size() == 0);
2!
3276
        });
2✔
3277
    }
2✔
3278

11✔
3279
    SECTION("replace embedded object") {
22✔
3280
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3281
            CppContext c(realm);
2✔
3282
            auto foo_obj_id = ObjectId::gen();
2✔
3283

1✔
3284
            realm->begin_transaction();
2✔
3285
            Object::create(c, realm, "Asymmetric",
2✔
3286
                           std::any(AnyDict{
2✔
3287
                               {"_id", foo_obj_id},
2✔
3288
                               {"embedded obj", AnyDict{{"value", "foo"s}}},
2✔
3289
                           }));
2✔
3290
            realm->commit_transaction();
2✔
3291

1✔
3292
            // Update embedded field to `null`. The server discards this write
1✔
3293
            // as asymmetric sync can only create new objects.
1✔
3294
            realm->begin_transaction();
2✔
3295
            Object::create(c, realm, "Asymmetric",
2✔
3296
                           std::any(AnyDict{
2✔
3297
                               {"_id", foo_obj_id},
2✔
3298
                               {"embedded obj", std::any()},
2✔
3299
                           }));
2✔
3300
            realm->commit_transaction();
2✔
3301

1✔
3302
            // create a second object so that we can know when the translator
1✔
3303
            // has processed everything
1✔
3304
            realm->begin_transaction();
2✔
3305
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", ObjectId::gen()}, {}}));
2✔
3306
            realm->commit_transaction();
2✔
3307

1✔
3308
            wait_for_upload(*realm);
2✔
3309
            wait_for_download(*realm);
2✔
3310

1✔
3311
            auto table = realm->read_group().get_table("class_Asymmetric");
2✔
3312
            REQUIRE(table->size() == 0);
2!
3313

1✔
3314
            auto documents = get_documents("Asymmetric", 2);
2✔
3315
            check_document(documents, foo_obj_id, {{"embedded obj", BsonDocument{{"value", "foo"}}}});
2✔
3316
        });
2✔
3317
    }
2✔
3318

11✔
3319
    SECTION("embedded collections") {
22✔
3320
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3321
            CppContext c(realm);
2✔
3322
            auto obj_id = ObjectId::gen();
2✔
3323

1✔
3324
            realm->begin_transaction();
2✔
3325
            Object::create(c, realm, "Asymmetric",
2✔
3326
                           std::any(AnyDict{
2✔
3327
                               {"_id", obj_id},
2✔
3328
                               {"embedded list", AnyVector{AnyDict{{"value", "foo"s}}, AnyDict{{"value", "bar"s}}}},
2✔
3329
                               {"embedded dictionary",
2✔
3330
                                AnyDict{
2✔
3331
                                    {"key1", AnyDict{{"value", "foo"s}}},
2✔
3332
                                    {"key2", AnyDict{{"value", "bar"s}}},
2✔
3333
                                }},
2✔
3334
                           }));
2✔
3335
            realm->commit_transaction();
2✔
3336

1✔
3337
            auto documents = get_documents("Asymmetric", 1);
2✔
3338
            check_document(
2✔
3339
                documents, obj_id,
2✔
3340
                {
2✔
3341
                    {"embedded list", BsonArray{BsonDocument{{"value", "foo"}}, BsonDocument{{"value", "bar"}}}},
2✔
3342
                    {"embedded dictionary",
2✔
3343
                     BsonDocument{
2✔
3344
                         {"key1", BsonDocument{{"value", "foo"}}},
2✔
3345
                         {"key2", BsonDocument{{"value", "bar"}}},
2✔
3346
                     }},
2✔
3347
                });
2✔
3348
        });
2✔
3349
    }
2✔
3350

11✔
3351
    SECTION("asymmetric table not allowed in PBS") {
22✔
3352
        Schema schema{
2✔
3353
            {"Asymmetric2",
2✔
3354
             ObjectSchema::ObjectType::TopLevelAsymmetric,
2✔
3355
             {
2✔
3356
                 {"_id", PropertyType::Int, Property::IsPrimary{true}},
2✔
3357
                 {"location", PropertyType::Int},
2✔
3358
                 {"reading", PropertyType::Int},
2✔
3359
             }},
2✔
3360
        };
2✔
3361

1✔
3362
        SyncTestFile config(harness->app(), Bson{}, schema);
2✔
3363
        REQUIRE_EXCEPTION(
2✔
3364
            Realm::get_shared_realm(config), SchemaValidationFailed,
2✔
3365
            Catch::Matchers::ContainsSubstring("Asymmetric table 'Asymmetric2' not allowed in partition based sync"));
2✔
3366
    }
2✔
3367

11✔
3368
    SECTION("links to top-level objects") {
22✔
3369
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3370
            subscribe_to_all_and_bootstrap(*realm);
2✔
3371

1✔
3372
            ObjectId obj_id = ObjectId::gen();
2✔
3373
            std::array<ObjectId, 5> target_obj_ids;
2✔
3374
            for (auto& id : target_obj_ids) {
10✔
3375
                id = ObjectId::gen();
10✔
3376
            }
10✔
3377

1✔
3378
            realm->begin_transaction();
2✔
3379
            CppContext c(realm);
2✔
3380
            Object::create(c, realm, "Asymmetric",
2✔
3381
                           std::any(AnyDict{
2✔
3382
                               {"_id", obj_id},
2✔
3383
                               {"link obj", AnyDict{{"_id", target_obj_ids[0]}, {"value", INT64_C(10)}}},
2✔
3384
                               {"link list",
2✔
3385
                                AnyVector{
2✔
3386
                                    AnyDict{{"_id", target_obj_ids[1]}, {"value", INT64_C(11)}},
2✔
3387
                                    AnyDict{{"_id", target_obj_ids[2]}, {"value", INT64_C(12)}},
2✔
3388
                                }},
2✔
3389
                               {"link dictionary",
2✔
3390
                                AnyDict{
2✔
3391
                                    {"key1", AnyDict{{"_id", target_obj_ids[3]}, {"value", INT64_C(13)}}},
2✔
3392
                                    {"key2", AnyDict{{"_id", target_obj_ids[4]}, {"value", INT64_C(14)}}},
2✔
3393
                                }},
2✔
3394
                           }));
2✔
3395
            realm->commit_transaction();
2✔
3396
            wait_for_upload(*realm);
2✔
3397

1✔
3398
            auto docs1 = get_documents("Asymmetric", 1);
2✔
3399
            check_document(docs1, obj_id,
2✔
3400
                           {{"link obj", target_obj_ids[0]},
2✔
3401
                            {"link list", BsonArray{{target_obj_ids[1], target_obj_ids[2]}}},
2✔
3402
                            {
2✔
3403
                                "link dictionary",
2✔
3404
                                BsonDocument{
2✔
3405
                                    {"key1", target_obj_ids[3]},
2✔
3406
                                    {"key2", target_obj_ids[4]},
2✔
3407
                                },
2✔
3408
                            }});
2✔
3409

1✔
3410
            auto docs2 = get_documents("TopLevel", 5);
2✔
3411
            for (int64_t i = 0; i < 5; ++i) {
12✔
3412
                check_document(docs2, target_obj_ids[i], {{"value", 10 + i}});
10✔
3413
            }
10✔
3414
        });
2✔
3415
    }
2✔
3416

11✔
3417
    // Add any new test sections above this point
11✔
3418

11✔
3419
    SECTION("teardown") {
22✔
3420
        harness.reset();
2✔
3421
    }
2✔
3422
}
22✔
3423

3424
TEST_CASE("flx: data ingest - dev mode", "[sync][flx][data ingest][baas]") {
2✔
3425
    FLXSyncTestHarness::ServerSchema server_schema;
2✔
3426
    server_schema.dev_mode_enabled = true;
2✔
3427
    server_schema.schema = Schema{};
2✔
3428

1✔
3429
    auto schema = Schema{{"Asymmetric",
2✔
3430
                          ObjectSchema::ObjectType::TopLevelAsymmetric,
2✔
3431
                          {
2✔
3432
                              {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
3433
                              {"location", PropertyType::String | PropertyType::Nullable},
2✔
3434
                          }},
2✔
3435
                         {"TopLevel",
2✔
3436
                          {
2✔
3437
                              {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
3438
                              {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
2✔
3439
                          }}};
2✔
3440

1✔
3441
    FLXSyncTestHarness harness("asymmetric_sync", server_schema);
2✔
3442

1✔
3443
    auto foo_obj_id = ObjectId::gen();
2✔
3444
    auto bar_obj_id = ObjectId::gen();
2✔
3445

1✔
3446
    harness.do_with_new_realm(
2✔
3447
        [&](SharedRealm realm) {
2✔
3448
            CppContext c(realm);
2✔
3449
            realm->begin_transaction();
2✔
3450
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "foo"s}}));
2✔
3451
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", bar_obj_id}, {"location", "bar"s}}));
2✔
3452
            realm->commit_transaction();
2✔
3453

1✔
3454
            auto docs = harness.session().get_documents(*realm->config().sync_config->user, "Asymmetric", 2);
2✔
3455
            check_document(docs, foo_obj_id, {{"location", "foo"}});
2✔
3456
            check_document(docs, bar_obj_id, {{"location", "bar"}});
2✔
3457
        },
2✔
3458
        schema);
2✔
3459
}
2✔
3460

3461
TEST_CASE("flx: data ingest - write not allowed", "[sync][flx][data ingest][baas]") {
2✔
3462
    AppCreateConfig::ServiceRole role;
2✔
3463
    role.name = "asymmetric_write_perms";
2✔
3464

1✔
3465
    AppCreateConfig::ServiceRoleDocumentFilters doc_filters;
2✔
3466
    doc_filters.read = true;
2✔
3467
    doc_filters.write = false;
2✔
3468
    role.document_filters = doc_filters;
2✔
3469

1✔
3470
    role.insert_filter = true;
2✔
3471
    role.delete_filter = true;
2✔
3472
    role.read = true;
2✔
3473
    role.write = true;
2✔
3474

1✔
3475
    Schema schema({
2✔
3476
        {"Asymmetric",
2✔
3477
         ObjectSchema::ObjectType::TopLevelAsymmetric,
2✔
3478
         {
2✔
3479
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
3480
             {"location", PropertyType::String | PropertyType::Nullable},
2✔
3481
             {"embedded_obj", PropertyType::Object | PropertyType::Nullable, "Embedded"},
2✔
3482
         }},
2✔
3483
        {"Embedded",
2✔
3484
         ObjectSchema::ObjectType::Embedded,
2✔
3485
         {
2✔
3486
             {"value", PropertyType::String | PropertyType::Nullable},
2✔
3487
         }},
2✔
3488
    });
2✔
3489
    FLXSyncTestHarness::ServerSchema server_schema{schema, {}, {role}};
2✔
3490
    FLXSyncTestHarness harness("asymmetric_sync", server_schema);
2✔
3491

1✔
3492
    auto error_received_pf = util::make_promise_future<void>();
2✔
3493
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3494
    config.sync_config->on_sync_client_event_hook =
2✔
3495
        [promise = util::CopyablePromiseHolder(std::move(error_received_pf.promise))](
2✔
3496
            std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& data) mutable {
14✔
3497
            if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
14✔
3498
                return SyncClientHookAction::NoAction;
10✔
3499
            }
10✔
3500
            auto session = weak_session.lock();
4✔
3501
            REQUIRE(session);
4!
3502

2✔
3503
            auto error_code = sync::ProtocolError(data.error_info->raw_error_code);
4✔
3504

2✔
3505
            if (error_code == sync::ProtocolError::initial_sync_not_completed) {
4✔
3506
                return SyncClientHookAction::NoAction;
2✔
3507
            }
2✔
3508

1✔
3509
            REQUIRE(error_code == sync::ProtocolError::write_not_allowed);
2!
3510
            REQUIRE_FALSE(data.error_info->compensating_write_server_version.has_value());
2!
3511
            REQUIRE_FALSE(data.error_info->compensating_writes.empty());
2!
3512
            promise.get_promise().emplace_value();
2✔
3513

1✔
3514
            return SyncClientHookAction::EarlyReturn;
2✔
3515
        };
2✔
3516

1✔
3517
    auto realm = Realm::get_shared_realm(config);
2✔
3518

1✔
3519
    // Create an asymmetric object and upload it to the server.
1✔
3520
    {
2✔
3521
        realm->begin_transaction();
2✔
3522
        CppContext c(realm);
2✔
3523
        Object::create(c, realm, "Asymmetric",
2✔
3524
                       std::any(AnyDict{{"_id", ObjectId::gen()}, {"embedded_obj", AnyDict{{"value", "foo"s}}}}));
2✔
3525
        realm->commit_transaction();
2✔
3526
    }
2✔
3527

1✔
3528
    error_received_pf.future.get();
2✔
3529
    realm->close();
2✔
3530
}
2✔
3531

3532
TEST_CASE("flx: send client error", "[sync][flx][baas]") {
2✔
3533
    FLXSyncTestHarness harness("flx_client_error");
2✔
3534

1✔
3535
    // An integration error is simulated while bootstrapping.
1✔
3536
    // This results in the client sending an error message to the server.
1✔
3537
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3538
    config.sync_config->simulate_integration_error = true;
2✔
3539

1✔
3540
    auto [error_promise, error_future] = util::make_promise_future<SyncError>();
2✔
3541
    auto error_count = 0;
2✔
3542
    auto err_handler = [promise = util::CopyablePromiseHolder(std::move(error_promise)),
2✔
3543
                        &error_count](std::shared_ptr<SyncSession>, SyncError err) mutable {
4✔
3544
        ++error_count;
4✔
3545
        if (error_count == 1) {
4✔
3546
            // Bad changeset detected by the client.
1✔
3547
            CHECK(err.status == ErrorCodes::BadChangeset);
2!
3548
        }
2✔
3549
        else if (error_count == 2) {
2✔
3550
            // Server asking for a client reset.
1✔
3551
            CHECK(err.status == ErrorCodes::SyncClientResetRequired);
2!
3552
            CHECK(err.is_client_reset_requested());
2!
3553
            promise.get_promise().emplace_value(std::move(err));
2✔
3554
        }
2✔
3555
    };
4✔
3556

1✔
3557
    config.sync_config->error_handler = err_handler;
2✔
3558
    auto realm = Realm::get_shared_realm(config);
2✔
3559
    auto table = realm->read_group().get_table("class_TopLevel");
2✔
3560
    auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3561
    new_query.insert_or_assign(Query(table));
2✔
3562
    new_query.commit();
2✔
3563

1✔
3564
    auto err = error_future.get();
2✔
3565
    CHECK(error_count == 2);
2!
3566
}
2✔
3567

3568
TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][bootstrap][baas]") {
6✔
3569
    FLXSyncTestHarness harness("bootstrap_full_sync");
6✔
3570

3✔
3571
    auto setup_subs = [](SharedRealm& realm) {
12✔
3572
        auto table = realm->read_group().get_table("class_TopLevel");
12✔
3573
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
12✔
3574
        new_query.clear();
12✔
3575
        auto col = table->get_column_key("queryable_str_field");
12✔
3576
        new_query.insert_or_assign(Query(table).equal(col, StringData("bar")).Or().equal(col, StringData("bizz")));
12✔
3577
        return new_query.commit();
12✔
3578
    };
12✔
3579

3✔
3580
    auto bar_obj_id = ObjectId::gen();
6✔
3581
    auto bizz_obj_id = ObjectId::gen();
6✔
3582
    auto setup_and_poison_cache = [&] {
6✔
3583
        harness.load_initial_data([&](SharedRealm realm) {
6✔
3584
            CppContext c(realm);
6✔
3585
            Object::create(c, realm, "TopLevel",
6✔
3586
                           std::any(AnyDict{{"_id", bar_obj_id},
6✔
3587
                                            {"queryable_str_field", std::string{"bar"}},
6✔
3588
                                            {"queryable_int_field", static_cast<int64_t>(10)},
6✔
3589
                                            {"non_queryable_field", std::string{"non queryable 2"}}}));
6✔
3590
        });
6✔
3591

3✔
3592
        harness.do_with_new_realm([&](SharedRealm realm) {
6✔
3593
            // first set a subscription to force the creation/caching of a broker snapshot on the server.
3✔
3594
            setup_subs(realm).get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
6✔
3595
            wait_for_advance(*realm);
6✔
3596
            auto table = realm->read_group().get_table("class_TopLevel");
6✔
3597
            REQUIRE(table->find_primary_key(bar_obj_id));
6!
3598

3✔
3599
            // Then create an object that won't be in the cached snapshot - this is the object that if we didn't
3✔
3600
            // wait for a MARK message to come back, we'd miss it in our results.
3✔
3601
            CppContext c(realm);
6✔
3602
            realm->begin_transaction();
6✔
3603
            Object::create(c, realm, "TopLevel",
6✔
3604
                           std::any(AnyDict{{"_id", bizz_obj_id},
6✔
3605
                                            {"queryable_str_field", std::string{"bizz"}},
6✔
3606
                                            {"queryable_int_field", static_cast<int64_t>(15)},
6✔
3607
                                            {"non_queryable_field", std::string{"non queryable 3"}}}));
6✔
3608
            realm->commit_transaction();
6✔
3609
            wait_for_upload(*realm);
6✔
3610
        });
6✔
3611
    };
6✔
3612

3✔
3613
    SECTION("regular subscription change") {
6✔
3614
        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3615
        std::atomic<bool> saw_truncated_bootstrap{false};
2✔
3616
        triggered_config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_sess,
2✔
3617
                                                                      const SyncClientHookData& data) {
30✔
3618
            auto sess = weak_sess.lock();
30✔
3619
            if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
30✔
3620
                return SyncClientHookAction::NoAction;
28✔
3621
            }
28✔
3622

1✔
3623
            auto latest_subs = sess->get_flx_subscription_store()->get_latest();
2✔
3624
            REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3625
            REQUIRE(data.num_changesets == 1);
2!
3626
            auto db = SyncSession::OnlyForTesting::get_db(*sess);
2✔
3627
            auto read_tr = db->start_read();
2✔
3628
            auto table = read_tr->get_table("class_TopLevel");
2✔
3629
            REQUIRE(table->find_primary_key(bar_obj_id));
2!
3630
            REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
2!
3631
            saw_truncated_bootstrap.store(true);
2✔
3632

1✔
3633
            return SyncClientHookAction::NoAction;
2✔
3634
        };
2✔
3635
        auto problem_realm = Realm::get_shared_realm(triggered_config);
2✔
3636

1✔
3637
        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
1✔
3638
        // on the server should have no new history entries, and then pause the router so it doesn't get any of
1✔
3639
        // the changes we're about to create.
1✔
3640
        wait_for_upload(*problem_realm);
2✔
3641
        wait_for_download(*problem_realm);
2✔
3642

1✔
3643
        nlohmann::json command_request = {
2✔
3644
            {"command", "PAUSE_ROUTER_SESSION"},
2✔
3645
        };
2✔
3646
        auto resp_body =
2✔
3647
            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
2✔
3648
                .get();
2✔
3649
        REQUIRE(resp_body == "{}");
2!
3650

1✔
3651
        // Put some data into the server, this will be the data that will be in the broker cache.
1✔
3652
        setup_and_poison_cache();
2✔
3653

1✔
3654
        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
1✔
3655
        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
1✔
3656
        // processed.
1✔
3657
        setup_subs(problem_realm).get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
3658
        wait_for_advance(*problem_realm);
2✔
3659

1✔
3660
        REQUIRE(saw_truncated_bootstrap.load());
2!
3661
        auto table = problem_realm->read_group().get_table("class_TopLevel");
2✔
3662
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3663
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3664
    }
2✔
3665

3✔
3666
// TODO: remote-baas: This test fails intermittently with Windows remote baas server - to be fixed in RCORE-1674
3✔
3667
#ifndef _WIN32
6✔
3668
    SECTION("disconnect between bootstrap and mark") {
6✔
3669
        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3670
        auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
3671
        triggered_config.sync_config->on_sync_client_event_hook =
2✔
3672
            [promise = util::CopyablePromiseHolder(std::move(interrupted_promise)), &bizz_obj_id,
2✔
3673
             &bar_obj_id](std::weak_ptr<SyncSession> weak_sess, const SyncClientHookData& data) mutable {
30✔
3674
                auto sess = weak_sess.lock();
30✔
3675
                if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
30✔
3676
                    return SyncClientHookAction::NoAction;
28✔
3677
                }
28✔
3678

1✔
3679
                auto latest_subs = sess->get_flx_subscription_store()->get_latest();
2✔
3680
                REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3681
                REQUIRE(data.num_changesets == 1);
2!
3682
                auto db = SyncSession::OnlyForTesting::get_db(*sess);
2✔
3683
                auto read_tr = db->start_read();
2✔
3684
                auto table = read_tr->get_table("class_TopLevel");
2✔
3685
                REQUIRE(table->find_primary_key(bar_obj_id));
2!
3686
                REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
2!
3687

1✔
3688
                sess->pause();
2✔
3689
                promise.get_promise().emplace_value();
2✔
3690
                return SyncClientHookAction::NoAction;
2✔
3691
            };
2✔
3692
        auto problem_realm = Realm::get_shared_realm(triggered_config);
2✔
3693

1✔
3694
        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
1✔
3695
        // on the server should have no new history entries, and then pause the router so it doesn't get any of
1✔
3696
        // the changes we're about to create.
1✔
3697
        wait_for_upload(*problem_realm);
2✔
3698
        wait_for_download(*problem_realm);
2✔
3699

1✔
3700
        nlohmann::json command_request = {
2✔
3701
            {"command", "PAUSE_ROUTER_SESSION"},
2✔
3702
        };
2✔
3703
        auto resp_body =
2✔
3704
            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
2✔
3705
                .get();
2✔
3706
        REQUIRE(resp_body == "{}");
2!
3707

1✔
3708
        // Put some data into the server, this will be the data that will be in the broker cache.
1✔
3709
        setup_and_poison_cache();
2✔
3710

1✔
3711
        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
1✔
3712
        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
1✔
3713
        // processed.
1✔
3714
        auto sub_set = setup_subs(problem_realm);
2✔
3715
        auto sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
2✔
3716

1✔
3717
        interrupted.get();
2✔
3718
        problem_realm->sync_session()->shutdown_and_wait();
2✔
3719
        REQUIRE(sub_complete_future.is_ready());
2!
3720
        sub_set.refresh();
2✔
3721
        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3722

1✔
3723
        sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
2✔
3724
        problem_realm->sync_session()->resume();
2✔
3725
        sub_complete_future.get();
2✔
3726
        wait_for_advance(*problem_realm);
2✔
3727

1✔
3728
        sub_set.refresh();
2✔
3729
        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::Complete);
2!
3730
        auto table = problem_realm->read_group().get_table("class_TopLevel");
2✔
3731
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3732
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3733
    }
2✔
3734
#endif
6✔
3735
    SECTION("error/suspend between bootstrap and mark") {
6✔
3736
        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3737
        triggered_config.sync_config->on_sync_client_event_hook =
2✔
3738
            [&bizz_obj_id, &bar_obj_id](std::weak_ptr<SyncSession> weak_sess, const SyncClientHookData& data) {
30✔
3739
                auto sess = weak_sess.lock();
30✔
3740
                if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
30✔
3741
                    return SyncClientHookAction::NoAction;
28✔
3742
                }
28✔
3743

1✔
3744
                auto latest_subs = sess->get_flx_subscription_store()->get_latest();
2✔
3745
                REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
2!
3746
                REQUIRE(data.num_changesets == 1);
2!
3747
                auto db = SyncSession::OnlyForTesting::get_db(*sess);
2✔
3748
                auto read_tr = db->start_read();
2✔
3749
                auto table = read_tr->get_table("class_TopLevel");
2✔
3750
                REQUIRE(table->find_primary_key(bar_obj_id));
2!
3751
                REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
2!
3752

1✔
3753
                return SyncClientHookAction::TriggerReconnect;
2✔
3754
            };
2✔
3755
        auto problem_realm = Realm::get_shared_realm(triggered_config);
2✔
3756

1✔
3757
        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
1✔
3758
        // on the server should have no new history entries, and then pause the router so it doesn't get any of
1✔
3759
        // the changes we're about to create.
1✔
3760
        wait_for_upload(*problem_realm);
2✔
3761
        wait_for_download(*problem_realm);
2✔
3762

1✔
3763
        nlohmann::json command_request = {
2✔
3764
            {"command", "PAUSE_ROUTER_SESSION"},
2✔
3765
        };
2✔
3766
        auto resp_body =
2✔
3767
            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
2✔
3768
                .get();
2✔
3769
        REQUIRE(resp_body == "{}");
2!
3770

1✔
3771
        // Put some data into the server, this will be the data that will be in the broker cache.
1✔
3772
        setup_and_poison_cache();
2✔
3773

1✔
3774
        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
1✔
3775
        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
1✔
3776
        // processed.
1✔
3777
        auto sub_set = setup_subs(problem_realm);
2✔
3778
        auto sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
2✔
3779

1✔
3780
        sub_complete_future.get();
2✔
3781
        wait_for_advance(*problem_realm);
2✔
3782

1✔
3783
        sub_set.refresh();
2✔
3784
        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::Complete);
2!
3785
        auto table = problem_realm->read_group().get_table("class_TopLevel");
2✔
3786
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3787
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3788
    }
2✔
3789
}
6✔
3790

3791
TEST_CASE("flx: convert flx sync realm to bundled realm", "[app][flx][baas]") {
12✔
3792
    static auto foo_obj_id = ObjectId::gen();
12✔
3793
    static auto bar_obj_id = ObjectId::gen();
12✔
3794
    static auto bizz_obj_id = ObjectId::gen();
12✔
3795
    static std::optional<FLXSyncTestHarness> harness;
12✔
3796
    if (!harness) {
12✔
3797
        harness.emplace("bundled_flx_realms");
2✔
3798
        harness->load_initial_data([&](SharedRealm realm) {
2✔
3799
            CppContext c(realm);
2✔
3800
            Object::create(c, realm, "TopLevel",
2✔
3801
                           std::any(AnyDict{{"_id", foo_obj_id},
2✔
3802
                                            {"queryable_str_field", "foo"s},
2✔
3803
                                            {"queryable_int_field", static_cast<int64_t>(5)},
2✔
3804
                                            {"non_queryable_field", "non queryable 1"s}}));
2✔
3805
            Object::create(c, realm, "TopLevel",
2✔
3806
                           std::any(AnyDict{{"_id", bar_obj_id},
2✔
3807
                                            {"queryable_str_field", "bar"s},
2✔
3808
                                            {"queryable_int_field", static_cast<int64_t>(10)},
2✔
3809
                                            {"non_queryable_field", "non queryable 2"s}}));
2✔
3810
        });
2✔
3811
    }
2✔
3812

6✔
3813
    SECTION("flx to flx (should succeed)") {
12✔
3814
        create_user_and_log_in(harness->app());
2✔
3815
        SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
2✔
3816
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3817
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
3818
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3819
            mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
2✔
3820
            auto subs = std::move(mut_subs).commit();
2✔
3821

1✔
3822
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
3823
            wait_for_advance(*realm);
2✔
3824

1✔
3825
            realm->convert(target_config);
2✔
3826
        });
2✔
3827

1✔
3828
        auto target_realm = Realm::get_shared_realm(target_config);
2✔
3829

1✔
3830
        target_realm->begin_transaction();
2✔
3831
        CppContext c(target_realm);
2✔
3832
        Object::create(c, target_realm, "TopLevel",
2✔
3833
                       std::any(AnyDict{{"_id", bizz_obj_id},
2✔
3834
                                        {"queryable_str_field", "bizz"s},
2✔
3835
                                        {"queryable_int_field", static_cast<int64_t>(15)},
2✔
3836
                                        {"non_queryable_field", "non queryable 3"s}}));
2✔
3837
        target_realm->commit_transaction();
2✔
3838

1✔
3839
        wait_for_upload(*target_realm);
2✔
3840
        wait_for_download(*target_realm);
2✔
3841

1✔
3842
        auto latest_subs = target_realm->get_active_subscription_set();
2✔
3843
        auto table = target_realm->read_group().get_table("class_TopLevel");
2✔
3844
        REQUIRE(latest_subs.size() == 1);
2!
3845
        REQUIRE(latest_subs.at(0).object_class_name == "TopLevel");
2!
3846
        REQUIRE(latest_subs.at(0).query_string ==
2!
3847
                Query(table).greater(table->get_column_key("queryable_int_field"), 5).get_description());
2✔
3848

1✔
3849
        REQUIRE(table->size() == 2);
2!
3850
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3851
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3852
        REQUIRE_FALSE(table->find_primary_key(foo_obj_id));
2!
3853
    }
2✔
3854

6✔
3855
    SECTION("flx to local (should succeed)") {
12✔
3856
        TestFile target_config;
2✔
3857

1✔
3858
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3859
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
3860
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3861
            mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
2✔
3862
            auto subs = std::move(mut_subs).commit();
2✔
3863

1✔
3864
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
3865
            wait_for_advance(*realm);
2✔
3866

1✔
3867
            target_config.schema = realm->schema();
2✔
3868
            target_config.schema_version = realm->schema_version();
2✔
3869
            realm->convert(target_config);
2✔
3870
        });
2✔
3871

1✔
3872
        auto target_realm = Realm::get_shared_realm(target_config);
2✔
3873
        REQUIRE_THROWS(target_realm->get_active_subscription_set());
2✔
3874

1✔
3875
        auto table = target_realm->read_group().get_table("class_TopLevel");
2✔
3876
        REQUIRE(table->size() == 2);
2!
3877
        REQUIRE(table->find_primary_key(bar_obj_id));
2!
3878
        REQUIRE(table->find_primary_key(bizz_obj_id));
2!
3879
        REQUIRE_FALSE(table->find_primary_key(foo_obj_id));
2!
3880
    }
2✔
3881

6✔
3882
    SECTION("flx to pbs (should fail to convert)") {
12✔
3883
        create_user_and_log_in(harness->app());
2✔
3884
        SyncTestFile target_config(harness->app()->current_user(), "12345"s, harness->schema());
2✔
3885
        harness->do_with_new_realm([&](SharedRealm realm) {
2✔
3886
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
3887
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
3888
            mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
2✔
3889
            auto subs = std::move(mut_subs).commit();
2✔
3890

1✔
3891
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
2✔
3892
            wait_for_advance(*realm);
2✔
3893

1✔
3894
            REQUIRE_THROWS(realm->convert(target_config));
2✔
3895
        });
2✔
3896
    }
2✔
3897

6✔
3898
    SECTION("pbs to flx (should fail to convert)") {
12✔
3899
        create_user_and_log_in(harness->app());
2✔
3900
        SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
2✔
3901

1✔
3902
        auto pbs_app_config = minimal_app_config("pbs_to_flx_convert", harness->schema());
2✔
3903

1✔
3904
        TestAppSession pbs_app_session(create_app(pbs_app_config));
2✔
3905
        SyncTestFile source_config(pbs_app_session.app()->current_user(), "54321"s, pbs_app_config.schema);
2✔
3906
        auto realm = Realm::get_shared_realm(source_config);
2✔
3907

1✔
3908
        realm->begin_transaction();
2✔
3909
        CppContext c(realm);
2✔
3910
        Object::create(c, realm, "TopLevel",
2✔
3911
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
3912
                                        {"queryable_str_field", "foo"s},
2✔
3913
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
3914
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
3915
        realm->commit_transaction();
2✔
3916

1✔
3917
        REQUIRE_THROWS(realm->convert(target_config));
2✔
3918
    }
2✔
3919

6✔
3920
    SECTION("local to flx (should fail to convert)") {
12✔
3921
        TestFile source_config;
2✔
3922
        source_config.schema = harness->schema();
2✔
3923
        source_config.schema_version = 1;
2✔
3924

1✔
3925
        auto realm = Realm::get_shared_realm(source_config);
2✔
3926
        auto foo_obj_id = ObjectId::gen();
2✔
3927

1✔
3928
        realm->begin_transaction();
2✔
3929
        CppContext c(realm);
2✔
3930
        Object::create(c, realm, "TopLevel",
2✔
3931
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
3932
                                        {"queryable_str_field", "foo"s},
2✔
3933
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
3934
                                        {"non_queryable_field", "non queryable 1"s}}));
2✔
3935
        realm->commit_transaction();
2✔
3936

1✔
3937
        create_user_and_log_in(harness->app());
2✔
3938
        SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
2✔
3939

1✔
3940
        REQUIRE_THROWS(realm->convert(target_config));
2✔
3941
    }
2✔
3942

6✔
3943
    // Add new sections before this
6✔
3944
    SECTION("teardown") {
12✔
3945
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
2✔
3946
        harness.reset();
2✔
3947
    }
2✔
3948
}
12✔
3949

3950
TEST_CASE("flx: compensating write errors get re-sent across sessions", "[sync][flx][compensating write][baas]") {
2✔
3951
    AppCreateConfig::ServiceRole role;
2✔
3952
    role.name = "compensating_write_perms";
2✔
3953

1✔
3954
    AppCreateConfig::ServiceRoleDocumentFilters doc_filters;
2✔
3955
    doc_filters.read = true;
2✔
3956
    doc_filters.write =
2✔
3957
        nlohmann::json{{"queryable_str_field", nlohmann::json{{"$in", nlohmann::json::array({"foo", "bar"})}}}};
2✔
3958
    role.document_filters = doc_filters;
2✔
3959

1✔
3960
    role.insert_filter = true;
2✔
3961
    role.delete_filter = true;
2✔
3962
    role.read = true;
2✔
3963
    role.write = true;
2✔
3964
    FLXSyncTestHarness::ServerSchema server_schema{
2✔
3965
        g_simple_embedded_obj_schema, {"queryable_str_field", "queryable_int_field"}, {role}};
2✔
3966
    FLXSyncTestHarness::Config harness_config("flx_bad_query", server_schema);
2✔
3967
    harness_config.reconnect_mode = ReconnectMode::testing;
2✔
3968
    FLXSyncTestHarness harness(std::move(harness_config));
2✔
3969

1✔
3970
    auto test_obj_id_1 = ObjectId::gen();
2✔
3971
    auto test_obj_id_2 = ObjectId::gen();
2✔
3972

1✔
3973
    create_user_and_log_in(harness.app());
2✔
3974
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
3975
    config.cache = false;
2✔
3976

1✔
3977
    {
2✔
3978
        auto error_received_pf = util::make_promise_future<void>();
2✔
3979
        config.sync_config->on_sync_client_event_hook =
2✔
3980
            [promise = util::CopyablePromiseHolder(std::move(error_received_pf.promise))](
2✔
3981
                std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& data) mutable {
28✔
3982
                if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
28✔
3983
                    return SyncClientHookAction::NoAction;
23✔
3984
                }
23✔
3985
                auto session = weak_session.lock();
5✔
3986
                REQUIRE(session);
5!
3987

3✔
3988
                auto error_code = sync::ProtocolError(data.error_info->raw_error_code);
5✔
3989

3✔
3990
                if (error_code == sync::ProtocolError::initial_sync_not_completed) {
5✔
3991
                    return SyncClientHookAction::NoAction;
3✔
3992
                }
3✔
3993

1✔
3994
                REQUIRE(error_code == sync::ProtocolError::compensating_write);
2!
3995
                REQUIRE_FALSE(data.error_info->compensating_writes.empty());
2!
3996
                promise.get_promise().emplace_value();
2✔
3997

1✔
3998
                return SyncClientHookAction::TriggerReconnect;
2✔
3999
            };
2✔
4000

1✔
4001
        auto realm = Realm::get_shared_realm(config);
2✔
4002
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
4003
        auto queryable_str_field = table->get_column_key("queryable_str_field");
2✔
4004
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
2✔
4005
        new_query.insert_or_assign(Query(table).equal(queryable_str_field, "bizz"));
2✔
4006
        std::move(new_query).commit();
2✔
4007

1✔
4008
        wait_for_upload(*realm);
2✔
4009
        wait_for_download(*realm);
2✔
4010

1✔
4011
        CppContext c(realm);
2✔
4012
        realm->begin_transaction();
2✔
4013
        Object::create(c, realm, "TopLevel",
2✔
4014
                       util::Any(AnyDict{
2✔
4015
                           {"_id", test_obj_id_1},
2✔
4016
                           {"queryable_str_field", std::string{"foo"}},
2✔
4017
                       }));
2✔
4018
        realm->commit_transaction();
2✔
4019

1✔
4020
        realm->begin_transaction();
2✔
4021
        Object::create(c, realm, "TopLevel",
2✔
4022
                       util::Any(AnyDict{
2✔
4023
                           {"_id", test_obj_id_2},
2✔
4024
                           {"queryable_str_field", std::string{"baz"}},
2✔
4025
                       }));
2✔
4026
        realm->commit_transaction();
2✔
4027

1✔
4028
        error_received_pf.future.get();
2✔
4029
        realm->sync_session()->shutdown_and_wait();
2✔
4030
        config.sync_config->on_sync_client_event_hook = {};
2✔
4031
    }
2✔
4032

1✔
4033
    _impl::RealmCoordinator::clear_all_caches();
2✔
4034

1✔
4035
    std::mutex errors_mutex;
2✔
4036
    std::condition_variable new_compensating_write;
2✔
4037
    std::vector<std::pair<ObjectId, sync::version_type>> error_to_download_version;
2✔
4038
    std::vector<sync::CompensatingWriteErrorInfo> compensating_writes;
2✔
4039
    sync::version_type download_version;
2✔
4040

1✔
4041
    config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_session,
2✔
4042
                                                        const SyncClientHookData& data) mutable {
12✔
4043
        auto session = weak_session.lock();
12✔
4044
        if (!session) {
12✔
4045
            return SyncClientHookAction::NoAction;
×
4046
        }
×
4047

6✔
4048
        if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
12✔
4049
            if (data.event == SyncClientHookEvent::DownloadMessageReceived) {
8✔
4050
                download_version = data.progress.download.server_version;
4✔
4051
            }
4✔
4052

4✔
4053
            return SyncClientHookAction::NoAction;
8✔
4054
        }
8✔
4055

2✔
4056
        auto error_code = sync::ProtocolError(data.error_info->raw_error_code);
4✔
4057
        REQUIRE(error_code == sync::ProtocolError::compensating_write);
4!
4058
        REQUIRE(!data.error_info->compensating_writes.empty());
4!
4059
        std::lock_guard<std::mutex> lk(errors_mutex);
4✔
4060
        for (const auto& compensating_write : data.error_info->compensating_writes) {
4✔
4061
            error_to_download_version.emplace_back(compensating_write.primary_key.get_object_id(),
4✔
4062
                                                   *data.error_info->compensating_write_server_version);
4✔
4063
        }
4✔
4064

2✔
4065
        return SyncClientHookAction::NoAction;
4✔
4066
    };
4✔
4067

1✔
4068
    config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError error) {
4✔
4069
        std::unique_lock<std::mutex> lk(errors_mutex);
4✔
4070
        REQUIRE(error.status == ErrorCodes::SyncCompensatingWrite);
4!
4071
        for (const auto& compensating_write : error.compensating_writes_info) {
4✔
4072
            auto tracked_error = std::find_if(error_to_download_version.begin(), error_to_download_version.end(),
4✔
4073
                                              [&](const auto& pair) {
6✔
4074
                                                  return pair.first == compensating_write.primary_key.get_object_id();
6✔
4075
                                              });
6✔
4076
            REQUIRE(tracked_error != error_to_download_version.end());
4!
4077
            CHECK(tracked_error->second <= download_version);
4!
4078
            compensating_writes.push_back(compensating_write);
4✔
4079
        }
4✔
4080
        new_compensating_write.notify_one();
4✔
4081
    };
4✔
4082

1✔
4083
    auto realm = Realm::get_shared_realm(config);
2✔
4084

1✔
4085
    wait_for_upload(*realm);
2✔
4086
    wait_for_download(*realm);
2✔
4087

1✔
4088
    std::unique_lock<std::mutex> lk(errors_mutex);
2✔
4089
    new_compensating_write.wait_for(lk, std::chrono::seconds(30), [&] {
2✔
4090
        return compensating_writes.size() == 2;
2✔
4091
    });
2✔
4092

1✔
4093
    REQUIRE(compensating_writes.size() == 2);
2!
4094
    auto& write_info = compensating_writes[0];
2✔
4095
    CHECK(write_info.primary_key.is_type(type_ObjectId));
2!
4096
    CHECK(write_info.primary_key.get_object_id() == test_obj_id_1);
2!
4097
    CHECK(write_info.object_name == "TopLevel");
2!
4098
    CHECK_THAT(write_info.reason, Catch::Matchers::ContainsSubstring("object is outside of the current query view"));
2✔
4099

1✔
4100
    write_info = compensating_writes[1];
2✔
4101
    REQUIRE(write_info.primary_key.is_type(type_ObjectId));
2!
4102
    REQUIRE(write_info.primary_key.get_object_id() == test_obj_id_2);
2!
4103
    REQUIRE(write_info.object_name == "TopLevel");
2!
4104
    REQUIRE(write_info.reason ==
2!
4105
            util::format("write to ObjectID(\"%1\") in table \"TopLevel\" not allowed", test_obj_id_2));
2✔
4106
    auto top_level_table = realm->read_group().get_table("class_TopLevel");
2✔
4107
    REQUIRE(top_level_table->is_empty());
2!
4108
}
2✔
4109

4110
TEST_CASE("flx: bootstrap changesets are applied continuously", "[sync][flx][bootstrap][baas]") {
2✔
4111
    FLXSyncTestHarness harness("flx_bootstrap_ordering", {g_large_array_schema, {"queryable_int_field"}});
2✔
4112
    fill_large_array_schema(harness);
2✔
4113

1✔
4114
    std::unique_ptr<std::thread> th;
2✔
4115
    sync::version_type user_commit_version = UINT_FAST64_MAX;
2✔
4116
    sync::version_type bootstrap_version = UINT_FAST64_MAX;
2✔
4117
    SharedRealm realm;
2✔
4118
    std::condition_variable cv;
2✔
4119
    std::mutex mutex;
2✔
4120
    bool allow_to_commit = false;
2✔
4121

1✔
4122
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
4123
    auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
2✔
4124
    auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
2✔
4125
    config.sync_config->on_sync_client_event_hook =
2✔
4126
        [promise = std::move(shared_promise), &th, &realm, &user_commit_version, &bootstrap_version, &cv, &mutex,
2✔
4127
         &allow_to_commit](std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& data) {
50✔
4128
            if (data.query_version == 0) {
50✔
4129
                return SyncClientHookAction::NoAction;
12✔
4130
            }
12✔
4131
            if (data.event != SyncClientHookEvent::DownloadMessageIntegrated) {
38✔
4132
                return SyncClientHookAction::NoAction;
26✔
4133
            }
26✔
4134
            auto session = weak_session.lock();
12✔
4135
            if (!session) {
12✔
4136
                return SyncClientHookAction::NoAction;
×
4137
            }
×
4138
            if (data.batch_state != sync::DownloadBatchState::MoreToCome) {
12✔
4139
                // Read version after bootstrap is done.
1✔
4140
                auto db = TestHelper::get_db(realm);
2✔
4141
                ReadTransaction rt(db);
2✔
4142
                bootstrap_version = rt.get_version();
2✔
4143
                {
2✔
4144
                    std::lock_guard<std::mutex> lock(mutex);
2✔
4145
                    allow_to_commit = true;
2✔
4146
                }
2✔
4147
                cv.notify_one();
2✔
4148
                session->force_close();
2✔
4149
                promise->emplace_value();
2✔
4150
                return SyncClientHookAction::NoAction;
2✔
4151
            }
2✔
4152

5✔
4153
            if (th) {
10✔
4154
                return SyncClientHookAction::NoAction;
8✔
4155
            }
8✔
4156

1✔
4157
            auto func = [&] {
2✔
4158
                // Attempt to commit a local change after the first bootstrap batch was committed.
1✔
4159
                auto db = TestHelper::get_db(realm);
2✔
4160
                WriteTransaction wt(db);
2✔
4161
                TableRef table = wt.get_table("class_TopLevel");
2✔
4162
                table->create_object_with_primary_key(ObjectId::gen());
2✔
4163
                {
2✔
4164
                    std::unique_lock<std::mutex> lock(mutex);
2✔
4165
                    // Wait to commit until we read the final bootstrap version.
1✔
4166
                    cv.wait(lock, [&] {
2✔
4167
                        return allow_to_commit;
2✔
4168
                    });
2✔
4169
                }
2✔
4170
                user_commit_version = wt.commit();
2✔
4171
            };
2✔
4172
            th = std::make_unique<std::thread>(std::move(func));
2✔
4173

1✔
4174
            return SyncClientHookAction::NoAction;
2✔
4175
        };
2✔
4176

1✔
4177
    realm = Realm::get_shared_realm(config);
2✔
4178
    auto table = realm->read_group().get_table("class_TopLevel");
2✔
4179
    Query query(table);
2✔
4180
    {
2✔
4181
        auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
4182
        new_subs.insert_or_assign(query);
2✔
4183
        new_subs.commit();
2✔
4184
    }
2✔
4185
    interrupted.get();
2✔
4186
    th->join();
2✔
4187

1✔
4188
    // The user commit is the last one.
1✔
4189
    CHECK(user_commit_version == bootstrap_version + 1);
2!
4190
}
2✔
4191

4192
TEST_CASE("flx: open realm + register subscription callback while bootstrapping",
4193
          "[sync][flx][bootstrap][async open][baas]") {
12✔
4194
    FLXSyncTestHarness harness("flx_bootstrap_and_subscribe");
12✔
4195
    auto foo_obj_id = ObjectId::gen();
12✔
4196
    harness.load_initial_data([&](SharedRealm realm) {
12✔
4197
        CppContext c(realm);
12✔
4198
        Object::create(c, realm, "TopLevel",
12✔
4199
                       std::any(AnyDict{{"_id", foo_obj_id},
12✔
4200
                                        {"queryable_str_field", "foo"s},
12✔
4201
                                        {"queryable_int_field", static_cast<int64_t>(5)},
12✔
4202
                                        {"non_queryable_field", "created as initial data seed"s}}));
12✔
4203
    });
12✔
4204
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
12✔
4205

6✔
4206
    std::atomic<bool> subscription_invoked = false;
12✔
4207
    auto subscription_pf = util::make_promise_future<bool>();
12✔
4208
    // create a subscription to commit when realm is open for the first time or asked to rerun on open
6✔
4209
    auto init_subscription_callback_with_promise =
12✔
4210
        [&, promise_holder = util::CopyablePromiseHolder(std::move(subscription_pf.promise))](
12✔
4211
            std::shared_ptr<Realm> realm) mutable {
10✔
4212
            REQUIRE(realm);
8!
4213
            auto table = realm->read_group().get_table("class_TopLevel");
8✔
4214
            Query query(table);
8✔
4215
            auto subscription = realm->get_latest_subscription_set();
8✔
4216
            auto mutable_subscription = subscription.make_mutable_copy();
8✔
4217
            mutable_subscription.insert_or_assign(query);
8✔
4218
            auto promise = promise_holder.get_promise();
8✔
4219
            mutable_subscription.commit();
8✔
4220
            subscription_invoked = true;
8✔
4221
            promise.emplace_value(true);
8✔
4222
        };
8✔
4223
    // verify that the subscription has changed the database
6✔
4224
    auto verify_subscription = [](SharedRealm realm) {
12✔
4225
        REQUIRE(realm);
12!
4226
        auto table_ref = realm->read_group().get_table("class_TopLevel");
12✔
4227
        REQUIRE(table_ref);
12!
4228
        REQUIRE(table_ref->get_column_count() == 4);
12!
4229
        REQUIRE(table_ref->get_column_key("_id"));
12!
4230
        REQUIRE(table_ref->get_column_key("queryable_str_field"));
12!
4231
        REQUIRE(table_ref->get_column_key("queryable_int_field"));
12!
4232
        REQUIRE(table_ref->get_column_key("non_queryable_field"));
12!
4233
        REQUIRE(table_ref->size() == 1);
12!
4234
        auto str_col = table_ref->get_column_key("queryable_str_field");
12✔
4235
        REQUIRE(table_ref->get_object(0).get<String>(str_col) == "foo");
12!
4236
        return true;
12✔
4237
    };
12✔
4238

6✔
4239
    SECTION("Sync open") {
12✔
4240
        // sync open with subscription callback. Subscription will be run, since this is the first time that realm is
1✔
4241
        // opened
1✔
4242
        subscription_invoked = false;
2✔
4243
        config.sync_config->subscription_initializer = init_subscription_callback_with_promise;
2✔
4244
        auto realm = Realm::get_shared_realm(config);
2✔
4245
        REQUIRE(subscription_pf.future.get());
2!
4246
        auto sb = realm->get_latest_subscription_set();
2✔
4247
        auto future = sb.get_state_change_notification(realm::sync::SubscriptionSet::State::Complete);
2✔
4248
        auto state = future.get();
2✔
4249
        REQUIRE(state == realm::sync::SubscriptionSet::State::Complete);
2!
4250
        realm->refresh(); // refresh is needed otherwise table_ref->size() would be 0
2✔
4251
        REQUIRE(verify_subscription(realm));
2!
4252
    }
2✔
4253

6✔
4254
    SECTION("Sync Open + Async Open") {
12✔
4255
        {
2✔
4256
            subscription_invoked = false;
2✔
4257
            config.sync_config->subscription_initializer = init_subscription_callback_with_promise;
2✔
4258
            auto realm = Realm::get_shared_realm(config);
2✔
4259
            REQUIRE(subscription_pf.future.get());
2!
4260
            auto sb = realm->get_latest_subscription_set();
2✔
4261
            auto future = sb.get_state_change_notification(realm::sync::SubscriptionSet::State::Complete);
2✔
4262
            auto state = future.get();
2✔
4263
            REQUIRE(state == realm::sync::SubscriptionSet::State::Complete);
2!
4264
            realm->refresh(); // refresh is needed otherwise table_ref->size() would be 0
2✔
4265
            REQUIRE(verify_subscription(realm));
2!
4266
        }
2✔
4267
        {
2✔
4268
            auto subscription_pf_async = util::make_promise_future<bool>();
2✔
4269
            auto init_subscription_asyc_callback =
2✔
4270
                [promise_holder_async = util::CopyablePromiseHolder(std::move(subscription_pf_async.promise))](
2✔
4271
                    std::shared_ptr<Realm> realm) mutable {
2✔
4272
                    REQUIRE(realm);
2!
4273
                    auto table = realm->read_group().get_table("class_TopLevel");
2✔
4274
                    Query query(table);
2✔
4275
                    auto subscription = realm->get_latest_subscription_set();
2✔
4276
                    auto mutable_subscription = subscription.make_mutable_copy();
2✔
4277
                    mutable_subscription.insert_or_assign(query);
2✔
4278
                    auto promise = promise_holder_async.get_promise();
2✔
4279
                    mutable_subscription.commit();
2✔
4280
                    promise.emplace_value(true);
2✔
4281
                };
2✔
4282
            auto open_realm_pf = util::make_promise_future<bool>();
2✔
4283
            auto open_realm_completed_callback =
2✔
4284
                [&, promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
2✔
4285
                    ThreadSafeReference ref, std::exception_ptr err) mutable {
2✔
4286
                    auto promise = promise_holder.get_promise();
2✔
4287
                    if (err)
2✔
4288
                        promise.emplace_value(false);
×
4289
                    else
2✔
4290
                        promise.emplace_value(verify_subscription(Realm::get_shared_realm(std::move(ref))));
2✔
4291
                };
2✔
4292

1✔
4293
            config.sync_config->subscription_initializer = init_subscription_asyc_callback;
2✔
4294
            config.sync_config->rerun_init_subscription_on_open = true;
2✔
4295
            auto async_open = Realm::get_synchronized_realm(config);
2✔
4296
            async_open->start(open_realm_completed_callback);
2✔
4297
            REQUIRE(open_realm_pf.future.get());
2!
4298
            REQUIRE(subscription_pf_async.future.get());
2!
4299
            config.sync_config->rerun_init_subscription_on_open = false;
2✔
4300
            auto realm = Realm::get_shared_realm(config);
2✔
4301
            REQUIRE(realm->get_latest_subscription_set().version() == 2);
2!
4302
            REQUIRE(realm->get_active_subscription_set().version() == 2);
2!
4303
        }
2✔
4304
    }
2✔
4305

6✔
4306
    SECTION("Async open") {
12✔
4307
        SECTION("Initial async open with no rerun on open set") {
8✔
4308
            // subscription will be run since this is the first time we are opening the realm file.
2✔
4309
            auto open_realm_pf = util::make_promise_future<bool>();
4✔
4310
            auto open_realm_completed_callback =
4✔
4311
                [&, promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
4✔
4312
                    ThreadSafeReference ref, std::exception_ptr err) mutable {
4✔
4313
                    auto promise = promise_holder.get_promise();
4✔
4314
                    if (err)
4✔
4315
                        promise.emplace_value(false);
×
4316
                    else
4✔
4317
                        promise.emplace_value(verify_subscription(Realm::get_shared_realm(std::move(ref))));
4✔
4318
                };
4✔
4319

2✔
4320
            config.sync_config->subscription_initializer = init_subscription_callback_with_promise;
4✔
4321
            auto async_open = Realm::get_synchronized_realm(config);
4✔
4322
            async_open->start(open_realm_completed_callback);
4✔
4323
            REQUIRE(open_realm_pf.future.get());
4!
4324
            REQUIRE(subscription_pf.future.get());
4!
4325

2✔
4326
            SECTION("rerun on open = false. Subscription not run") {
4✔
4327
                subscription_invoked = false;
2✔
4328
                auto async_open = Realm::get_synchronized_realm(config);
2✔
4329
                auto open_realm_pf = util::make_promise_future<bool>();
2✔
4330
                auto open_realm_completed_callback =
2✔
4331
                    [promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
2✔
4332
                        ThreadSafeReference, std::exception_ptr) mutable {
2✔
4333
                        // no need to verify if the subscription has changed the db, since it has not run as we test
1✔
4334
                        // below
1✔
4335
                        promise_holder.get_promise().emplace_value(true);
2✔
4336
                    };
2✔
4337
                async_open->start(open_realm_completed_callback);
2✔
4338
                REQUIRE(open_realm_pf.future.get());
2!
4339
                REQUIRE_FALSE(subscription_invoked.load());
2!
4340
            }
2✔
4341

2✔
4342
            SECTION("rerun on open = true. Subscription not run cause realm already opened once") {
4✔
4343
                subscription_invoked = false;
2✔
4344
                auto realm = Realm::get_shared_realm(config);
2✔
4345
                auto init_subscription = [&subscription_invoked](std::shared_ptr<Realm> realm) mutable {
1✔
4346
                    REQUIRE(realm);
×
4347
                    auto table = realm->read_group().get_table("class_TopLevel");
×
4348
                    Query query(table);
×
4349
                    auto subscription = realm->get_latest_subscription_set();
×
4350
                    auto mutable_subscription = subscription.make_mutable_copy();
×
4351
                    mutable_subscription.insert_or_assign(query);
×
4352
                    mutable_subscription.commit();
×
4353
                    subscription_invoked.store(true);
×
4354
                };
×
4355
                config.sync_config->rerun_init_subscription_on_open = true;
2✔
4356
                config.sync_config->subscription_initializer = init_subscription;
2✔
4357
                auto async_open = Realm::get_synchronized_realm(config);
2✔
4358
                auto open_realm_pf = util::make_promise_future<bool>();
2✔
4359
                auto open_realm_completed_callback =
2✔
4360
                    [promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
2✔
4361
                        ThreadSafeReference, std::exception_ptr) mutable {
2✔
4362
                        // no need to verify if the subscription has changed the db, since it has not run as we test
1✔
4363
                        // below
1✔
4364
                        promise_holder.get_promise().emplace_value(true);
2✔
4365
                    };
2✔
4366
                async_open->start(open_realm_completed_callback);
2✔
4367
                REQUIRE(open_realm_pf.future.get());
2!
4368
                REQUIRE_FALSE(subscription_invoked.load());
2!
4369
                REQUIRE(realm->get_latest_subscription_set().version() == 1);
2!
4370
                REQUIRE(realm->get_active_subscription_set().version() == 1);
2!
4371
            }
2✔
4372
        }
4✔
4373

4✔
4374
        SECTION("rerun on open set for multiple async open tasks (subscription runs only once)") {
8✔
4375
            auto init_subscription = [](std::shared_ptr<Realm> realm) mutable {
8✔
4376
                REQUIRE(realm);
8!
4377
                auto table = realm->read_group().get_table("class_TopLevel");
8✔
4378
                Query query(table);
8✔
4379
                auto subscription = realm->get_latest_subscription_set();
8✔
4380
                auto mutable_subscription = subscription.make_mutable_copy();
8✔
4381
                mutable_subscription.insert_or_assign(query);
8✔
4382
                mutable_subscription.commit();
8✔
4383
            };
8✔
4384

2✔
4385
            auto open_task1_pf = util::make_promise_future<SharedRealm>();
4✔
4386
            auto open_task2_pf = util::make_promise_future<SharedRealm>();
4✔
4387
            auto open_callback1 = [promise_holder = util::CopyablePromiseHolder(std::move(open_task1_pf.promise))](
4✔
4388
                                      ThreadSafeReference ref, std::exception_ptr err) mutable {
4✔
4389
                REQUIRE_FALSE(err);
4!
4390
                auto realm = Realm::get_shared_realm(std::move(ref));
4✔
4391
                REQUIRE(realm);
4!
4392
                promise_holder.get_promise().emplace_value(realm);
4✔
4393
            };
4✔
4394
            auto open_callback2 = [promise_holder = util::CopyablePromiseHolder(std::move(open_task2_pf.promise))](
4✔
4395
                                      ThreadSafeReference ref, std::exception_ptr err) mutable {
4✔
4396
                REQUIRE_FALSE(err);
4!
4397
                auto realm = Realm::get_shared_realm(std::move(ref));
4✔
4398
                REQUIRE(realm);
4!
4399
                promise_holder.get_promise().emplace_value(realm);
4✔
4400
            };
4✔
4401

2✔
4402
            config.sync_config->rerun_init_subscription_on_open = true;
4✔
4403
            config.sync_config->subscription_initializer = init_subscription;
4✔
4404

2✔
4405
            SECTION("Realm was already created, but we want to rerun on first open using multiple tasks") {
4✔
4406
                {
2✔
4407
                    subscription_invoked = false;
2✔
4408
                    auto realm = Realm::get_shared_realm(config);
2✔
4409
                    auto sb = realm->get_latest_subscription_set();
2✔
4410
                    auto future = sb.get_state_change_notification(realm::sync::SubscriptionSet::State::Complete);
2✔
4411
                    auto state = future.get();
2✔
4412
                    REQUIRE(state == realm::sync::SubscriptionSet::State::Complete);
2!
4413
                    realm->refresh(); // refresh is needed otherwise table_ref->size() would be 0
2✔
4414
                    REQUIRE(verify_subscription(realm));
2!
4415
                    REQUIRE(realm->get_latest_subscription_set().version() == 1);
2!
4416
                    REQUIRE(realm->get_active_subscription_set().version() == 1);
2!
4417
                }
2✔
4418

1✔
4419
                auto async_open_task1 = Realm::get_synchronized_realm(config);
2✔
4420
                auto async_open_task2 = Realm::get_synchronized_realm(config);
2✔
4421
                async_open_task1->start(open_callback1);
2✔
4422
                async_open_task2->start(open_callback2);
2✔
4423

1✔
4424
                auto realm1 = open_task1_pf.future.get();
2✔
4425
                auto realm2 = open_task2_pf.future.get();
2✔
4426

1✔
4427
                const auto version_expected = 2;
2✔
4428
                auto r1_latest = realm1->get_latest_subscription_set().version();
2✔
4429
                auto r1_active = realm1->get_active_subscription_set().version();
2✔
4430
                REQUIRE(realm2->get_latest_subscription_set().version() == r1_latest);
2!
4431
                REQUIRE(realm2->get_active_subscription_set().version() == r1_active);
2!
4432
                REQUIRE(r1_latest == version_expected);
2!
4433
                REQUIRE(r1_active == version_expected);
2!
4434
            }
2✔
4435
            SECTION("First time realm is created but opened via open async. Both tasks could run the subscription") {
4✔
4436
                auto async_open_task1 = Realm::get_synchronized_realm(config);
2✔
4437
                auto async_open_task2 = Realm::get_synchronized_realm(config);
2✔
4438
                async_open_task1->start(open_callback1);
2✔
4439
                async_open_task2->start(open_callback2);
2✔
4440
                auto realm1 = open_task1_pf.future.get();
2✔
4441
                auto realm2 = open_task2_pf.future.get();
2✔
4442

1✔
4443
                auto r1_latest = realm1->get_latest_subscription_set().version();
2✔
4444
                auto r1_active = realm1->get_active_subscription_set().version();
2✔
4445
                REQUIRE(realm2->get_latest_subscription_set().version() == r1_latest);
2!
4446
                REQUIRE(realm2->get_active_subscription_set().version() == r1_active);
2!
4447
                // the callback may be run twice, if task1 is the first task to open realm
1✔
4448
                // but it is scheduled after tasks2, which have opened realm later but
1✔
4449
                // by the time it runs, subscription version is equal to 0 (realm creation).
1✔
4450
                // This can only happen the first time that realm is created. All the other times
1✔
4451
                // the init_sb callback is guaranteed to run once.
1✔
4452
                REQUIRE(r1_latest >= 1);
2!
4453
                REQUIRE(r1_latest <= 2);
2!
4454
                REQUIRE(r1_active >= 1);
2!
4455
                REQUIRE(r1_active <= 2);
2!
4456
            }
2✔
4457
        }
4✔
4458
    }
8✔
4459
}
12✔
4460
TEST_CASE("flx sync: Client reset during async open", "[sync][flx][client reset][async open][baas]") {
2✔
4461
    FLXSyncTestHarness harness("flx_bootstrap_reset");
2✔
4462
    auto foo_obj_id = ObjectId::gen();
2✔
4463
    std::atomic<bool> subscription_invoked = false;
2✔
4464
    harness.load_initial_data([&](SharedRealm realm) {
2✔
4465
        CppContext c(realm);
2✔
4466
        Object::create(c, realm, "TopLevel",
2✔
4467
                       std::any(AnyDict{{"_id", foo_obj_id},
2✔
4468
                                        {"queryable_str_field", "foo"s},
2✔
4469
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
4470
                                        {"non_queryable_field", "created as initial data seed"s}}));
2✔
4471
    });
2✔
4472
    SyncTestFile realm_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
4473

1✔
4474
    auto subscription_callback = [&](std::shared_ptr<Realm> realm) {
2✔
4475
        REQUIRE(realm);
2!
4476
        auto table = realm->read_group().get_table("class_TopLevel");
2✔
4477
        Query query(table);
2✔
4478
        auto subscription = realm->get_latest_subscription_set();
2✔
4479
        auto mutable_subscription = subscription.make_mutable_copy();
2✔
4480
        mutable_subscription.insert_or_assign(query);
2✔
4481
        subscription_invoked = true;
2✔
4482
        mutable_subscription.commit();
2✔
4483
    };
2✔
4484

1✔
4485
    auto before_callback_called = util::make_promise_future<void>();
2✔
4486
    auto after_callback_called = util::make_promise_future<void>();
2✔
4487
    realm_config.sync_config->client_resync_mode = ClientResyncMode::Recover;
2✔
4488
    realm_config.sync_config->subscription_initializer = subscription_callback;
2✔
4489

1✔
4490
    realm_config.sync_config->on_sync_client_event_hook =
2✔
4491
        [&, client_reset_triggered = false](std::weak_ptr<SyncSession> weak_sess,
2✔
4492
                                            const SyncClientHookData& event_data) mutable {
28✔
4493
            auto sess = weak_sess.lock();
28✔
4494
            if (!sess) {
28✔
4495
                return SyncClientHookAction::NoAction;
×
4496
            }
×
4497
            if (sess->path() != realm_config.path) {
28✔
4498
                return SyncClientHookAction::NoAction;
16✔
4499
            }
16✔
4500

7✔
4501
            if (event_data.event != SyncClientHookEvent::DownloadMessageReceived) {
12✔
4502
                return SyncClientHookAction::NoAction;
10✔
4503
            }
10✔
4504

1✔
4505
            if (client_reset_triggered) {
2✔
4506
                return SyncClientHookAction::NoAction;
×
4507
            }
×
4508
            client_reset_triggered = true;
2✔
4509
            reset_utils::trigger_client_reset(harness.session().app_session());
2✔
4510
            return SyncClientHookAction::EarlyReturn;
2✔
4511
        };
2✔
4512

1✔
4513
    realm_config.sync_config->notify_before_client_reset =
2✔
4514
        [promise = util::CopyablePromiseHolder(std::move(before_callback_called.promise))](
2✔
4515
            std::shared_ptr<Realm> realm) mutable {
2✔
4516
            CHECK(realm->schema_version() == 1);
2!
4517
            promise.get_promise().emplace_value();
2✔
4518
        };
2✔
4519

1✔
4520
    realm_config.sync_config->notify_after_client_reset =
2✔
4521
        [promise = util::CopyablePromiseHolder(std::move(after_callback_called.promise))](
2✔
4522
            std::shared_ptr<Realm> realm, ThreadSafeReference, bool) mutable {
2✔
4523
            CHECK(realm->schema_version() == 1);
2!
4524
            promise.get_promise().emplace_value();
2✔
4525
        };
2✔
4526

1✔
4527
    auto realm_task = Realm::get_synchronized_realm(realm_config);
2✔
4528
    auto realm_pf = util::make_promise_future<SharedRealm>();
2✔
4529
    realm_task->start([promise_holder = util::CopyablePromiseHolder(std::move(realm_pf.promise))](
2✔
4530
                          ThreadSafeReference ref, std::exception_ptr ex) mutable {
2✔
4531
        auto promise = promise_holder.get_promise();
2✔
4532
        if (ex) {
2✔
4533
            try {
×
4534
                std::rethrow_exception(ex);
×
4535
            }
×
4536
            catch (...) {
×
4537
                promise.set_error(exception_to_status());
×
4538
            }
×
4539
            return;
×
4540
        }
2✔
4541
        auto realm = Realm::get_shared_realm(std::move(ref));
2✔
4542
        if (!realm) {
2✔
4543
            promise.set_error({ErrorCodes::RuntimeError, "could not get realm from threadsaferef"});
×
4544
        }
×
4545
        promise.emplace_value(std::move(realm));
2✔
4546
    });
2✔
4547
    auto realm = realm_pf.future.get();
2✔
4548
    before_callback_called.future.get();
2✔
4549
    after_callback_called.future.get();
2✔
4550
    REQUIRE(subscription_invoked.load());
2!
4551
}
2✔
4552

4553
// Test that resending pending subscription sets does not cause any inconsistencies in the progress cursors.
4554
TEST_CASE("flx sync: resend pending subscriptions when reconnecting", "[sync][flx][baas]") {
2✔
4555
    FLXSyncTestHarness harness("flx_pending_subscriptions", {g_large_array_schema, {"queryable_int_field"}});
2✔
4556

1✔
4557
    std::vector<ObjectId> obj_ids_at_end = fill_large_array_schema(harness);
2✔
4558
    SyncTestFile interrupted_realm_config(harness.app()->current_user(), harness.schema(),
2✔
4559
                                          SyncConfig::FLXSyncEnabled{});
2✔
4560
    interrupted_realm_config.cache = false;
2✔
4561

1✔
4562
    {
2✔
4563
        auto pf = util::make_promise_future<void>();
2✔
4564
        Realm::Config config = interrupted_realm_config;
2✔
4565
        config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
2✔
4566
        config.sync_config->on_sync_client_event_hook =
2✔
4567
            [promise = util::CopyablePromiseHolder(std::move(pf.promise))](std::weak_ptr<SyncSession> weak_session,
2✔
4568
                                                                           const SyncClientHookData& data) mutable {
50✔
4569
                if (data.event != SyncClientHookEvent::BootstrapMessageProcessed &&
50✔
4570
                    data.event != SyncClientHookEvent::BootstrapProcessed) {
43✔
4571
                    return SyncClientHookAction::NoAction;
32✔
4572
                }
32✔
4573
                auto session = weak_session.lock();
18✔
4574
                if (!session) {
18✔
4575
                    return SyncClientHookAction::NoAction;
×
4576
                }
×
4577
                if (data.query_version != 1) {
18✔
4578
                    return SyncClientHookAction::NoAction;
4✔
4579
                }
4✔
4580

7✔
4581
                // Commit a subscriptions set whenever a bootstrap message is received for query version 1.
7✔
4582
                if (data.event == SyncClientHookEvent::BootstrapMessageProcessed) {
14✔
4583
                    auto latest_subs = session->get_flx_subscription_store()->get_latest().make_mutable_copy();
12✔
4584
                    latest_subs.commit();
12✔
4585
                    return SyncClientHookAction::NoAction;
12✔
4586
                }
12✔
4587
                // At least one subscription set was created.
1✔
4588
                CHECK(session->get_flx_subscription_store()->get_latest().version() > 1);
2!
4589
                promise.get_promise().emplace_value();
2✔
4590
                // Reconnect once query version 1 is bootstrapped.
1✔
4591
                return SyncClientHookAction::TriggerReconnect;
2✔
4592
            };
2✔
4593

1✔
4594
        auto realm = Realm::get_shared_realm(config);
2✔
4595
        {
2✔
4596
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
2✔
4597
            auto table = realm->read_group().get_table("class_TopLevel");
2✔
4598
            mut_subs.insert_or_assign(Query(table));
2✔
4599
            mut_subs.commit();
2✔
4600
        }
2✔
4601
        pf.future.get();
2✔
4602
        realm->sync_session()->shutdown_and_wait();
2✔
4603
        realm->close();
2✔
4604
    }
2✔
4605

1✔
4606
    _impl::RealmCoordinator::assert_no_open_realms();
2✔
4607

1✔
4608
    // Check at least one subscription set needs to be resent.
1✔
4609
    {
2✔
4610
        DBOptions options;
2✔
4611
        options.encryption_key = test_util::crypt_key();
2✔
4612
        auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
2✔
4613
        auto sub_store = sync::SubscriptionStore::create(realm);
2✔
4614
        auto version_info = sub_store->get_version_info();
2✔
4615
        REQUIRE(version_info.latest > version_info.active);
2!
4616
    }
2✔
4617

1✔
4618
    // Resend the pending subscriptions.
1✔
4619
    auto realm = Realm::get_shared_realm(interrupted_realm_config);
2✔
4620
    wait_for_upload(*realm);
2✔
4621
    wait_for_download(*realm);
2✔
4622
}
2✔
4623

4624
TEST_CASE("flx: fatal errors and session becoming inactive cancel pending waits", "[sync][flx][baas]") {
2✔
4625
    std::vector<ObjectSchema> schema{
2✔
4626
        {"TopLevel",
2✔
4627
         {
2✔
4628
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
2✔
4629
             {"queryable_int_field", PropertyType::Int | PropertyType::Nullable},
2✔
4630
         }},
2✔
4631
    };
2✔
4632

1✔
4633
    FLXSyncTestHarness harness("flx_cancel_pending_waits", {schema, {"queryable_int_field"}});
2✔
4634
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
2✔
4635

1✔
4636
    auto check_status = [](auto status) {
4✔
4637
        CHECK(!status.is_ok());
4!
4638
        std::string reason = status.get_status().reason();
4✔
4639
        // Subscription notification is cancelled either because the sync session is inactive, or because a fatal
2✔
4640
        // error is received from the server.
2✔
4641
        if (reason.find("Sync session became inactive") == std::string::npos &&
4✔
4642
            reason.find("Invalid schema change (UPLOAD): non-breaking schema change: adding \"Int\" column at field "
3✔
4643
                        "\"other_col\" in schema \"TopLevel\", schema changes from clients are restricted when "
2✔
4644
                        "developer mode is disabled") == std::string::npos) {
1✔
4645
            FAIL(reason);
×
4646
        }
×
4647
    };
4✔
4648

1✔
4649
    auto create_subscription = [](auto realm) -> realm::sync::SubscriptionSet {
4✔
4650
        auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
4✔
4651
        auto table = realm->read_group().get_table("class_TopLevel");
4✔
4652
        mut_subs.insert_or_assign(Query(table));
4✔
4653
        return mut_subs.commit();
4✔
4654
    };
4✔
4655

1✔
4656
    auto [error_occured_promise, error_occurred] = util::make_promise_future<void>();
2✔
4657
    config.sync_config->error_handler = [promise = util::CopyablePromiseHolder(std::move(error_occured_promise))](
2✔
4658
                                            std::shared_ptr<SyncSession>, SyncError) mutable {
2✔
4659
        promise.get_promise().emplace_value();
2✔
4660
    };
2✔
4661

1✔
4662
    auto realm = Realm::get_shared_realm(config);
2✔
4663
    wait_for_download(*realm);
2✔
4664

1✔
4665
    auto subs = create_subscription(realm);
2✔
4666
    auto subs_future = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete);
2✔
4667

1✔
4668
    realm->sync_session()->pause();
2✔
4669
    auto state = subs_future.get_no_throw();
2✔
4670
    check_status(state);
2✔
4671

1✔
4672
    auto [download_complete_promise, download_complete] = util::make_promise_future<void>();
2✔
4673
    realm->sync_session()->wait_for_upload_completion([promise = std::move(download_complete_promise)](auto) mutable {
2✔
4674
        promise.emplace_value();
2✔
4675
    });
2✔
4676
    schema[0].persisted_properties.push_back({"other_col", PropertyType::Int | PropertyType::Nullable});
2✔
4677
    realm->update_schema(schema);
2✔
4678

1✔
4679
    subs = create_subscription(realm);
2✔
4680
    subs_future = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete);
2✔
4681

1✔
4682
    harness.load_initial_data([&](SharedRealm realm) {
2✔
4683
        CppContext c(realm);
2✔
4684
        Object::create(c, realm, "TopLevel",
2✔
4685
                       std::any(AnyDict{{"_id", ObjectId::gen()},
2✔
4686
                                        {"queryable_int_field", static_cast<int64_t>(5)},
2✔
4687
                                        {"other_col", static_cast<int64_t>(42)}}));
2✔
4688
    });
2✔
4689

1✔
4690
    realm->sync_session()->resume();
2✔
4691
    download_complete.get();
2✔
4692
    error_occurred.get();
2✔
4693
    state = subs_future.get_no_throw();
2✔
4694
    check_status(state);
2✔
4695
}
2✔
4696

4697
} // namespace realm::app
4698

4699
#endif // REALM_ENABLE_AUTH_TESTS
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc