• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2210

09 Apr 2024 03:41PM UTC coverage: 92.601% (+0.5%) from 92.106%
2210

push

Evergreen

web-flow
Merge pull request #7300 from realm/tg/rework-metadata-storage

Rework sync user handling and metadata storage

102800 of 195548 branches covered (52.57%)

3051 of 3153 new or added lines in 46 files covered. (96.76%)

41 existing lines in 11 files now uncovered.

249129 of 269035 relevant lines covered (92.6%)

46864217.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.22
/test/object-store/sync/flx_sync.cpp
1
////////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2021 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#if REALM_ENABLE_AUTH_TESTS
20

21
#include <catch2/catch_all.hpp>
22

23
#include <util/test_file.hpp>
24
#include <util/crypt_key.hpp>
25
#include <util/sync/flx_sync_harness.hpp>
26
#include <util/sync/sync_test_utils.hpp>
27

28
#include <realm/object_id.hpp>
29
#include <realm/query_expression.hpp>
30

31
#include <realm/object-store/binding_context.hpp>
32
#include <realm/object-store/impl/object_accessor_impl.hpp>
33
#include <realm/object-store/impl/realm_coordinator.hpp>
34
#include <realm/object-store/schema.hpp>
35
#include <realm/object-store/sync/generic_network_transport.hpp>
36
#include <realm/object-store/sync/mongo_client.hpp>
37
#include <realm/object-store/sync/mongo_database.hpp>
38
#include <realm/object-store/sync/mongo_collection.hpp>
39
#include <realm/object-store/sync/async_open_task.hpp>
40
#include <realm/util/bson/bson.hpp>
41
#include <realm/object-store/sync/sync_session.hpp>
42

43
#include <realm/sync/client_base.hpp>
44
#include <realm/sync/config.hpp>
45
#include <realm/sync/noinst/client_history_impl.hpp>
46
#include <realm/sync/noinst/client_reset.hpp>
47
#include <realm/sync/noinst/client_reset_operation.hpp>
48
#include <realm/sync/noinst/pending_bootstrap_store.hpp>
49
#include <realm/sync/noinst/server/access_token.hpp>
50
#include <realm/sync/protocol.hpp>
51
#include <realm/sync/subscriptions.hpp>
52

53
#include <realm/util/future.hpp>
54
#include <realm/util/logger.hpp>
55

56
#include <catch2/catch_all.hpp>
57

58
#include <filesystem>
59
#include <iostream>
60
#include <stdexcept>
61

62
using namespace std::string_literals;
63

64
namespace realm {
65

66
// Test-only helper that gives the tests in this file access to the DB behind a Realm.
class TestHelper {
67
public:
68
    // Returns a reference to the DBRef that Realm::Internal::get_db() yields for the Realm
    // that `shared_realm` points to. `shared_realm` is dereferenced unconditionally.
    static DBRef& get_db(SharedRealm const& shared_realm)
69
    {
70
        return Realm::Internal::get_db(*shared_realm);
71
    }
72
};
73

74
} // namespace realm
75

76
namespace realm::app {
77

78
namespace {
79
// Smallest schema used in this file: a single "TopLevel" object type whose only property is
// the ObjectId primary key "_id".
const Schema g_minimal_schema{
80
    {"TopLevel",
81
     {
82
         {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
83
     }},
84
};
85

86
// Schema with one "TopLevel" object type carrying an ObjectId primary key, a nullable int
// ("queryable_int_field") and an array of strings ("list_of_strings"). This is the schema
// fill_large_array_schema() requires its harness to use.
const Schema g_large_array_schema{
87
    ObjectSchema("TopLevel",
88
                 {
89
                     {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
90
                     {"queryable_int_field", PropertyType::Int | PropertyType::Nullable},
91
                     {"list_of_strings", PropertyType::Array | PropertyType::String},
92
                 }),
93
};
94

95
// Schema with a "TopLevel" object type (ObjectId primary key, nullable string
// "queryable_str_field") that links through the nullable "embedded_obj" property to the
// embedded object type "TopLevel_embedded_obj", which has a single nullable string "str_field".
const Schema g_simple_embedded_obj_schema{
96
    {"TopLevel",
97
     {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
98
      {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
99
      {"embedded_obj", PropertyType::Object | PropertyType::Nullable, "TopLevel_embedded_obj"}}},
100
    {"TopLevel_embedded_obj",
101
     ObjectSchema::ObjectType::Embedded,
102
     {
103
         {"str_field", PropertyType::String | PropertyType::Nullable},
104
     }},
105
};
106

107
// Populates a FLXSyncTestHarness with the g_large_array_schema with objects that are large enough that
108
// they are guaranteed to fill multiple bootstrap download messages. Currently this means generating 5
109
// objects each with 1024 array entries of 1024 bytes each.
110
//
111
// Returns a list of the _id values for the objects created.
112
std::vector<ObjectId> fill_large_array_schema(FLXSyncTestHarness& harness)
113
{
119✔
114
    std::vector<ObjectId> ret;
119✔
115
    REQUIRE(harness.schema() == g_large_array_schema);
119!
116
    harness.load_initial_data([&](SharedRealm realm) {
119✔
117
        CppContext c(realm);
119✔
118
        for (int i = 0; i < 5; ++i) {
714✔
119
            auto id = ObjectId::gen();
595✔
120
            auto obj = Object::create(c, realm, "TopLevel",
595✔
121
                                      std::any(AnyDict{{"_id", id},
595✔
122
                                                       {"list_of_strings", AnyVector{}},
595✔
123
                                                       {"queryable_int_field", static_cast<int64_t>(i * 5)}}));
595✔
124
            List str_list(obj, realm->schema().find("TopLevel")->property_for_name("list_of_strings"));
595✔
125
            for (int j = 0; j < 1024; ++j) {
609,875✔
126
                str_list.add(c, std::any(std::string(1024, 'a' + (j % 26))));
609,280✔
127
            }
609,280✔
128

280✔
129
            ret.push_back(id);
595✔
130
        }
595✔
131
    });
119✔
132
    return ret;
119✔
133
}
119✔
134

135
} // namespace
136

137
// Connects a realm to a flexible-sync app and checks that adding, replacing and clearing
// subscriptions changes which of the two seeded TopLevel objects are present locally.
TEST_CASE("flx: connect to FLX-enabled app", "[sync][flx][baas]") {
17✔
138
    FLXSyncTestHarness harness("basic_flx_connect");
17✔
139

8✔
140
    auto foo_obj_id = ObjectId::gen();
17✔
141
    auto bar_obj_id = ObjectId::gen();
17✔
142
    // Seed two objects that differ in queryable_str_field ("foo" vs "bar") so that each
    // subscription below selects exactly one of them.
    harness.load_initial_data([&](SharedRealm realm) {
17✔
143
        CppContext c(realm);
17✔
144
        Object::create(c, realm, "TopLevel",
17✔
145
                       std::any(AnyDict{{"_id", foo_obj_id},
17✔
146
                                        {"queryable_str_field", "foo"s},
17✔
147
                                        {"queryable_int_field", static_cast<int64_t>(5)},
17✔
148
                                        {"non_queryable_field", "non queryable 1"s}}));
17✔
149
        Object::create(c, realm, "TopLevel",
17✔
150
                       std::any(AnyDict{{"_id", bar_obj_id},
17✔
151
                                        {"queryable_str_field", "bar"s},
17✔
152
                                        {"queryable_int_field", static_cast<int64_t>(10)},
17✔
153
                                        {"non_queryable_field", "non queryable 2"s}}));
17✔
154
    });
17✔
155

8✔
156

8✔
157
    harness.do_with_new_realm([&](SharedRealm realm) {
17✔
158
        // Step 0: the latest subscription set of a new realm is expected to be empty and at
        // version 0; wait for it to be reported Complete before changing anything.
        {
17✔
159
            auto empty_subs = realm->get_latest_subscription_set();
17✔
160
            CHECK(empty_subs.size() == 0);
17!
161
            CHECK(empty_subs.version() == 0);
17!
162
            empty_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
17✔
163
        }
17✔
164

8✔
165
        auto table = realm->read_group().get_table("class_TopLevel");
17✔
166
        auto col_key = table->get_column_key("queryable_str_field");
17✔
167
        Query query_foo(table);
17✔
168
        query_foo.equal(col_key, "foo");
17✔
169
        // Step 1: subscribe to queryable_str_field == "foo" and wait for Complete.
        {
17✔
170
            auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
17✔
171
            new_subs.insert_or_assign(query_foo);
17✔
172
            auto subs = new_subs.commit();
17✔
173
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
17✔
174
        }
17✔
175

8✔
176
        // Only the "foo" object should now be in the table.
        {
17✔
177
            wait_for_advance(*realm);
17✔
178
            Results results(realm, table);
17✔
179
            CHECK(results.size() == 1);
17!
180
            auto obj = results.get<Obj>(0);
17✔
181
            CHECK(obj.is_valid());
17!
182
            CHECK(obj.get<ObjectId>("_id") == foo_obj_id);
17!
183
        }
17✔
184

8✔
185
        // Step 2: additionally subscribe to queryable_str_field == "bar".
        {
17✔
186
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
17✔
187
            Query new_query_bar(table);
17✔
188
            new_query_bar.equal(col_key, "bar");
17✔
189
            mut_subs.insert_or_assign(new_query_bar);
17✔
190
            auto subs = mut_subs.commit();
17✔
191
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
17✔
192
        }
17✔
193

8✔
194
        // Both seeded objects should now be present.
        {
17✔
195
            wait_for_advance(*realm);
17✔
196
            Results results(realm, Query(table));
17✔
197
            CHECK(results.size() == 2);
17!
198
        }
17✔
199

8✔
200
        // Step 3: erase the "foo" query (erase() must report success) and insert-or-assign the
        // "bar" query again, leaving "bar" as the only subscription.
        {
17✔
201
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
17✔
202
            CHECK(mut_subs.erase(query_foo));
17!
203
            Query new_query_bar(table);
17✔
204
            new_query_bar.equal(col_key, "bar");
17✔
205
            mut_subs.insert_or_assign(new_query_bar);
17✔
206
            auto subs = mut_subs.commit();
17✔
207
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
17✔
208
        }
17✔
209

8✔
210
        // Only the "bar" object should remain.
        {
17✔
211
            wait_for_advance(*realm);
17✔
212
            Results results(realm, Query(table));
17✔
213
            CHECK(results.size() == 1);
17!
214
            auto obj = results.get<Obj>(0);
17✔
215
            CHECK(obj.is_valid());
17!
216
            CHECK(obj.get<ObjectId>("_id") == bar_obj_id);
17!
217
        }
17✔
218

8✔
219
        // Step 4: clear every subscription.
        {
17✔
220
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
17✔
221
            mut_subs.clear();
17✔
222
            auto subs = mut_subs.commit();
17✔
223
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
17✔
224
        }
17✔
225

8✔
226
        // With no subscriptions left the table should be empty again.
        {
17✔
227
            wait_for_advance(*realm);
17✔
228
            Results results(realm, table);
17✔
229
            CHECK(results.size() == 0);
17!
230
        }
17✔
231
    });
17✔
232
}
17✔
233

234
// Exercises SyncSession::OnlyForTesting::send_test_command(): one well-formed command that
// must be answered with "{}", and two malformed inputs that must be rejected with LogicError.
TEST_CASE("flx: test commands work", "[sync][flx][test command][baas]") {
17✔
235
    FLXSyncTestHarness harness("test_commands");
17✔
236
    harness.do_with_new_realm([&](const SharedRealm& realm) {
17✔
237
        wait_for_upload(*realm);
17✔
238
        // Well-formed command: a JSON object with a "command" field.
        nlohmann::json command_request = {
17✔
239
            {"command", "PAUSE_ROUTER_SESSION"},
17✔
240
        };
17✔
241
        auto resp_body =
17✔
242
            SyncSession::OnlyForTesting::send_test_command(*realm->sync_session(), command_request.dump()).get();
17✔
243
        REQUIRE(resp_body == "{}");
17!
244

8✔
245
        // Input that is not valid JSON; get_no_throw() is used so the failure can be inspected
        // as a status instead of being thrown.
        auto bad_status =
17✔
246
            SyncSession::OnlyForTesting::send_test_command(*realm->sync_session(), "foobar: }").get_no_throw();
17✔
247
        REQUIRE(bad_status.get_status() == ErrorCodes::LogicError);
17!
248
        REQUIRE_THAT(bad_status.get_status().reason(),
17✔
249
                     Catch::Matchers::ContainsSubstring("Invalid json input to send_test_command"));
17✔
250

8✔
251
        // Valid JSON, but the command name is under "cmd" rather than the required "command" key.
        bad_status =
17✔
252
            SyncSession::OnlyForTesting::send_test_command(*realm->sync_session(), "{\"cmd\": \"\"}").get_no_throw();
17✔
253
        REQUIRE_FALSE(bad_status.is_ok());
17!
254
        REQUIRE(bad_status.get_status() == ErrorCodes::LogicError);
17!
255
        REQUIRE(bad_status.get_status().reason() ==
17!
256
                "Must supply command name in \"command\" field of test command json object");
17✔
257
    });
17✔
258
}
17✔
259

260

261
static auto make_error_handler()
262
{
340✔
263
    auto [error_promise, error_future] = util::make_promise_future<SyncError>();
340✔
264
    auto shared_promise = std::make_shared<decltype(error_promise)>(std::move(error_promise));
340✔
265
    auto fn = [error_promise = std::move(shared_promise)](std::shared_ptr<SyncSession>, SyncError err) {
322✔
266
        error_promise->emplace_value(std::move(err));
306✔
267
    };
306✔
268
    return std::make_pair(std::move(error_future), std::move(fn));
340✔
269
}
340✔
270

271
static auto make_client_reset_handler()
272
{
153✔
273
    auto [reset_promise, reset_future] = util::make_promise_future<ClientResyncMode>();
153✔
274
    auto shared_promise = std::make_shared<decltype(reset_promise)>(std::move(reset_promise));
153✔
275
    auto fn = [reset_promise = std::move(shared_promise)](SharedRealm, ThreadSafeReference, bool did_recover) {
153✔
276
        reset_promise->emplace_value(did_recover ? ClientResyncMode::Recover : ClientResyncMode::DiscardLocal);
117✔
277
    };
153✔
278
    return std::make_pair(std::move(reset_future), std::move(fn));
153✔
279
}
153✔
280

281

282
// Injects sync protocol errors through the ECHO_ERROR test command and checks how they are
// surfaced to the session's error handler (status code, requested action, fatality), plus one
// section covering manual resume while a suspended session is waiting to auto-resume.
//
// The harness is a function-local static shared by every section; the final "teardown" section
// destroys it.
// NOTE(review): this appears to rely on "teardown" being the last section executed and on the
// test case not being run again in the same process - afterwards `harness` is empty and
// `harness->app()` would dereference an empty optional. Confirm.
TEST_CASE("app: error handling integration test", "[sync][flx][baas]") {
153✔
283
    static std::optional<FLXSyncTestHarness> harness{"error_handling"};
153✔
284
    create_user_and_log_in(harness->app());
153✔
285
    SyncTestFile config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
153✔
286
    // error_future resolves with the SyncError passed to the session's error handler.
    auto&& [error_future, error_handler] = make_error_handler();
153✔
287
    config.sync_config->error_handler = std::move(error_handler);
153✔
288
    config.sync_config->client_resync_mode = ClientResyncMode::Manual;
153✔
289

72✔
290
    SECTION("Resuming while waiting for session to auto-resume") {
153✔
291
        // States are advanced partly by the sync client hook below and partly by the test body.
        enum class TestState { InitialSuspend, InitialResume, SecondBind, SecondSuspend, SecondResume, Done };
17✔
292
        TestingStateMachine<TestState> state(TestState::InitialSuspend);
17✔
293
        config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession>,
17✔
294
                                                            const SyncClientHookData& data) {
255✔
295
            // When set, the hook waits for this state after its own transition, i.e. until the
            // test body has acted on the suspension.
            std::optional<TestState> wait_for;
255✔
296
            auto event = data.event;
255✔
297
            state.transition_with([&](TestState state) -> std::optional<TestState> {
255✔
298
                if (state == TestState::InitialSuspend && event == SyncClientHookEvent::SessionSuspended) {
255✔
299
                    // If we're getting suspended for the first time, notify the test thread that we're
8✔
300
                    // ready to be resumed.
8✔
301
                    wait_for = TestState::SecondBind;
17✔
302
                    return TestState::InitialResume;
17✔
303
                }
17✔
304
                else if (state == TestState::SecondBind && data.event == SyncClientHookEvent::BindMessageSent) {
238✔
305
                    return TestState::SecondSuspend;
17✔
306
                }
17✔
307
                else if (state == TestState::SecondSuspend && event == SyncClientHookEvent::SessionSuspended) {
221✔
308
                    wait_for = TestState::Done;
17✔
309
                    return TestState::SecondResume;
17✔
310
                }
17✔
311
                // Any other state/event combination leaves the state unchanged.
                return std::nullopt;
204✔
312
            });
204✔
313
            if (wait_for) {
255✔
314
                state.wait_for(*wait_for);
34✔
315
            }
34✔
316
            return SyncClientHookAction::NoAction;
255✔
317
        };
255✔
318
        auto r = Realm::get_shared_realm(config);
17✔
319
        wait_for_upload(*r);
17✔
320
        nlohmann::json error_body = {
17✔
321
            {"tryAgain", true},           {"message", "fake error"},
17✔
322
            {"shouldClientReset", false}, {"isRecoveryModeDisabled", false},
17✔
323
            {"action", "Transient"},      {"backoffIntervalSec", 900},
17✔
324
            {"backoffMaxDelaySec", 900},  {"backoffMultiplier", 1},
17✔
325
        };
17✔
326
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
17✔
327
                                       {"args", nlohmann::json{{"errorCode", 229}, {"errorBody", error_body}}}};
17✔
328

8✔
329
        // First we trigger a retryable transient error that should cause the client to try to resume the
8✔
330
        // session once the backoff interval requested in error_body (900 seconds) has elapsed.
8✔
331
        auto test_cmd_res =
17✔
332
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
17✔
333
                .get();
17✔
334
        REQUIRE(test_cmd_res == "{}");
17!
335

8✔
336
        // Wait for the hook to report that the session has been suspended by the error above.
8✔
337
        state.wait_for(TestState::InitialResume);
17✔
338

8✔
339
        // Once we're suspended, immediately tell the sync client to resume the session. This should cancel the
8✔
340
        // timer that would have auto-resumed the session.
8✔
341
        r->sync_session()->handle_reconnect();
17✔
342
        state.transition_with([&](TestState cur_state) {
17✔
343
            REQUIRE(cur_state == TestState::InitialResume);
17!
344
            return TestState::SecondBind;
17✔
345
        });
17✔
346
        state.wait_for(TestState::SecondSuspend);
17✔
347

8✔
348
        // Once we're connected again trigger another retryable transient error. Before RCORE-1770 the timer
8✔
349
        // to auto-resume the session would have still been active here and we would crash when trying to start
8✔
350
        // a second timer to auto-resume after this error.
8✔
351
        test_cmd_res =
17✔
352
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
17✔
353
                .get();
17✔
354
        REQUIRE(test_cmd_res == "{}");
17!
355
        state.wait_for(TestState::SecondResume);
17✔
356

8✔
357
        // Finally resume the session again which should cancel the second timer and the session should auto-resume
8✔
358
        // normally without crashing.
8✔
359
        r->sync_session()->handle_reconnect();
17✔
360
        state.transition_with([&](TestState cur_state) {
17✔
361
            REQUIRE(cur_state == TestState::SecondResume);
17!
362
            return TestState::Done;
17✔
363
        });
17✔
364
        wait_for_download(*r);
17✔
365
    }
17✔
366

72✔
367
    // Error code 299 with an explicit "ClientReset" action: expected to surface as UnknownError
    // while keeping the requested action.
    SECTION("handles unknown errors gracefully") {
153✔
368
        auto r = Realm::get_shared_realm(config);
17✔
369
        wait_for_download(*r);
17✔
370
        nlohmann::json error_body = {
17✔
371
            {"tryAgain", false},         {"message", "fake error"},
17✔
372
            {"shouldClientReset", true}, {"isRecoveryModeDisabled", false},
17✔
373
            {"action", "ClientReset"},
17✔
374
        };
17✔
375
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
17✔
376
                                       {"args", nlohmann::json{{"errorCode", 299}, {"errorBody", error_body}}}};
17✔
377
        auto test_cmd_res =
17✔
378
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
17✔
379
                .get();
17✔
380
        REQUIRE(test_cmd_res == "{}");
17!
381
        auto error = wait_for_future(std::move(error_future)).get();
17✔
382
        REQUIRE(error.status == ErrorCodes::UnknownError);
17!
383
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset);
17!
384
        REQUIRE(error.is_fatal);
17!
385
        REQUIRE_THAT(error.status.reason(),
17✔
386
                     Catch::Matchers::ContainsSubstring("Unknown sync protocol error code 299"));
17✔
387
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
17✔
388
    }
17✔
389

72✔
390
    // Same error code 299, but the body carries no "action" field: the action is expected to be
    // reported as ApplicationBug.
    SECTION("unknown errors without actions are application bugs") {
153✔
391
        auto r = Realm::get_shared_realm(config);
17✔
392
        wait_for_download(*r);
17✔
393
        nlohmann::json error_body = {
17✔
394
            {"tryAgain", false},
17✔
395
            {"message", "fake error"},
17✔
396
            {"shouldClientReset", false},
17✔
397
            {"isRecoveryModeDisabled", false},
17✔
398
        };
17✔
399
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
17✔
400
                                       {"args", nlohmann::json{{"errorCode", 299}, {"errorBody", error_body}}}};
17✔
401
        auto test_cmd_res =
17✔
402
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
17✔
403
                .get();
17✔
404
        REQUIRE(test_cmd_res == "{}");
17!
405
        auto error = wait_for_future(std::move(error_future)).get();
17✔
406
        REQUIRE(error.status == ErrorCodes::UnknownError);
17!
407
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
17!
408
        REQUIRE(error.is_fatal);
17!
409
        REQUIRE_THAT(error.status.reason(),
17✔
410
                     Catch::Matchers::ContainsSubstring("Unknown sync protocol error code 299"));
17✔
411
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
17✔
412
    }
17✔
413

72✔
414
    // Error code 201 with an action string that does not exist: expected to be reported as a
    // RuntimeError with the ApplicationBug action, and without the "unknown error code" text.
    SECTION("handles unknown actions gracefully") {
153✔
415
        auto r = Realm::get_shared_realm(config);
17✔
416
        wait_for_download(*r);
17✔
417
        nlohmann::json error_body = {
17✔
418
            {"tryAgain", false},
17✔
419
            {"message", "fake error"},
17✔
420
            {"shouldClientReset", true},
17✔
421
            {"isRecoveryModeDisabled", false},
17✔
422
            {"action", "FakeActionThatWillNeverExist"},
17✔
423
        };
17✔
424
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
17✔
425
                                       {"args", nlohmann::json{{"errorCode", 201}, {"errorBody", error_body}}}};
17✔
426
        auto test_cmd_res =
17✔
427
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
17✔
428
                .get();
17✔
429
        REQUIRE(test_cmd_res == "{}");
17!
430
        auto error = wait_for_future(std::move(error_future)).get();
17✔
431
        REQUIRE(error.status == ErrorCodes::RuntimeError);
17!
432
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
17!
433
        REQUIRE(error.is_fatal);
17!
434
        REQUIRE_THAT(error.status.reason(), !Catch::Matchers::ContainsSubstring("Unknown sync protocol error code"));
17✔
435
        REQUIRE_THAT(error.status.reason(), Catch::Matchers::ContainsSubstring("fake error"));
17✔
436
    }
17✔
437

72✔
438

72✔
439
    // Error code 199 with an "ApplicationBug" action: expected to be reported as
    // SyncProtocolInvariantFailed with the ProtocolViolation action.
    SECTION("unknown connection-level errors are still errors") {
153✔
440
        auto r = Realm::get_shared_realm(config);
17✔
441
        wait_for_download(*r);
17✔
442
        nlohmann::json error_body = {{"tryAgain", false},
17✔
443
                                     {"message", "fake error"},
17✔
444
                                     {"shouldClientReset", false},
17✔
445
                                     {"isRecoveryModeDisabled", false},
17✔
446
                                     {"action", "ApplicationBug"}};
17✔
447
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
17✔
448
                                       {"args", nlohmann::json{{"errorCode", 199}, {"errorBody", error_body}}}};
17✔
449
        auto test_cmd_res =
17✔
450
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
17✔
451
                .get();
17✔
452
        REQUIRE(test_cmd_res == "{}");
17!
453
        auto error = wait_for_future(std::move(error_future)).get();
17✔
454
        REQUIRE(error.status == ErrorCodes::SyncProtocolInvariantFailed);
17!
455
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ProtocolViolation);
17!
456
        REQUIRE(error.is_fatal);
17!
457
    }
17✔
458

72✔
459
    // Runs once per generated protocol error code; each is expected to be reported as a fatal
    // SyncClientResetRequired error.
    SECTION("client reset errors") {
153✔
460
        auto r = Realm::get_shared_realm(config);
51✔
461
        wait_for_download(*r);
51✔
462
        nlohmann::json error_body = {{"tryAgain", false},
51✔
463
                                     {"message", "fake error"},
51✔
464
                                     {"shouldClientReset", true},
51✔
465
                                     {"isRecoveryModeDisabled", false},
51✔
466
                                     {"action", "ClientReset"}};
51✔
467
        auto code = GENERATE(sync::ProtocolError::bad_client_file_ident, sync::ProtocolError::bad_server_version,
51✔
468
                             sync::ProtocolError::diverging_histories);
51✔
469
        nlohmann::json test_command = {{"command", "ECHO_ERROR"},
51✔
470
                                       {"args", nlohmann::json{{"errorCode", code}, {"errorBody", error_body}}}};
51✔
471
        auto test_cmd_res =
51✔
472
            wait_for_future(SyncSession::OnlyForTesting::send_test_command(*r->sync_session(), test_command.dump()))
51✔
473
                .get();
51✔
474
        REQUIRE(test_cmd_res == "{}");
51!
475
        auto error = wait_for_future(std::move(error_future)).get();
51✔
476
        REQUIRE(error.status == ErrorCodes::SyncClientResetRequired);
51!
477
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::ClientReset);
51!
478
        REQUIRE(error.is_client_reset_requested());
51!
479
        REQUIRE(error.is_fatal);
51!
480
    }
51✔
481

72✔
482

72✔
483
    // Destroys the static harness shared by the sections above.
    SECTION("teardown") {
153✔
484
        harness.reset();
17✔
485
    }
17✔
486
}
153✔
487

488

489
TEST_CASE("flx: client reset", "[sync][flx][client reset][baas]") {
374✔
490
    std::vector<ObjectSchema> schema{
374✔
491
        {"TopLevel",
374✔
492
         {
374✔
493
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
374✔
494
             {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
374✔
495
             {"queryable_int_field", PropertyType::Int | PropertyType::Nullable},
374✔
496
             {"non_queryable_field", PropertyType::String | PropertyType::Nullable},
374✔
497
             {"list_of_ints_field", PropertyType::Int | PropertyType::Array},
374✔
498
             {"sum_of_list_field", PropertyType::Int},
374✔
499
         }},
374✔
500
        {"TopLevel2",
374✔
501
         {
374✔
502
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
374✔
503
             {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
374✔
504
         }},
374✔
505
    };
374✔
506

176✔
507
    // some of these tests make additive schema changes which is only allowed in dev mode
176✔
508
    constexpr bool dev_mode = true;
374✔
509
    FLXSyncTestHarness harness("flx_client_reset",
374✔
510
                               {schema, {"queryable_str_field", "queryable_int_field"}, {}, dev_mode});
374✔
511

176✔
512
    auto add_object = [](SharedRealm realm, std::string str_field, int64_t int_field,
374✔
513
                         ObjectId oid = ObjectId::gen()) {
1,037✔
514
        CppContext c(realm);
1,037✔
515
        realm->begin_transaction();
1,037✔
516

488✔
517
        int64_t r1 = random_int();
1,037✔
518
        int64_t r2 = random_int();
1,037✔
519
        int64_t r3 = random_int();
1,037✔
520
        int64_t sum = uint64_t(r1) + r2 + r3;
1,037✔
521

488✔
522
        Object::create(c, realm, "TopLevel",
1,037✔
523
                       std::any(AnyDict{{"_id", oid},
1,037✔
524
                                        {"queryable_str_field", str_field},
1,037✔
525
                                        {"queryable_int_field", int_field},
1,037✔
526
                                        {"non_queryable_field", "non queryable 1"s},
1,037✔
527
                                        {"list_of_ints_field", std::vector<std::any>{r1, r2, r3}},
1,037✔
528
                                        {"sum_of_list_field", sum}}));
1,037✔
529
        realm->commit_transaction();
1,037✔
530
    };
1,037✔
531

176✔
532
    auto subscribe_to_and_add_objects = [&](SharedRealm realm, size_t num_objects) {
356✔
533
        auto table = realm->read_group().get_table("class_TopLevel");
340✔
534
        auto id_col = table->get_primary_key_column();
340✔
535
        auto sub_set = realm->get_latest_subscription_set();
340✔
536
        for (size_t i = 0; i < num_objects; ++i) {
1,105✔
537
            auto oid = ObjectId::gen();
765✔
538
            auto mut_sub = sub_set.make_mutable_copy();
765✔
539
            mut_sub.clear();
765✔
540
            mut_sub.insert_or_assign(Query(table).equal(id_col, oid));
765✔
541
            sub_set = mut_sub.commit();
765✔
542
            add_object(realm, util::format("added _id='%1'", oid), 0, oid);
765✔
543
        }
765✔
544
    };
340✔
545

176✔
546
    auto add_subscription_for_new_object = [&](SharedRealm realm, std::string str_field,
374✔
547
                                               int64_t int_field) -> sync::SubscriptionSet {
302✔
548
        auto table = realm->read_group().get_table("class_TopLevel");
238✔
549
        auto queryable_str_field = table->get_column_key("queryable_str_field");
238✔
550
        auto sub_set = realm->get_latest_subscription_set().make_mutable_copy();
238✔
551
        sub_set.insert_or_assign(Query(table).equal(queryable_str_field, StringData(str_field)));
238✔
552
        auto resulting_set = sub_set.commit();
238✔
553
        add_object(realm, str_field, int_field);
238✔
554
        return resulting_set;
238✔
555
    };
238✔
556

176✔
557
    auto add_invalid_subscription = [](SharedRealm realm) -> sync::SubscriptionSet {
185✔
558
        auto table = realm->read_group().get_table("class_TopLevel");
17✔
559
        auto queryable_str_field = table->get_column_key("non_queryable_field");
17✔
560
        auto sub_set = realm->get_latest_subscription_set().make_mutable_copy();
17✔
561
        sub_set.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
17✔
562
        auto resulting_set = sub_set.commit();
17✔
563
        return resulting_set;
17✔
564
    };
17✔
565

176✔
566
    auto count_queries_with_str = [](sync::SubscriptionSet subs, std::string_view str) {
212✔
567
        size_t count = 0;
68✔
568
        for (auto sub : subs) {
119✔
569
            if (sub.query_string.find(str) != std::string::npos) {
119✔
570
                ++count;
51✔
571
            }
51✔
572
        }
119✔
573
        return count;
68✔
574
    };
68✔
575
    create_user_and_log_in(harness.app());
374✔
576
    auto user1 = harness.app()->current_user();
374✔
577
    create_user_and_log_in(harness.app());
374✔
578
    auto user2 = harness.app()->current_user();
374✔
579
    SyncTestFile config_local(user1, harness.schema(), SyncConfig::FLXSyncEnabled{});
374✔
580
    config_local.path += ".local";
374✔
581
    SyncTestFile config_remote(user2, harness.schema(), SyncConfig::FLXSyncEnabled{});
374✔
582
    config_remote.path += ".remote";
374✔
583
    const std::string str_field_value = "foo";
374✔
584
    const int64_t local_added_int = 100;
374✔
585
    const int64_t local_added_int2 = 150;
374✔
586
    const int64_t remote_added_int = 200;
374✔
587
    size_t before_reset_count = 0;
374✔
588
    size_t after_reset_count = 0;
374✔
589
    config_local.sync_config->notify_before_client_reset = [&before_reset_count](SharedRealm) {
302✔
590
        ++before_reset_count;
238✔
591
    };
238✔
592
    config_local.sync_config->notify_after_client_reset = [&after_reset_count](SharedRealm, ThreadSafeReference,
374✔
593
                                                                               bool) {
176✔
594
        ++after_reset_count;
×
595
    };
×
596

176✔
597
    // Recovery with a single subscription: the object written while offline must be recovered and
    // the object added remotely must be downloaded. All future waits go through wait_for_future()
    // so that a notification that never arrives fails the test instead of blocking it forever.
    SECTION("Recover: offline writes and subscription (single subscription)") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
        config_local.sync_config->notify_after_client_reset = reset_handler;
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        test_reset
            ->populate_initial_object([&](SharedRealm realm) {
                auto pk_of_added_object = ObjectId::gen();
                // Subscribe to the whole TopLevel table before writing to it.
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
                auto table = realm->read_group().get_table(ObjectStore::table_name_for_object_type("TopLevel"));
                REALM_ASSERT(table);
                mut_subs.insert_or_assign(Query(table));
                mut_subs.commit();

                realm->begin_transaction();
                CppContext c(realm);
                int64_t r1 = random_int();
                int64_t r2 = random_int();
                int64_t r3 = random_int();
                // The addition is carried out on unsigned values so that it wraps rather than
                // overflowing a signed integer.
                int64_t sum = uint64_t(r1) + r2 + r3;

                Object::create(c, realm, "TopLevel",
                               std::any(AnyDict{{"_id"s, pk_of_added_object},
                                                {"queryable_str_field"s, "initial value"s},
                                                {"list_of_ints_field", std::vector<std::any>{r1, r2, r3}},
                                                {"sum_of_list_field", sum}}));

                realm->commit_transaction();
                wait_for_upload(*realm);
                return pk_of_added_object;
            })
            ->make_local_changes([&](SharedRealm local_realm) {
                add_object(local_realm, str_field_value, local_added_int);
            })
            ->make_remote_changes([&](SharedRealm remote_realm) {
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
                sync::SubscriptionSet::State actual =
                    wait_for_future(remote_realm->get_latest_subscription_set().get_state_change_notification(
                                        sync::SubscriptionSet::State::Complete))
                        .get();
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
            })
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) mutable {
                wait_for_advance(*local_realm);
                ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
                REQUIRE(mode == ClientResyncMode::Recover);
                auto table = local_realm->read_group().get_table("class_TopLevel");
                auto str_col = table->get_column_key("queryable_str_field");
                auto int_col = table->get_column_key("queryable_int_field");
                auto tv = table->where().equal(str_col, StringData(str_field_value)).find_all();
                tv.sort(int_col);
                // the object we created while offline was recovered, and the remote object was downloaded
                REQUIRE(tv.size() == 2);
                CHECK(tv.get_object(0).get<Int>(int_col) == local_added_int);
                CHECK(tv.get_object(1).get<Int>(int_col) == remote_added_int);
            })
            ->run();
    }
17✔
655

176✔
656
    // A first client reset attempt is made to fail by occupying the "fresh" path with a non-empty
    // directory; after removing it, reopening the Realm must complete the reset in Recover mode
    // with the offline subscription and write intact.
    SECTION("Recover: subscription and offline writes after client reset failure") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        auto&& [error_future, error_handler] = make_error_handler();
        config_local.sync_config->error_handler = error_handler;

        std::string fresh_path = realm::_impl::client_reset::get_fresh_path_for(config_local.path);
        // create a non-empty directory that we'll fail to delete
        util::make_dir(fresh_path);
        util::File(util::File::resolve("file", fresh_path), util::File::mode_Write);

        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        test_reset
            ->make_local_changes([&](SharedRealm local_realm) {
                // Subscribe to TopLevel2 and add one object to it.
                auto mut_sub = local_realm->get_latest_subscription_set().make_mutable_copy();
                auto table = local_realm->read_group().get_table("class_TopLevel2");
                mut_sub.insert_or_assign(Query(table));
                mut_sub.commit();

                CppContext c(local_realm);
                local_realm->begin_transaction();
                Object::create(c, local_realm, "TopLevel2",
                               std::any(AnyDict{{"_id"s, ObjectId::gen()}, {"queryable_str_field"s, "foo"s}}));
                local_realm->commit_transaction();
            })
            ->on_post_reset([](SharedRealm local_realm) {
                // Verify offline subscription was not removed.
                auto subs = local_realm->get_latest_subscription_set();
                auto table = local_realm->read_group().get_table("class_TopLevel2");
                REQUIRE(subs.find(Query(table)));
            })
            ->run();

        // Remove the folder preventing the completion of a client reset.
        util::try_remove_dir_recursive(fresh_path);

        // Work on a private copy of the sync config so the changes below do not touch config_local.
        RealmConfig config_copy = config_local;
        config_copy.sync_config = std::make_shared<SyncConfig>(*config_copy.sync_config);
        config_copy.sync_config->error_handler = nullptr;
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
        config_copy.sync_config->notify_after_client_reset = reset_handler;

        // Attempt to open the realm again.
        // This time the client reset succeeds and the offline subscription and writes are recovered.
        auto realm = Realm::get_shared_realm(config_copy);
        // Bounded wait: fail the test rather than hang if the reset never completes.
        ClientResyncMode mode = wait_for_future(std::move(reset_future)).get();
        REQUIRE(mode == ClientResyncMode::Recover);

        auto table = realm->read_group().get_table("class_TopLevel2");
        auto str_col = table->get_column_key("queryable_str_field");
        REQUIRE(table->size() == 1);
        REQUIRE(table->get_object(0).get<String>(str_col) == "foo");
    }
17✔
708

176✔
709
    // Both sides add a subscription plus an object while the local side is offline; after recovery
    // exactly one subscription for the value must remain and both objects must be present.
    // Future waits are bounded through wait_for_future() so a lost notification cannot hang the run.
    SECTION("Recover: offline writes and subscriptions (multiple subscriptions)") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
        config_local.sync_config->notify_after_client_reset = reset_handler;
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        test_reset
            ->make_local_changes([&](SharedRealm local_realm) {
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
            })
            ->make_remote_changes([&](SharedRealm remote_realm) {
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
                sync::SubscriptionSet::State actual =
                    wait_for_future(remote_realm->get_latest_subscription_set().get_state_change_notification(
                                        sync::SubscriptionSet::State::Complete))
                        .get();
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
            })
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) mutable {
                ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
                REQUIRE(mode == ClientResyncMode::Recover);
                auto subs = local_realm->get_latest_subscription_set();
                wait_for_future(subs.get_state_change_notification(sync::SubscriptionSet::State::Complete)).get();
                // make sure that the subscription for "foo" survived the reset
                size_t count_of_foo = count_queries_with_str(subs, util::format("\"%1\"", str_field_value));
                // Reload the set before asserting on its state, as the interleaved-writes section does.
                // NOTE(review): assumes state() reports the value from when `subs` was read - confirm.
                subs.refresh();
                REQUIRE(subs.state() == sync::SubscriptionSet::State::Complete);
                REQUIRE(count_of_foo == 1);
                local_realm->refresh();
                auto table = local_realm->read_group().get_table("class_TopLevel");
                auto str_col = table->get_column_key("queryable_str_field");
                auto int_col = table->get_column_key("queryable_int_field");
                auto tv = table->where().equal(str_col, StringData(str_field_value)).find_all();
                tv.sort(int_col);
                // the object we created while offline was recovered, and the remote object was downloaded
                REQUIRE(tv.size() == 2);
                CHECK(tv.get_object(0).get<Int>(int_col) == local_added_int);
                CHECK(tv.get_object(1).get<Int>(int_col) == remote_added_int);
            })
            ->run();
    }
17✔
748

176✔
749
    // Offline writes are interleaved with subscription changes and an empty commit; after recovery
    // both local objects and the remote object must be present under a single "foo" subscription.
    // Future waits are bounded through wait_for_future() so a lost notification cannot hang the run.
    SECTION("Recover: offline writes interleaved with subscriptions and empty writes") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
        config_local.sync_config->notify_after_client_reset = reset_handler;
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        test_reset
            ->make_local_changes([&](SharedRealm local_realm) {
                // The sequence of events below generates five changesets:
                //  1. create sub1 => empty changeset
                //  2. create local_added_int object
                //  3. create empty changeset
                //  4. create sub2 => empty changeset
                //  5. create local_added_int2 object
                //
                // Before sending 'sub2' to the server, an UPLOAD message is sent first.
                // The upload message contains changeset 2. (local_added_int) with the cursor
                // of changeset 3. (empty changeset).

                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
                // Commit empty changeset.
                local_realm->begin_transaction();
                local_realm->commit_transaction();
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int2);
            })
            ->make_remote_changes([&](SharedRealm remote_realm) {
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
                sync::SubscriptionSet::State actual =
                    wait_for_future(remote_realm->get_latest_subscription_set().get_state_change_notification(
                                        sync::SubscriptionSet::State::Complete))
                        .get();
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
            })
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) mutable {
                ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
                REQUIRE(mode == ClientResyncMode::Recover);
                auto subs = local_realm->get_latest_subscription_set();
                wait_for_future(subs.get_state_change_notification(sync::SubscriptionSet::State::Complete)).get();
                // make sure that the subscription for "foo" survived the reset
                size_t count_of_foo = count_queries_with_str(subs, util::format("\"%1\"", str_field_value));
                subs.refresh();
                REQUIRE(subs.state() == sync::SubscriptionSet::State::Complete);
                REQUIRE(count_of_foo == 1);
                local_realm->refresh();
                auto table = local_realm->read_group().get_table("class_TopLevel");
                auto str_col = table->get_column_key("queryable_str_field");
                auto int_col = table->get_column_key("queryable_int_field");
                auto tv = table->where().equal(str_col, StringData(str_field_value)).find_all();
                tv.sort(int_col);
                // the objects we created while offline were recovered, and the remote object was downloaded
                REQUIRE(tv.size() == 3);
                CHECK(tv.get_object(0).get<Int>(int_col) == local_added_int);
                CHECK(tv.get_object(1).get<Int>(int_col) == local_added_int2);
                CHECK(tv.get_object(2).get<Int>(int_col) == remote_added_int);
            })
            ->run();
    }
17✔
805

176✔
806
    auto validate_integrity_of_arrays = [](TableRef table) -> size_t {
248✔
807
        auto sum_col = table->get_column_key("sum_of_list_field");
136✔
808
        auto array_col = table->get_column_key("list_of_ints_field");
136✔
809
        auto query = table->column<Lst<Int>>(array_col).sum() == table->column<Int>(sum_col) &&
136✔
810
                     table->column<Lst<Int>>(array_col).size() > 0;
136✔
811
        return query.count();
136✔
812
    };
136✔
813

176✔
814
    // Objects added offline together with their subscriptions must all be recovered, and the list
    // data of every recovered object must still match its stored sum.
    // Future waits are bounded through wait_for_future() so a lost notification cannot hang the run.
    SECTION("Recover: offline writes with associated subscriptions in the correct order") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
        config_local.sync_config->notify_after_client_reset = reset_handler;
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        constexpr size_t num_objects_added = 20;
        constexpr size_t num_objects_added_by_harness = 1; // BaasFLXClientReset.run()
        constexpr size_t num_objects_added_by_remote = 1;  // make_remote_changes()
        test_reset
            ->make_local_changes([&](SharedRealm local_realm) {
                subscribe_to_and_add_objects(local_realm, num_objects_added);
                auto table = local_realm->read_group().get_table("class_TopLevel");
                REQUIRE(table->size() == num_objects_added + num_objects_added_by_harness);
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
                REQUIRE(count_of_valid_array_data == num_objects_added);
            })
            ->make_remote_changes([&](SharedRealm remote_realm) {
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
                sync::SubscriptionSet::State actual =
                    wait_for_future(remote_realm->get_latest_subscription_set().get_state_change_notification(
                                        sync::SubscriptionSet::State::Complete))
                        .get();
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
            })
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) mutable {
                ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
                REQUIRE(mode == ClientResyncMode::Recover);
                local_realm->refresh();
                auto latest_subs = local_realm->get_latest_subscription_set();
                auto state =
                    wait_for_future(latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete))
                        .get();
                REQUIRE(state == sync::SubscriptionSet::State::Complete);
                local_realm->refresh();
                auto table = local_realm->read_group().get_table("class_TopLevel");
                // Dump the table contents to help diagnose an unexpected object count.
                if (table->size() != 1) {
                    table->to_json(std::cout);
                }
                REQUIRE(table->size() == 1);
                // Replace all subscriptions with one covering the whole table, then wait for it.
                auto mut_sub = latest_subs.make_mutable_copy();
                mut_sub.clear();
                mut_sub.insert_or_assign(Query(table));
                latest_subs = mut_sub.commit();
                wait_for_future(latest_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete))
                    .get();
                local_realm->refresh();
                REQUIRE(table->size() ==
                        num_objects_added + num_objects_added_by_harness + num_objects_added_by_remote);
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
                REQUIRE(count_of_valid_array_data == num_objects_added + num_objects_added_by_remote);
            })
            ->run();
    }
17✔
864

176✔
865
    // Local and remote both add a property with the same name but different types (Float vs UUID);
    // the recovery must fail with AutoClientResetFailed and leave the Realm in its pre-reset state.
    // The remote-side wait is bounded through wait_for_future(), like the post-reset wait already was.
    SECTION("Recover: incompatible property changes are rejected") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        auto&& [error_future, err_handler] = make_error_handler();
        config_local.sync_config->error_handler = err_handler;
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        constexpr size_t num_objects_added_before = 2;
        constexpr size_t num_objects_added_after = 2;
        constexpr size_t num_objects_added_by_harness = 1; // BaasFLXClientReset.run()
        constexpr std::string_view added_property_name = "new_property";
        test_reset
            ->make_local_changes([&](SharedRealm local_realm) {
                subscribe_to_and_add_objects(local_realm, num_objects_added_before);
                // Local schema update: add the new property as a nullable Float.
                Schema local_update = schema;
                Schema::iterator it = local_update.find("TopLevel");
                REQUIRE(it != local_update.end());
                it->persisted_properties.push_back(
                    {std::string(added_property_name), PropertyType::Float | PropertyType::Nullable});
                local_realm->update_schema(local_update);
                subscribe_to_and_add_objects(local_realm, num_objects_added_after);
                auto table = local_realm->read_group().get_table("class_TopLevel");
                REQUIRE(table->size() ==
                        num_objects_added_before + num_objects_added_after + num_objects_added_by_harness);
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
                REQUIRE(count_of_valid_array_data == num_objects_added_before + num_objects_added_after);
            })
            ->make_remote_changes([&](SharedRealm remote_realm) {
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
                // Remote schema update: same property name, but as a nullable UUID.
                Schema remote_update = schema;
                Schema::iterator it = remote_update.find("TopLevel");
                REQUIRE(it != remote_update.end());
                it->persisted_properties.push_back(
                    {std::string(added_property_name), PropertyType::UUID | PropertyType::Nullable});
                remote_realm->update_schema(remote_update);
                sync::SubscriptionSet::State actual =
                    wait_for_future(remote_realm->get_latest_subscription_set().get_state_change_notification(
                                        sync::SubscriptionSet::State::Complete))
                        .get();
                REQUIRE(actual == sync::SubscriptionSet::State::Complete);
            })
            ->on_post_reset([&, err_future = std::move(error_future)](SharedRealm local_realm) mutable {
                auto sync_error = wait_for_future(std::move(err_future)).get();
                REQUIRE(before_reset_count == 1);
                REQUIRE(after_reset_count == 0);
                REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
                REQUIRE(sync_error.is_client_reset_requested());
                local_realm->refresh();
                auto table = local_realm->read_group().get_table("class_TopLevel");
                // since schema validation happens in the first recovery commit, that whole commit is rolled back
                // and the final state here is "pre reset"
                REQUIRE(table->size() ==
                        num_objects_added_before + num_objects_added_by_harness + num_objects_added_after);
                size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
                REQUIRE(count_of_valid_array_data == num_objects_added_before + num_objects_added_after);
            })
            ->run();
    }
17✔
921

176✔
922
    // Local changes are crafted so that the schema diff passes but replaying them fails. The two
    // nested sections check the outcome in Recover mode (reset fails) and in RecoverOrDiscard mode
    // (falls back to discarding local changes).
    SECTION("unsuccessful replay of local changes") {
        constexpr size_t num_objects_added_before = 2;
        constexpr size_t num_objects_added_after = 2;
        constexpr size_t num_objects_added_by_harness = 1; // BaasFLXClientReset.run()
        constexpr std::string_view added_property_name = "new_property";
        auto&& [error_future, err_handler] = make_error_handler();
        config_local.sync_config->error_handler = err_handler;

        // The local changes here are a bit contrived because removing a column is disallowed
        // at the object store layer for sync'd Realms. The only reason a recovery should fail in production
        // during the apply stage is due to programmer error or external factors such as out of disk space.
        // Any schema discrepancies are caught by the initial diff, so the way to make a recovery fail here is
        // to add and remove a column at the core level such that the schema diff passes, but instructions are
        // generated which will fail when applied.
        auto make_local_changes_that_will_fail = [&](SharedRealm local_realm) {
            subscribe_to_and_add_objects(local_realm, num_objects_added_before);
            auto table = local_realm->read_group().get_table("class_TopLevel");
            REQUIRE(table->size() == num_objects_added_before + num_objects_added_by_harness);
            size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
            REQUIRE(count_of_valid_array_data == num_objects_added_before);
            local_realm->begin_transaction();
            ColKey added = table->add_column(type_Int, added_property_name);
            table->remove_column(added);
            local_realm->commit_transaction();
            subscribe_to_and_add_objects(local_realm, num_objects_added_after); // these are lost!
        };

        VersionID expected_version;

        // Remember the transaction version after the local changes, for comparison after the reset.
        auto store_pre_reset_state = [&](SharedRealm local_realm) {
            expected_version = local_realm->read_transaction_version();
        };

        // `mutable` so the captured future can be moved into wait_for_future(), which bounds the wait
        // instead of blocking indefinitely if the error is never reported.
        auto verify_post_reset_state = [&, err_future = std::move(error_future)](
                                           SharedRealm local_realm) mutable {
            auto sync_error = wait_for_future(std::move(err_future)).get();
            REQUIRE(before_reset_count == 1);
            REQUIRE(after_reset_count == 0);
            REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
            REQUIRE(sync_error.is_client_reset_requested());

            // All changes should have been rolled back when recovery hit remove_column(),
            // leaving the Realm in the pre-reset state
            local_realm->refresh();
            auto table = local_realm->read_group().get_table("class_TopLevel");
            ColKey added = table->get_column_key(added_property_name);
            REQUIRE(!added);
            const size_t expected_added_objects = num_objects_added_before + num_objects_added_after;
            REQUIRE(table->size() == num_objects_added_by_harness + expected_added_objects);
            size_t count_of_valid_array_data = validate_integrity_of_arrays(table);
            REQUIRE(count_of_valid_array_data == expected_added_objects);

            // The attempted client reset should have been recorded so that we
            // don't attempt it again
            REQUIRE(local_realm->read_transaction_version().version == expected_version.version + 1);
        };

        SECTION("Recover: unsuccessful recovery leads to a manual reset") {
            config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
            auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
            test_reset->make_local_changes(make_local_changes_that_will_fail)
                ->on_post_local_changes(store_pre_reset_state)
                ->on_post_reset(std::move(verify_post_reset_state))
                ->run();
            // Reopen the Realm with a fresh error handler and expect the same failure again.
            RealmConfig config_copy = config_local;
            auto&& [error_future2, err_handler2] = make_error_handler();
            config_copy.sync_config->error_handler = err_handler2;
            auto realm_post_reset = Realm::get_shared_realm(config_copy);
            auto sync_error = wait_for_future(std::move(error_future2)).get();
            REQUIRE(before_reset_count == 2);
            REQUIRE(after_reset_count == 0);
            REQUIRE(sync_error.status == ErrorCodes::AutoClientResetFailed);
            REQUIRE(sync_error.is_client_reset_requested());
        }

        SECTION("RecoverOrDiscard: unsuccessful reapply leads to discard") {
            config_local.sync_config->client_resync_mode = ClientResyncMode::RecoverOrDiscard;
            auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
            test_reset->make_local_changes(make_local_changes_that_will_fail)
                ->on_post_local_changes(store_pre_reset_state)
                ->on_post_reset(std::move(verify_post_reset_state))
                ->run();

            RealmConfig config_copy = config_local;
            auto&& [client_reset_future, reset_handler] = make_client_reset_handler();
            // Only non-fatal errors with a Transient action are tolerated while reopening.
            config_copy.sync_config->error_handler = [](std::shared_ptr<SyncSession>, SyncError err) {
                REALM_ASSERT_EX(!err.is_fatal, err.status);
                CHECK(err.server_requests_action == sync::ProtocolErrorInfo::Action::Transient);
            };
            config_copy.sync_config->notify_after_client_reset = reset_handler;
            auto realm_post_reset = Realm::get_shared_realm(config_copy);
            ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
            REQUIRE(mode == ClientResyncMode::DiscardLocal);
            realm_post_reset->refresh();
            auto table = realm_post_reset->read_group().get_table("class_TopLevel");
            ColKey added = table->get_column_key(added_property_name);
            REQUIRE(!added);                                        // reverted local changes
            REQUIRE(table->size() == num_objects_added_by_harness); // discarded all offline local changes
        }
    }
34✔
1021

176✔
1022
    // In DiscardLocal mode the offline object and subscription are dropped by the reset; afterwards
    // the Realm must accept new subscriptions and data as usual.
    SECTION("DiscardLocal: offline writes and subscriptions are lost") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
        config_local.sync_config->notify_after_client_reset = reset_handler;
        auto test_reset = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        test_reset
            ->make_local_changes([&](SharedRealm local_realm) {
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
            })
            ->make_remote_changes([&](SharedRealm remote_realm) {
                add_subscription_for_new_object(remote_realm, str_field_value, remote_added_int);
            })
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm local_realm) mutable {
                constexpr auto complete = sync::SubscriptionSet::State::Complete;
                // The value wrapped in double quotes, as searched for in the query strings.
                const std::string quoted_value = util::format("\"%1\"", str_field_value);

                ClientResyncMode mode = wait_for_future(std::move(client_reset_future)).get();
                REQUIRE(mode == ClientResyncMode::DiscardLocal);
                auto subs = local_realm->get_latest_subscription_set();
                wait_for_future(subs.get_state_change_notification(complete)).get();
                local_realm->refresh();

                auto table = local_realm->read_group().get_table("class_TopLevel");
                auto str_col = table->get_column_key("queryable_str_field");
                auto int_col = table->get_column_key("queryable_int_field");
                // Finds every object whose string field equals the shared test value.
                auto find_matching = [&] {
                    return table->where().equal(str_col, StringData(str_field_value)).find_all();
                };

                auto matches = find_matching();
                // the object we created while offline was discarded, and the remote object was not downloaded
                REQUIRE(matches.size() == 0);
                size_t count_of_foo = count_queries_with_str(subs, quoted_value);
                // make sure that the subscription for "foo" did not survive the reset
                REQUIRE(count_of_foo == 0);
                REQUIRE(subs.state() == complete);

                // adding data and subscriptions to a reset Realm works as normal
                add_subscription_for_new_object(local_realm, str_field_value, local_added_int);
                auto latest_subs = local_realm->get_latest_subscription_set();
                REQUIRE(latest_subs.version() > subs.version());
                wait_for_future(latest_subs.get_state_change_notification(complete)).get();
                local_realm->refresh();
                count_of_foo = count_queries_with_str(latest_subs, quoted_value);
                REQUIRE(count_of_foo == 1);
                matches = find_matching();
                REQUIRE(matches.size() == 2);
                matches.sort(int_col);
                REQUIRE(matches.get_object(0).get<int64_t>(int_col) == local_added_int);
                REQUIRE(matches.get_object(1).get<int64_t>(int_col) == remote_added_int);
            })
            ->run();
    }
17✔
1068

176✔
1069
    SECTION("DiscardLocal: an invalid subscription made while offline becomes superseded") {
        using SubState = sync::SubscriptionSet::State;

        // Run the reset in DiscardLocal mode and observe the after-reset notification through a future.
        auto& sync_cfg = *config_local.sync_config;
        sync_cfg.client_resync_mode = ClientResyncMode::DiscardLocal;
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
        sync_cfg.notify_after_client_reset = reset_handler;

        auto reset_test = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        // Filled in by the local-changes step so that the post-reset step can wait on it.
        std::unique_ptr<sync::SubscriptionSet> offline_invalid_sub;
        reset_test
            ->make_local_changes([&](SharedRealm offline_realm) {
                // Keep the invalid subscription set around, then add a newer set on top of it.
                offline_invalid_sub =
                    std::make_unique<sync::SubscriptionSet>(add_invalid_subscription(offline_realm));
                add_subscription_for_new_object(offline_realm, str_field_value, local_added_int);
            })
            ->make_remote_changes([&](SharedRealm server_side_realm) {
                add_subscription_for_new_object(server_side_realm, str_field_value, remote_added_int);
            })
            ->on_post_reset([&, client_reset_future = std::move(reset_future)](SharedRealm reset_realm) {
                reset_realm->refresh();
                // We ask to be notified about Complete, but the set is expected to end up Superseded.
                auto pending_state = offline_invalid_sub->get_state_change_notification(SubState::Complete);
                SubState observed_state = pending_state.get();
                REQUIRE(observed_state == SubState::Superseded);
                ClientResyncMode observed_mode = client_reset_future.get();
                REQUIRE(observed_mode == ClientResyncMode::DiscardLocal);
            })
            ->run();
    }
17✔
1093

176✔
1094
    SECTION("DiscardLocal: an error is produced if a previously successful query becomes invalid due to "
            "server changes across a reset") {
        // With dev mode off the server will not automatically promote non-queryable fields to queryable
        const AppSession& app_session = harness.session().app_session();
        app_session.admin_api.set_development_mode_to(app_session.server_app_id, false);

        auto& sync_cfg = *config_local.sync_config;
        sync_cfg.client_resync_mode = ClientResyncMode::DiscardLocal;
        auto&& [error_future, err_handler] = make_error_handler();
        sync_cfg.error_handler = err_handler;

        auto reset_test = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        reset_test
            ->setup([&](SharedRealm realm) {
                // Only the Realm at the local path gets the initial subscription.
                if (!(realm->sync_session()->path() == config_local.path)) {
                    return;
                }
                auto initial_sub = add_subscription_for_new_object(realm, str_field_value, 0);
                initial_sub.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
            })
            ->make_local_changes([&](SharedRealm offline_realm) {
                add_object(offline_realm, str_field_value, local_added_int);
                // Remove "queryable_str_field" from the server's list of queryable fields.
                // The Realm had a working query on that field before the reset; once the client reconnects
                // and attempts the reset, downloading the fresh Realm fails with a query error.
                const AppSession& session_info = harness.session().app_session();
                const auto& app_id = session_info.server_app_id;
                auto sync_service = session_info.admin_api.get_sync_service(app_id);
                auto service_config = session_info.admin_api.get_config(app_id, sync_service);
                REQUIRE(service_config.queryable_field_names->is_array());

                auto field_it = service_config.queryable_field_names->begin();
                while (field_it != service_config.queryable_field_names->end() &&
                       !(*field_it == "queryable_str_field")) {
                    ++field_it;
                }
                REQUIRE(field_it != service_config.queryable_field_names->end());
                service_config.queryable_field_names->erase(field_it);
                session_info.admin_api.enable_sync(app_id, sync_service.id, service_config);
            })
            ->on_post_reset([&, err_future = std::move(error_future)](SharedRealm) mutable {
                auto reported_error = wait_for_future(std::move(err_future)).get();
                INFO(reported_error.status);
                CHECK(reported_error.status == ErrorCodes::AutoClientResetFailed);
            })
            ->run();
    }
17✔
1137

176✔
1138
    SECTION("DiscardLocal: completion callbacks fire after client reset even when there is no data to download") {
        auto& sync_cfg = *config_local.sync_config;
        sync_cfg.client_resync_mode = ClientResyncMode::DiscardLocal;
        auto&& [reset_future, reset_handler] = make_client_reset_handler();
        sync_cfg.notify_after_client_reset = reset_handler;

        auto reset_test = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        reset_test
            ->on_post_local_changes([&](SharedRealm realm) {
                // Both waits have to return for the test to finish.
                Realm& realm_ref = *realm;
                wait_for_upload(realm_ref);
                wait_for_download(realm_ref);
            })
            ->run();
    }
17✔
1150

176✔
1151
    SECTION("DiscardLocal: open realm after client reset failure") {
        config_local.sync_config->client_resync_mode = ClientResyncMode::DiscardLocal;
        auto&& [error_future, error_handler] = make_error_handler();
        config_local.sync_config->error_handler = error_handler;

        // Occupy the fresh-realm path with a directory containing a file, so that it cannot simply be deleted
        const std::string blocked_fresh_path = realm::_impl::client_reset::get_fresh_path_for(config_local.path);
        util::make_dir(blocked_fresh_path);
        {
            util::File placeholder(util::File::resolve("file", blocked_fresh_path), util::File::mode_Write);
        }

        auto reset_test = reset_utils::make_baas_flx_client_reset(config_local, config_remote, harness.session());
        reset_test->run();

        // The sync client cannot create the fresh realm, so the client reset fails.
        auto first_error = wait_for_future(std::move(error_future)).get();
        REQUIRE(first_error.status == ErrorCodes::AutoClientResetFailed);

        // Re-opening the realm must not crash; it is expected to report the same failure again.
        auto&& [err_future, err_handler] = make_error_handler();
        config_local.sync_config->error_handler = std::move(err_handler);

        auto reopened_realm = Realm::get_shared_realm(config_local);
        first_error = wait_for_future(std::move(err_future)).get();
        REQUIRE(first_error.status == ErrorCodes::AutoClientResetFailed);
    }
17✔
1176

176✔
1177
    enum class ResetMode { NoReset, InitiateClientReset };
374✔
1178
    auto seed_realm = [&harness, &subscribe_to_and_add_objects](RealmConfig config, ResetMode reset_mode) {
293✔
1179
        config.sync_config->error_handler = [path = config.path](std::shared_ptr<SyncSession>, SyncError err) {
104✔
1180
            // ignore spurious failures on this instance
1181
            util::format(std::cout, "spurious error while seeding a Realm at '%1': %2\n", path, err.status);
×
1182
        };
×
1183
        SharedRealm realm = Realm::get_shared_realm(config);
221✔
1184
        subscribe_to_and_add_objects(realm, 1);
221✔
1185
        auto subs = realm->get_latest_subscription_set();
221✔
1186
        auto result = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
221✔
1187
        CHECK(result == sync::SubscriptionSet::State::Complete);
221!
1188
        if (reset_mode == ResetMode::InitiateClientReset) {
221✔
1189
            reset_utils::trigger_client_reset(harness.session().app_session(), realm);
153✔
1190
        }
153✔
1191
        realm->close();
221✔
1192
    };
221✔
1193

176✔
1194
    auto setup_reset_handlers_for_schema_validation =
374✔
1195
        [&before_reset_count, &after_reset_count](RealmConfig& config, Schema expected_schema) {
239✔
1196
            auto& sync_config = *config.sync_config;
119✔
1197
            sync_config.error_handler = [](std::shared_ptr<SyncSession>, SyncError err) {
56✔
1198
                FAIL(err.status);
×
1199
            };
×
1200
            sync_config.notify_before_client_reset = [&before_reset_count,
119✔
1201
                                                      expected = expected_schema](SharedRealm frozen_before) {
119✔
1202
                ++before_reset_count;
119✔
1203
                REQUIRE(frozen_before->schema().size() > 0);
119!
1204
                REQUIRE(frozen_before->schema_version() != ObjectStore::NotVersioned);
119!
1205
                REQUIRE(frozen_before->schema() == expected);
119!
1206
            };
119✔
1207

56✔
1208
            auto [promise, future] = util::make_promise_future<void>();
119✔
1209
            sync_config.notify_after_client_reset =
119✔
1210
                [&after_reset_count, promise = util::CopyablePromiseHolder<void>(std::move(promise)), expected_schema,
119✔
1211
                 reset_mode = config.sync_config->client_resync_mode, has_schema = config.schema.has_value()](
119✔
1212
                    SharedRealm frozen_before, ThreadSafeReference after_ref, bool did_recover) mutable {
119✔
1213
                    ++after_reset_count;
119✔
1214
                    REQUIRE(frozen_before->schema().size() > 0);
119!
1215
                    REQUIRE(frozen_before->schema_version() != ObjectStore::NotVersioned);
119!
1216
                    REQUIRE(frozen_before->schema() == expected_schema);
119!
1217
                    SharedRealm after = Realm::get_shared_realm(std::move(after_ref), util::Scheduler::make_dummy());
119✔
1218
                    if (!has_schema) {
119✔
1219
                        after->set_schema_subset(expected_schema);
34✔
1220
                    }
34✔
1221
                    REQUIRE(after);
119!
1222
                    REQUIRE(after->schema() == expected_schema);
119!
1223
                    // the above check is sufficient unless operator==() is changed to not care about ordering
56✔
1224
                    // so future proof that by explicitly checking the order of properties here as well
56✔
1225
                    REQUIRE(after->schema().size() == frozen_before->schema().size());
119!
1226
                    auto after_it = after->schema().find("TopLevel");
119✔
1227
                    auto before_it = frozen_before->schema().find("TopLevel");
119✔
1228
                    REQUIRE(after_it != after->schema().end());
119!
1229
                    REQUIRE(before_it != frozen_before->schema().end());
119!
1230
                    REQUIRE(after_it->name == before_it->name);
119!
1231
                    REQUIRE(after_it->persisted_properties.size() == before_it->persisted_properties.size());
119!
1232
                    REQUIRE(after_it->persisted_properties[1].name == "queryable_int_field");
119!
1233
                    REQUIRE(after_it->persisted_properties[2].name == "queryable_str_field");
119!
1234
                    REQUIRE(before_it->persisted_properties[1].name == "queryable_int_field");
119!
1235
                    REQUIRE(before_it->persisted_properties[2].name == "queryable_str_field");
119!
1236
                    REQUIRE(did_recover == (reset_mode == ClientResyncMode::Recover));
119!
1237
                    promise.get_promise().emplace_value();
119✔
1238
                };
119✔
1239
            return std::move(future); // move is not redundant here because of how destructing works
119✔
1240
        };
119✔
1241

176✔
1242
    SECTION("Recover: schema indexes match in before and after states") {
        seed_realm(config_local, ResetMode::InitiateClientReset);

        // Build a schema whose property order differs from the on-disk order, with one extra property added
        std::vector<ObjectSchema> reordered_schema = schema;
        auto& first_class_props = reordered_schema[0].persisted_properties;
        std::swap(first_class_props[1], first_class_props[2]);
        first_class_props.push_back({"queryable_oid_field", PropertyType::ObjectId | PropertyType::Nullable});

        config_local.schema = reordered_schema;
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        auto reset_done = setup_reset_handlers_for_schema_validation(config_local, reordered_schema);

        // Opening the Realm starts the session; the future resolves once the after-reset handler has run.
        SharedRealm opened_realm = Realm::get_shared_realm(config_local);
        reset_done.get();
        CHECK(before_reset_count == 1);
        CHECK(after_reset_count == 1);
    }
17✔
1257

176✔
1258
    SECTION("Adding a local property matching a server addition is allowed") {
        auto resync_mode = GENERATE(ClientResyncMode::DiscardLocal, ClientResyncMode::Recover);
        config_local.sync_config->client_resync_mode = resync_mode;
        seed_realm(config_local, ResetMode::InitiateClientReset);

        std::vector<ObjectSchema> extended_schema = schema;
        auto& first_class_props = extended_schema[0].persisted_properties;
        first_class_props.push_back({"queryable_oid_field", PropertyType::ObjectId | PropertyType::Nullable});

        // Make the property addition through a separate Realm.
        // Since this is dev mode, it will be added to the server's schema.
        config_remote.schema = extended_schema;
        seed_realm(config_remote, ResetMode::NoReset);

        // The local Realm uses the same properties, but with two of them swapped.
        std::swap(first_class_props[1], first_class_props[2]);
        config_local.schema = extended_schema;
        auto reset_done = setup_reset_handlers_for_schema_validation(config_local, extended_schema);

        auto on_opened = [&, fut = std::move(reset_done)](ThreadSafeReference&& ref, std::exception_ptr error) {
            REQUIRE(ref);
            REQUIRE_FALSE(error);
            auto opened_realm = Realm::get_shared_realm(std::move(ref));
            fut.get();
            CHECK(before_reset_count == 1);
            CHECK(after_reset_count == 1);
        };
        async_open_realm(config_local, std::move(on_opened));
    }
34✔
1283

176✔
1284
    SECTION("Adding a local property matching a server addition inside the before reset callback is allowed") {
        auto resync_mode = GENERATE(ClientResyncMode::DiscardLocal, ClientResyncMode::Recover);
        auto& sync_cfg = *config_local.sync_config;
        sync_cfg.client_resync_mode = resync_mode;
        seed_realm(config_local, ResetMode::InitiateClientReset);

        std::vector<ObjectSchema> changed_schema = schema;
        changed_schema[0].persisted_properties.push_back(
            {"queryable_oid_field", PropertyType::ObjectId | PropertyType::Nullable});

        // Make the property addition through a separate Realm.
        // Since this is dev mode, it will be added to the server's schema.
        config_remote.schema = changed_schema;
        seed_realm(config_remote, ResetMode::NoReset);

        std::swap(changed_schema[0].persisted_properties[1], changed_schema[0].persisted_properties[2]);
        // The local config carries no schema here; the callbacks below apply it instead.
        config_local.schema.reset();
        sync_cfg.freeze_before_reset_realm = false;
        auto reset_done = setup_reset_handlers_for_schema_validation(config_local, changed_schema);

        // Wrap the validating before-reset handler: update the schema first, then validate.
        auto validate_before = std::move(sync_cfg.notify_before_client_reset);
        sync_cfg.notify_before_client_reset = [changed_schema, validate_before](std::shared_ptr<Realm> realm) {
            realm->update_schema(changed_schema);
            validate_before(realm);
        };

        // Wrap the validating after-reset handler: set the schema subset on the "before" Realm first.
        auto validate_after = std::move(sync_cfg.notify_after_client_reset);
        sync_cfg.notify_after_client_reset = [changed_schema, validate_after](std::shared_ptr<Realm> before,
                                                                             ThreadSafeReference after,
                                                                             bool did_recover) {
            before->set_schema_subset(changed_schema);
            validate_after(before, std::move(after), did_recover);
        };

        auto on_opened = [&, fut = std::move(reset_done)](ThreadSafeReference&& ref, std::exception_ptr error) {
            REQUIRE(ref);
            REQUIRE_FALSE(error);
            auto opened_realm = Realm::get_shared_realm(std::move(ref));
            fut.get();
            CHECK(before_reset_count == 1);
            CHECK(after_reset_count == 1);
        };
        async_open_realm(config_local, std::move(on_opened));
    }
34✔
1323

176✔
1324
    auto make_additive_changes = [](std::vector<ObjectSchema> schema) {
203✔
1325
        schema[0].persisted_properties.push_back(
51✔
1326
            {"added_oid_field", PropertyType::ObjectId | PropertyType::Nullable});
51✔
1327
        std::swap(schema[0].persisted_properties[1], schema[0].persisted_properties[2]);
51✔
1328
        schema.push_back({"AddedClass",
51✔
1329
                          {
51✔
1330
                              {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
51✔
1331
                              {"str_field", PropertyType::String | PropertyType::Nullable},
51✔
1332
                          }});
51✔
1333
        return schema;
51✔
1334
    };
51✔
1335
    // With dev mode enabled, a Recover-mode client reset is expected to keep additive schema changes (a new
    // property and a new class). Afterwards the test writes to the new property and class, and then checks
    // that a second round of additive changes can be opened, subscribed to and written as well.
    SECTION("Recover: additive schema changes are recovered in dev mode") {
        const AppSession& app_session = harness.session().app_session();
        app_session.admin_api.set_development_mode_to(app_session.server_app_id, true);
        seed_realm(config_local, ResetMode::InitiateClientReset);
        std::vector<ObjectSchema> changed_schema = make_additive_changes(schema);
        config_local.schema = changed_schema;
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        ThreadSafeReference ref_async;
        auto future = setup_reset_handlers_for_schema_validation(config_local, changed_schema);
        async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
            REQUIRE(ref);
            REQUIRE_FALSE(error);
            ref_async = std::move(ref);
        });
        // Resolves once the after-reset handler has run its checks.
        future.get();
        CHECK(before_reset_count == 1);
        CHECK(after_reset_count == 1);
        {
            auto realm = Realm::get_shared_realm(std::move(ref_async));
            // make changes to the newly added property
            realm->begin_transaction();
            auto table = realm->read_group().get_table("class_TopLevel");
            // Check the table before dereferencing it.
            REQUIRE(table);
            ColKey new_col = table->get_column_key("added_oid_field");
            REQUIRE(new_col);
            for (auto it = table->begin(); it != table->end(); ++it) {
                it->set(new_col, ObjectId::gen());
            }
            realm->commit_transaction();
            // subscribe to the new Class and add an object
            auto new_table = realm->read_group().get_table("class_AddedClass");
            // Validate the table before it is used to build the subscription query below.
            REQUIRE(new_table);
            auto sub_set = realm->get_latest_subscription_set();
            auto mut_sub = sub_set.make_mutable_copy();
            mut_sub.insert_or_assign(Query(new_table));
            mut_sub.commit();
            realm->begin_transaction();
            new_table->create_object_with_primary_key(ObjectId::gen());
            realm->commit_transaction();
            auto result = realm->get_latest_subscription_set()
                              .get_state_change_notification(sync::SubscriptionSet::State::Complete)
                              .get();
            CHECK(result == sync::SubscriptionSet::State::Complete);
            wait_for_advance(*realm);
            realm->close();
        }
        {
            // ensure that an additional schema change after the successful reset is also accepted by the server
            changed_schema[0].persisted_properties.push_back(
                {"added_oid_field_second", PropertyType::ObjectId | PropertyType::Nullable});
            changed_schema.push_back({"AddedClassSecond",
                                      {
                                          {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
                                          {"str_field_2", PropertyType::String | PropertyType::Nullable},
                                      }});
            config_local.schema = changed_schema;
            async_open_realm(config_local, [&](ThreadSafeReference&& ref, std::exception_ptr error) {
                REQUIRE(ref);
                REQUIRE_FALSE(error);
                auto realm = Realm::get_shared_realm(std::move(ref));
                auto table = realm->read_group().get_table("class_AddedClassSecond");
                // Check the table before dereferencing it.
                REQUIRE(table);
                ColKey new_col = table->get_column_key("str_field_2");
                REQUIRE(new_col);
                auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
                new_subs.insert_or_assign(Query(table).equal(new_col, "hello"));
                auto subs = new_subs.commit();
                realm->begin_transaction();
                table->create_object_with_primary_key(Mixed{ObjectId::gen()}, {{new_col, "hello"}});
                realm->commit_transaction();
                subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
                wait_for_advance(*realm);
                REQUIRE(table->size() == 1);
            });
        }
    }
17✔
1409

176✔
1410
    SECTION("DiscardLocal: additive schema changes not allowed") {
        seed_realm(config_local, ResetMode::InitiateClientReset);

        std::vector<ObjectSchema> additive_schema = make_additive_changes(schema);
        config_local.schema = additive_schema;

        auto& sync_cfg = *config_local.sync_config;
        sync_cfg.client_resync_mode = ClientResyncMode::DiscardLocal;
        auto&& [error_future, err_handler] = make_error_handler();
        sync_cfg.error_handler = err_handler;

        // The async open is expected to yield no Realm and an exception carrying the client reset failure.
        auto on_opened = [&](ThreadSafeReference&& ref, std::exception_ptr error) {
            REQUIRE(!ref);
            REQUIRE(error);
            REQUIRE_THROWS_CONTAINING(std::rethrow_exception(error),
                                      "A fatal error occurred during client reset: 'Client reset cannot recover when "
                                      "classes have been removed: {AddedClass}'");
        };
        async_open_realm(config_local, on_opened);

        // Wait for the sync error handler to fire as well before checking the callback counters.
        error_future.get();
        CHECK(before_reset_count == 1);
        CHECK(after_reset_count == 0);
    }
17✔
1428

176✔
1429
    SECTION("Recover: incompatible schema changes on async open are an error") {
        seed_realm(config_local, ResetMode::InitiateClientReset);

        std::vector<ObjectSchema> incompatible_schema = schema;
        // switching the first property of the first class to UUID is an incompatible type change
        auto& first_property = incompatible_schema[0].persisted_properties[0];
        first_property.type = PropertyType::UUID;
        config_local.schema = incompatible_schema;

        auto& sync_cfg = *config_local.sync_config;
        sync_cfg.client_resync_mode = ClientResyncMode::Recover;
        auto&& [error_future, err_handler] = make_error_handler();
        sync_cfg.error_handler = err_handler;

        auto on_opened = [&](ThreadSafeReference&& ref, std::exception_ptr error) {
            REQUIRE(!ref);
            REQUIRE(error);
            REQUIRE_THROWS_CONTAINING(
                std::rethrow_exception(error),
                "A fatal error occurred during client reset: 'The following changes cannot be "
                "made in additive-only schema mode:\n"
                "- Property 'TopLevel._id' has been changed from 'object id' to 'uuid'.\nIf your app is running in "
                "development mode, you can delete the realm and restart the app to update your schema.'");
        };
        async_open_realm(config_local, on_opened);

        error_future.get();
        // opening the frozen realm fails, so not even the before-reset callback is reached
        CHECK(before_reset_count == 0);
        CHECK(after_reset_count == 0);
    }
17✔
1451

176✔
1452
    SECTION("Recover: additive schema changes without dev mode produce an error after client reset") {
        const AppSession& app_session = harness.session().app_session();
        const auto& app_id = app_session.server_app_id;
        app_session.admin_api.set_development_mode_to(app_id, true);
        seed_realm(config_local, ResetMode::InitiateClientReset);
        // From here on the server runs without dev mode, so schema changes are not allowed
        app_session.admin_api.set_development_mode_to(app_id, false);
        // Switch dev mode back on when this section is left.
        auto restore_dev_mode = util::make_scope_exit([&]() noexcept {
            const AppSession& session_info = harness.session().app_session();
            session_info.admin_api.set_development_mode_to(session_info.server_app_id, true);
        });

        std::vector<ObjectSchema> additive_schema = make_additive_changes(schema);
        config_local.schema = additive_schema;
        config_local.sync_config->client_resync_mode = ClientResyncMode::Recover;
        // The returned future is deliberately dropped; only the installed reset callbacks are needed.
        (void)setup_reset_handlers_for_schema_validation(config_local, additive_schema);
        // Replace the error handler installed by the call above with one that reports through a future.
        auto&& [error_future, err_handler] = make_error_handler();
        config_local.sync_config->error_handler = err_handler;

        auto on_opened = [&](ThreadSafeReference&& ref, std::exception_ptr error) {
            REQUIRE(ref);
            REQUIRE_FALSE(error);
            auto opened_realm = Realm::get_shared_realm(std::move(ref));
            // write a value into the added property of every object
            opened_realm->begin_transaction();
            auto top_level = opened_realm->read_group().get_table("class_TopLevel");
            ColKey added_col = top_level->get_column_key("added_oid_field");
            REQUIRE(added_col);
            for (auto obj_it = top_level->begin(); obj_it != top_level->end(); ++obj_it) {
                obj_it->set(added_col, ObjectId::gen());
            }
            opened_realm->commit_transaction();
        };
        async_open_realm(config_local, on_opened);

        auto realm = Realm::get_shared_realm(config_local);
        auto reported = error_future.get();

        // Either of these two messages is accepted as the reported reason.
        std::string property_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding "
                                   "\"ObjectID\" column at field \"added_oid_field\" in schema \"TopLevel\", "
                                   "schema changes from clients are restricted when developer mode is disabled";
        std::string class_err = "Invalid schema change (UPLOAD): non-breaking schema change: adding schema "
                                "for Realm table \"AddedClass\", schema changes from clients are restricted when "
                                "developer mode is disabled";
        REQUIRE_THAT(reported.status.reason(), Catch::Matchers::ContainsSubstring(property_err) ||
                                                   Catch::Matchers::ContainsSubstring(class_err));
        CHECK(before_reset_count == 1);
        CHECK(after_reset_count == 1);
    }
17✔
1496
}
374✔
1497

1498
// Creating an object of a class that no subscription covers must throw NoSubscriptionForWrite and leave the
// table empty; once a matching subscription is Complete, the same kind of write (including an embedded
// object and a later change to it) goes through and is uploaded.
TEST_CASE("flx: creating an object on a class with no subscription throws", "[sync][flx][subscription][baas]") {
    FLXSyncTestHarness harness("flx_bad_query", {g_simple_embedded_obj_schema, {"queryable_str_field"}});
    harness.do_with_new_user([&](auto user) {
        SyncTestFile config(user, harness.schema(), SyncConfig::FLXSyncEnabled{});

        // The promise is held through a shared_ptr so that the error handler stays copyable.
        auto [error_promise, error_future] = util::make_promise_future<SyncError>();
        auto promise_holder = std::make_shared<decltype(error_promise)>(std::move(error_promise));
        config.sync_config->error_handler = [error_promise = std::move(promise_holder)](
                                                std::shared_ptr<SyncSession>, SyncError err) {
            CHECK(err.server_requests_action == sync::ProtocolErrorInfo::Action::Transient);
            error_promise->emplace_value(std::move(err));
        };

        auto realm = Realm::get_shared_realm(config);
        CppContext ctx(realm);

        // Without any subscription the write is rejected and the transaction is rolled back.
        realm->begin_transaction();
        REQUIRE_THROWS_AS(
            Object::create(ctx, realm, "TopLevel",
                           std::any(AnyDict{{"_id", ObjectId::gen()}, {"queryable_str_field", "foo"s}})),
            NoSubscriptionForWrite);
        realm->cancel_transaction();

        auto top_level = realm->read_group().get_table("class_TopLevel");

        REQUIRE(top_level->is_empty());
        auto str_col = top_level->get_column_key("queryable_str_field");
        {
            // Subscribe to objects whose queryable_str_field is "foo" and wait for the set to complete.
            auto pending_subs = realm->get_latest_subscription_set().make_mutable_copy();
            pending_subs.insert_or_assign(Query(top_level).equal(str_col, "foo"));
            auto committed_subs = pending_subs.commit();
            committed_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
        }

        // Now the same kind of object can be created, this time with an embedded object attached.
        realm->begin_transaction();
        auto created = Object::create(ctx, realm, "TopLevel",
                                      std::any(AnyDict{{"_id", ObjectId::gen()},
                                                       {"queryable_str_field", "foo"s},
                                                       {"embedded_obj", AnyDict{{"str_field", "bar"s}}}}));
        realm->commit_transaction();

        // Modify the embedded object in a second transaction.
        realm->begin_transaction();
        auto embedded = util::any_cast<Object&&>(created.template get_property_value<std::any>(ctx, "embedded_obj"));
        embedded.set_property_value(ctx, "str_field", std::any{"baz"s});
        realm->commit_transaction();

        wait_for_upload(*realm);
        wait_for_download(*realm);
    });
}
17✔
1546

1547
TEST_CASE("flx: uploading an object that is out-of-view results in compensating write",
1548
          "[sync][flx][compensating write][baas]") {
136✔
1549
    static std::optional<FLXSyncTestHarness> harness;
136✔
1550
    if (!harness) {
136✔
1551
        Schema schema{{"TopLevel",
17✔
1552
                       {{"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
17✔
1553
                        {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
17✔
1554
                        {"embedded_obj", PropertyType::Object | PropertyType::Nullable, "TopLevel_embedded_obj"}}},
17✔
1555
                      {"TopLevel_embedded_obj",
17✔
1556
                       ObjectSchema::ObjectType::Embedded,
17✔
1557
                       {{"str_field", PropertyType::String | PropertyType::Nullable}}},
17✔
1558
                      {"Int PK",
17✔
1559
                       {
17✔
1560
                           {"_id", PropertyType::Int, Property::IsPrimary{true}},
17✔
1561
                           {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
17✔
1562
                       }},
17✔
1563
                      {"String PK",
17✔
1564
                       {
17✔
1565
                           {"_id", PropertyType::String, Property::IsPrimary{true}},
17✔
1566
                           {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
17✔
1567
                       }},
17✔
1568
                      {"UUID PK",
17✔
1569
                       {
17✔
1570
                           {"_id", PropertyType::UUID, Property::IsPrimary{true}},
17✔
1571
                           {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
17✔
1572
                       }}};
17✔
1573

8✔
1574
        AppCreateConfig::ServiceRole role;
17✔
1575
        role.name = "compensating_write_perms";
17✔
1576

8✔
1577
        AppCreateConfig::ServiceRoleDocumentFilters doc_filters;
17✔
1578
        doc_filters.read = true;
17✔
1579
        doc_filters.write = {{"queryable_str_field", {{"$in", nlohmann::json::array({"foo", "bar"})}}}};
17✔
1580
        role.document_filters = doc_filters;
17✔
1581

8✔
1582
        role.insert_filter = true;
17✔
1583
        role.delete_filter = true;
17✔
1584
        role.read = true;
17✔
1585
        role.write = true;
17✔
1586
        FLXSyncTestHarness::ServerSchema server_schema{schema, {"queryable_str_field"}, {role}};
17✔
1587
        harness.emplace("flx_bad_query", server_schema);
17✔
1588
    }
17✔
1589

64✔
1590
    create_user_and_log_in(harness->app());
136✔
1591
    auto user = harness->app()->current_user();
136✔
1592

64✔
1593
    auto make_error_handler = [] {
136✔
1594
        auto [error_promise, error_future] = util::make_promise_future<SyncError>();
136✔
1595
        auto shared_promise = std::make_shared<decltype(error_promise)>(std::move(error_promise));
136✔
1596
        auto fn = [error_promise = std::move(shared_promise)](std::shared_ptr<SyncSession>, SyncError err) mutable {
127✔
1597
            if (!error_promise) {
119✔
1598
                util::format(std::cerr,
×
1599
                             "An unexpected sync error was caught by the default SyncTestFile handler: '%1'\n",
×
1600
                             err.status);
×
1601
                abort();
×
1602
            }
×
1603
            error_promise->emplace_value(std::move(err));
119✔
1604
            error_promise.reset();
119✔
1605
        };
119✔
1606

64✔
1607
        return std::make_pair(std::move(error_future), std::move(fn));
136✔
1608
    };
136✔
1609

64✔
1610
    auto validate_sync_error = [&](const SyncError& sync_error, Mixed expected_pk, const char* expected_object_name,
136✔
1611
                                   const std::string& error_msg_fragment) {
127✔
1612
        CHECK(sync_error.status == ErrorCodes::SyncCompensatingWrite);
119!
1613
        CHECK(!sync_error.is_client_reset_requested());
119!
1614
        CHECK(sync_error.compensating_writes_info.size() == 1);
119!
1615
        CHECK(sync_error.server_requests_action == sync::ProtocolErrorInfo::Action::Warning);
119!
1616
        auto write_info = sync_error.compensating_writes_info[0];
119✔
1617
        CHECK(write_info.primary_key == expected_pk);
119!
1618
        CHECK(write_info.object_name == expected_object_name);
119!
1619
        CHECK_THAT(write_info.reason, Catch::Matchers::ContainsSubstring(error_msg_fragment));
119✔
1620
    };
119✔
1621

64✔
1622
    SyncTestFile config(user, harness->schema(), SyncConfig::FLXSyncEnabled{});
136✔
1623
    auto&& [error_future, err_handler] = make_error_handler();
136✔
1624
    config.sync_config->error_handler = err_handler;
136✔
1625
    auto realm = Realm::get_shared_realm(config);
136✔
1626
    auto table = realm->read_group().get_table("class_TopLevel");
136✔
1627

64✔
1628
    auto create_subscription = [&](StringData table_name, auto make_query) {
127✔
1629
        auto table = realm->read_group().get_table(table_name);
119✔
1630
        auto queryable_str_field = table->get_column_key("queryable_str_field");
119✔
1631
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
119✔
1632
        new_query.insert_or_assign(make_query(Query(table), queryable_str_field));
119✔
1633
        new_query.commit();
119✔
1634
    };
119✔
1635

64✔
1636
    SECTION("compensating write because of permission violation") {
136✔
1637
        create_subscription("class_TopLevel", [](auto q, auto col) {
17✔
1638
            return q.equal(col, "bizz");
17✔
1639
        });
17✔
1640

8✔
1641
        CppContext c(realm);
17✔
1642
        realm->begin_transaction();
17✔
1643
        auto invalid_obj = ObjectId::gen();
17✔
1644
        Object::create(c, realm, "TopLevel",
17✔
1645
                       std::any(AnyDict{{"_id", invalid_obj}, {"queryable_str_field", "bizz"s}}));
17✔
1646
        realm->commit_transaction();
17✔
1647

8✔
1648
        validate_sync_error(
17✔
1649
            std::move(error_future).get(), invalid_obj, "TopLevel",
17✔
1650
            util::format("write to ObjectID(\"%1\") in table \"TopLevel\" not allowed", invalid_obj.to_string()));
17✔
1651

8✔
1652
        wait_for_advance(*realm);
17✔
1653

8✔
1654
        auto top_level_table = realm->read_group().get_table("class_TopLevel");
17✔
1655
        REQUIRE(top_level_table->is_empty());
17!
1656
    }
17✔
1657

64✔
1658
    SECTION("compensating write because of permission violation with write on embedded object") {
136✔
1659
        create_subscription("class_TopLevel", [](auto q, auto col) {
17✔
1660
            return q.equal(col, "bizz").Or().equal(col, "foo");
17✔
1661
        });
17✔
1662

8✔
1663
        CppContext c(realm);
17✔
1664
        realm->begin_transaction();
17✔
1665
        auto invalid_obj = ObjectId::gen();
17✔
1666
        auto obj = Object::create(c, realm, "TopLevel",
17✔
1667
                                  std::any(AnyDict{{"_id", invalid_obj},
17✔
1668
                                                   {"queryable_str_field", "foo"s},
17✔
1669
                                                   {"embedded_obj", AnyDict{{"str_field", "bar"s}}}}));
17✔
1670
        realm->commit_transaction();
17✔
1671
        realm->begin_transaction();
17✔
1672
        obj.set_property_value(c, "queryable_str_field", std::any{"bizz"s});
17✔
1673
        realm->commit_transaction();
17✔
1674
        realm->begin_transaction();
17✔
1675
        auto embedded_obj = util::any_cast<Object&&>(obj.get_property_value<std::any>(c, "embedded_obj"));
17✔
1676
        embedded_obj.set_property_value(c, "str_field", std::any{"baz"s});
17✔
1677
        realm->commit_transaction();
17✔
1678

8✔
1679
        validate_sync_error(
17✔
1680
            std::move(error_future).get(), invalid_obj, "TopLevel",
17✔
1681
            util::format("write to ObjectID(\"%1\") in table \"TopLevel\" not allowed", invalid_obj.to_string()));
17✔
1682

8✔
1683
        wait_for_advance(*realm);
17✔
1684

8✔
1685
        obj = Object::get_for_primary_key(c, realm, "TopLevel", std::any(invalid_obj));
17✔
1686
        embedded_obj = util::any_cast<Object&&>(obj.get_property_value<std::any>(c, "embedded_obj"));
17✔
1687
        REQUIRE(util::any_cast<std::string&&>(obj.get_property_value<std::any>(c, "queryable_str_field")) == "foo");
17!
1688
        REQUIRE(util::any_cast<std::string&&>(embedded_obj.get_property_value<std::any>(c, "str_field")) == "bar");
17!
1689

8✔
1690
        realm->begin_transaction();
17✔
1691
        embedded_obj.set_property_value(c, "str_field", std::any{"baz"s});
17✔
1692
        realm->commit_transaction();
17✔
1693

8✔
1694
        wait_for_upload(*realm);
17✔
1695
        wait_for_download(*realm);
17✔
1696

8✔
1697
        wait_for_advance(*realm);
17✔
1698
        obj = Object::get_for_primary_key(c, realm, "TopLevel", std::any(invalid_obj));
17✔
1699
        embedded_obj = util::any_cast<Object&&>(obj.get_property_value<std::any>(c, "embedded_obj"));
17✔
1700
        REQUIRE(embedded_obj.get_column_value<StringData>("str_field") == "baz");
17!
1701
    }
17✔
1702

64✔
1703
    SECTION("compensating write for writing a top-level object that is out-of-view") {
136✔
1704
        create_subscription("class_TopLevel", [](auto q, auto col) {
17✔
1705
            return q.equal(col, "foo");
17✔
1706
        });
17✔
1707

8✔
1708
        CppContext c(realm);
17✔
1709
        realm->begin_transaction();
17✔
1710
        auto valid_obj = ObjectId::gen();
17✔
1711
        auto invalid_obj = ObjectId::gen();
17✔
1712
        Object::create(c, realm, "TopLevel",
17✔
1713
                       std::any(AnyDict{
17✔
1714
                           {"_id", valid_obj},
17✔
1715
                           {"queryable_str_field", "foo"s},
17✔
1716
                       }));
17✔
1717
        Object::create(c, realm, "TopLevel",
17✔
1718
                       std::any(AnyDict{
17✔
1719
                           {"_id", invalid_obj},
17✔
1720
                           {"queryable_str_field", "bar"s},
17✔
1721
                       }));
17✔
1722
        realm->commit_transaction();
17✔
1723

8✔
1724
        validate_sync_error(std::move(error_future).get(), invalid_obj, "TopLevel",
17✔
1725
                            "object is outside of the current query view");
17✔
1726

8✔
1727
        wait_for_advance(*realm);
17✔
1728

8✔
1729
        auto top_level_table = realm->read_group().get_table("class_TopLevel");
17✔
1730
        REQUIRE(top_level_table->size() == 1);
17!
1731
        REQUIRE(top_level_table->get_object_with_primary_key(valid_obj));
17!
1732

8✔
1733
        // Verify that a valid object afterwards does not produce an error
8✔
1734
        realm->begin_transaction();
17✔
1735
        Object::create(c, realm, "TopLevel",
17✔
1736
                       std::any(AnyDict{
17✔
1737
                           {"_id", ObjectId::gen()},
17✔
1738
                           {"queryable_str_field", "foo"s},
17✔
1739
                       }));
17✔
1740
        realm->commit_transaction();
17✔
1741

8✔
1742
        wait_for_upload(*realm);
17✔
1743
        wait_for_download(*realm);
17✔
1744
    }
17✔
1745

64✔
1746
    SECTION("compensating writes for each primary key type") {
136✔
1747
        SECTION("int") {
68✔
1748
            create_subscription("class_Int PK", [](auto q, auto col) {
17✔
1749
                return q.equal(col, "foo");
17✔
1750
            });
17✔
1751
            realm->begin_transaction();
17✔
1752
            realm->read_group().get_table("class_Int PK")->create_object_with_primary_key(123456);
17✔
1753
            realm->commit_transaction();
17✔
1754

8✔
1755
            validate_sync_error(std::move(error_future).get(), 123456, "Int PK",
17✔
1756
                                "write to 123456 in table \"Int PK\" not allowed");
17✔
1757
        }
17✔
1758

32✔
1759
        SECTION("short string") {
68✔
1760
            create_subscription("class_String PK", [](auto q, auto col) {
17✔
1761
                return q.equal(col, "foo");
17✔
1762
            });
17✔
1763
            realm->begin_transaction();
17✔
1764
            realm->read_group().get_table("class_String PK")->create_object_with_primary_key("short");
17✔
1765
            realm->commit_transaction();
17✔
1766

8✔
1767
            validate_sync_error(std::move(error_future).get(), "short", "String PK",
17✔
1768
                                "write to \"short\" in table \"String PK\" not allowed");
17✔
1769
        }
17✔
1770

32✔
1771
        SECTION("long string") {
68✔
1772
            create_subscription("class_String PK", [](auto q, auto col) {
17✔
1773
                return q.equal(col, "foo");
17✔
1774
            });
17✔
1775
            realm->begin_transaction();
17✔
1776
            const char* pk = "long string which won't fit in the SSO buffer";
17✔
1777
            realm->read_group().get_table("class_String PK")->create_object_with_primary_key(pk);
17✔
1778
            realm->commit_transaction();
17✔
1779

8✔
1780
            validate_sync_error(std::move(error_future).get(), pk, "String PK",
17✔
1781
                                util::format("write to \"%1\" in table \"String PK\" not allowed", pk));
17✔
1782
        }
17✔
1783

32✔
1784
        SECTION("uuid") {
68✔
1785
            create_subscription("class_UUID PK", [](auto q, auto col) {
17✔
1786
                return q.equal(col, "foo");
17✔
1787
            });
17✔
1788
            realm->begin_transaction();
17✔
1789
            UUID pk("01234567-9abc-4def-9012-3456789abcde");
17✔
1790
            realm->read_group().get_table("class_UUID PK")->create_object_with_primary_key(pk);
17✔
1791
            realm->commit_transaction();
17✔
1792

8✔
1793
            validate_sync_error(std::move(error_future).get(), pk, "UUID PK",
17✔
1794
                                util::format("write to UUID(%1) in table \"UUID PK\" not allowed", pk));
17✔
1795
        }
17✔
1796
    }
68✔
1797

64✔
1798
    // Clear the Realm afterwards as we're reusing an app
64✔
1799
    realm->begin_transaction();
136✔
1800
    table->clear();
136✔
1801
    realm->commit_transaction();
136✔
1802
    wait_for_upload(*realm);
136✔
1803
    realm.reset();
136✔
1804

64✔
1805
    // Add new sections before this
64✔
1806
    SECTION("teardown") {
136✔
1807
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
17✔
1808
        harness.reset();
17✔
1809
    }
17✔
1810
}
136✔
1811

1812
TEST_CASE("flx: query on non-queryable field results in query error message", "[sync][flx][query][baas]") {
68✔
1813
    static std::optional<FLXSyncTestHarness> harness;
68✔
1814
    if (!harness) {
68✔
1815
        harness.emplace("flx_bad_query");
17✔
1816
    }
17✔
1817

32✔
1818
    auto create_subscription = [](SharedRealm realm, StringData table_name, StringData column_name, auto make_query) {
85✔
1819
        auto table = realm->read_group().get_table(table_name);
85✔
1820
        auto queryable_field = table->get_column_key(column_name);
85✔
1821
        auto new_query = realm->get_active_subscription_set().make_mutable_copy();
85✔
1822
        new_query.insert_or_assign(make_query(Query(table), queryable_field));
85✔
1823
        return new_query.commit();
85✔
1824
    };
85✔
1825

32✔
1826
    auto check_status = [](auto status) {
68✔
1827
        CHECK(!status.is_ok());
68!
1828
        std::string reason = status.get_status().reason();
68✔
1829
        // Depending on the version of baas used, it may return 'Invalid query:' or
32✔
1830
        // 'Client provided query with bad syntax:'
32✔
1831
        if ((reason.find("Invalid query:") == std::string::npos &&
68✔
1832
             reason.find("Client provided query with bad syntax:") == std::string::npos) ||
32!
1833
            reason.find("\"TopLevel\": key \"non_queryable_field\" is not a queryable field") == std::string::npos) {
68✔
1834
            FAIL(util::format("Error reason did not match expected: `%1`", reason));
×
1835
        }
×
1836
    };
68✔
1837

32✔
1838
    SECTION("Good query after bad query") {
68✔
1839
        harness->do_with_new_realm([&](SharedRealm realm) {
17✔
1840
            auto subs = create_subscription(realm, "class_TopLevel", "non_queryable_field", [](auto q, auto c) {
17✔
1841
                return q.equal(c, "bar");
17✔
1842
            });
17✔
1843
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
17✔
1844
            check_status(sub_res);
17✔
1845

8✔
1846
            CHECK(realm->get_active_subscription_set().version() == 0);
17!
1847
            CHECK(realm->get_latest_subscription_set().version() == 1);
17!
1848

8✔
1849
            subs = create_subscription(realm, "class_TopLevel", "queryable_str_field", [](auto q, auto c) {
17✔
1850
                return q.equal(c, "foo");
17✔
1851
            });
17✔
1852
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
17✔
1853

8✔
1854
            CHECK(realm->get_active_subscription_set().version() == 2);
17!
1855
            CHECK(realm->get_latest_subscription_set().version() == 2);
17!
1856
        });
17✔
1857
    }
17✔
1858

32✔
1859
    SECTION("Bad query after bad query") {
68✔
1860
        harness->do_with_new_realm([&](SharedRealm realm) {
17✔
1861
            auto sync_session = realm->sync_session();
17✔
1862
            sync_session->pause();
17✔
1863

8✔
1864
            auto subs = create_subscription(realm, "class_TopLevel", "non_queryable_field", [](auto q, auto c) {
17✔
1865
                return q.equal(c, "bar");
17✔
1866
            });
17✔
1867
            auto subs2 = create_subscription(realm, "class_TopLevel", "non_queryable_field", [](auto q, auto c) {
17✔
1868
                return q.equal(c, "bar");
17✔
1869
            });
17✔
1870

8✔
1871
            sync_session->resume();
17✔
1872

8✔
1873
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
17✔
1874
            auto sub_res2 =
17✔
1875
                subs2.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
17✔
1876

8✔
1877
            check_status(sub_res);
17✔
1878
            check_status(sub_res2);
17✔
1879

8✔
1880
            CHECK(realm->get_active_subscription_set().version() == 0);
17!
1881
            CHECK(realm->get_latest_subscription_set().version() == 2);
17!
1882
        });
17✔
1883
    }
17✔
1884

32✔
1885
    // Test for issue #6839, where wait for download after committing a new subscription and then
32✔
1886
    // wait for the subscription complete notification was leading to a garbage reason value in the
32✔
1887
    // status provided to the subscription complete callback.
32✔
1888
    SECTION("Download during bad query") {
68✔
1889
        harness->do_with_new_realm([&](SharedRealm realm) {
17✔
1890
            // Wait for steady state before committing the new subscription
8✔
1891
            REQUIRE(!wait_for_download(*realm));
17!
1892

8✔
1893
            auto subs = create_subscription(realm, "class_TopLevel", "non_queryable_field", [](auto q, auto c) {
17✔
1894
                return q.equal(c, "bar");
17✔
1895
            });
17✔
1896
            // Wait for download is actually waiting for the subscription to be applied after it was committed
8✔
1897
            REQUIRE(!wait_for_download(*realm));
17!
1898
            // After subscription is complete or fails during wait for download, this function completes
8✔
1899
            // without blocking
8✔
1900
            auto result = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
17✔
1901
            // Verify error occurred
8✔
1902
            check_status(result);
17✔
1903
        });
17✔
1904
    }
17✔
1905

32✔
1906
    // Add new sections before this
32✔
1907
    SECTION("teardown") {
68✔
1908
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
17✔
1909
        harness.reset();
17✔
1910
    }
17✔
1911
}
68✔
1912

1913
#if REALM_ENABLE_GEOSPATIAL
1914
TEST_CASE("flx: geospatial", "[sync][flx][geospatial][baas]") {
51✔
1915
    static std::optional<FLXSyncTestHarness> harness;
51✔
1916
    if (!harness) {
51✔
1917
        Schema schema{
17✔
1918
            {"restaurant",
17✔
1919
             {
17✔
1920
                 {"_id", PropertyType::Int, Property::IsPrimary{true}},
17✔
1921
                 {"queryable_str_field", PropertyType::String},
17✔
1922
                 {"location", PropertyType::Object | PropertyType::Nullable, "geoPointType"},
17✔
1923
                 {"array", PropertyType::Object | PropertyType::Array, "geoPointType"},
17✔
1924
             }},
17✔
1925
            {"geoPointType",
17✔
1926
             ObjectSchema::ObjectType::Embedded,
17✔
1927
             {
17✔
1928
                 {"type", PropertyType::String},
17✔
1929
                 {"coordinates", PropertyType::Double | PropertyType::Array},
17✔
1930
             }},
17✔
1931
        };
17✔
1932
        FLXSyncTestHarness::ServerSchema server_schema{schema, {"queryable_str_field", "location"}};
17✔
1933
        harness.emplace("flx_geospatial", server_schema);
17✔
1934
    }
17✔
1935

24✔
1936
    auto create_subscription = [](SharedRealm realm, StringData table_name, StringData column_name, auto make_query) {
153✔
1937
        auto table = realm->read_group().get_table(table_name);
153✔
1938
        auto queryable_field = table->get_column_key(column_name);
153✔
1939
        auto new_query = realm->get_active_subscription_set().make_mutable_copy();
153✔
1940
        new_query.insert_or_assign(make_query(Query(table), queryable_field));
153✔
1941
        return new_query.commit();
153✔
1942
    };
153✔
1943

24✔
1944
    SECTION("Server supports a basic geowithin FLX query") {
51✔
1945
        harness->do_with_new_realm([&](SharedRealm realm) {
17✔
1946
            const realm::AppSession& app_session = harness->session().app_session();
17✔
1947
            auto sync_service = app_session.admin_api.get_sync_service(app_session.server_app_id);
17✔
1948

8✔
1949
            AdminAPISession::ServiceConfig config =
17✔
1950
                app_session.admin_api.get_config(app_session.server_app_id, sync_service);
17✔
1951
            auto subs = create_subscription(realm, "class_restaurant", "location", [](Query q, ColKey c) {
17✔
1952
                GeoBox area{GeoPoint{0.2, 0.2}, GeoPoint{0.7, 0.7}};
17✔
1953
                Query query = q.get_table()->column<Link>(c).geo_within(area);
17✔
1954
                std::string ser = query.get_description();
17✔
1955
                return query;
17✔
1956
            });
17✔
1957
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
17✔
1958
            CHECK(sub_res.is_ok());
17!
1959
            CHECK(realm->get_active_subscription_set().version() == 1);
17!
1960
            CHECK(realm->get_latest_subscription_set().version() == 1);
17!
1961
        });
17✔
1962
    }
17✔
1963

24✔
1964
    SECTION("geospatial query consistency: local/server/FLX") {
51✔
1965
        harness->do_with_new_user([&](std::shared_ptr<SyncUser> user) {
17✔
1966
            SyncTestFile config(user, harness->schema(), SyncConfig::FLXSyncEnabled{});
17✔
1967
            auto error_pf = util::make_promise_future<SyncError>();
17✔
1968
            config.sync_config->error_handler =
17✔
1969
                [promise = std::make_shared<util::Promise<SyncError>>(std::move(error_pf.promise))](
17✔
1970
                    std::shared_ptr<SyncSession>, SyncError error) {
17✔
1971
                    promise->emplace_value(std::move(error));
17✔
1972
                };
17✔
1973

8✔
1974
            auto realm = Realm::get_shared_realm(config);
17✔
1975

8✔
1976
            auto subs = create_subscription(realm, "class_restaurant", "queryable_str_field", [](Query q, ColKey c) {
17✔
1977
                return q.equal(c, "synced");
17✔
1978
            });
17✔
1979
            auto make_polygon_filter = [&](const GeoPolygon& polygon) -> bson::BsonDocument {
170✔
1980
                bson::BsonArray inner{};
170✔
1981
                REALM_ASSERT_3(polygon.points.size(), ==, 1);
170✔
1982
                for (auto& point : polygon.points[0]) {
799✔
1983
                    inner.push_back(bson::BsonArray{point.longitude, point.latitude});
799✔
1984
                }
799✔
1985
                bson::BsonArray coords;
170✔
1986
                coords.push_back(inner);
170✔
1987
                bson::BsonDocument geo_bson{{{"type", "Polygon"}, {"coordinates", coords}}};
170✔
1988
                bson::BsonDocument filter{
170✔
1989
                    {"location", bson::BsonDocument{{"$geoWithin", bson::BsonDocument{{"$geometry", geo_bson}}}}}};
170✔
1990
                return filter;
170✔
1991
            };
170✔
1992
            auto make_circle_filter = [&](const GeoCircle& circle) -> bson::BsonDocument {
51✔
1993
                bson::BsonArray coords{circle.center.longitude, circle.center.latitude};
51✔
1994
                bson::BsonArray inner;
51✔
1995
                inner.push_back(coords);
51✔
1996
                inner.push_back(circle.radius_radians);
51✔
1997
                bson::BsonDocument filter{
51✔
1998
                    {"location", bson::BsonDocument{{"$geoWithin", bson::BsonDocument{{"$centerSphere", inner}}}}}};
51✔
1999
                return filter;
51✔
2000
            };
51✔
2001
            auto run_query_on_server = [&](const bson::BsonDocument& filter,
17✔
2002
                                           std::optional<std::string> expected_error = {}) -> size_t {
221✔
2003
                auto remote_client = harness->app()->current_user()->mongo_client("BackingDB");
221✔
2004
                auto db = remote_client.db(harness->session().app_session().config.mongo_dbname);
221✔
2005
                auto restaurant_collection = db["restaurant"];
221✔
2006
                bool processed = false;
221✔
2007
                constexpr int64_t limit = 1000;
221✔
2008
                size_t matches = 0;
221✔
2009
                restaurant_collection.count(filter, limit, [&](uint64_t count, util::Optional<AppError> error) {
221✔
2010
                    processed = true;
221✔
2011
                    if (error) {
221✔
2012
                        if (!expected_error) {
102✔
2013
                            util::format(std::cout, "query error: %1\n", error->reason());
×
2014
                            FAIL(error);
×
2015
                        }
×
2016
                        else {
102✔
2017
                            std::string reason = std::string(error->reason());
102✔
2018
                            std::transform(reason.begin(), reason.end(), reason.begin(), toLowerAscii);
102✔
2019
                            std::transform(expected_error->begin(), expected_error->end(), expected_error->begin(),
102✔
2020
                                           toLowerAscii);
102✔
2021
                            auto pos = reason.find(*expected_error);
102✔
2022
                            if (pos == std::string::npos) {
102✔
2023
                                util::format(std::cout, "mismatch error: '%1' and '%2'\n", reason, *expected_error);
×
2024
                                FAIL(reason);
×
2025
                            }
×
2026
                        }
102✔
2027
                    }
102✔
2028
                    matches = size_t(count);
221✔
2029
                });
221✔
2030
                REQUIRE(processed);
221!
2031
                return matches;
221✔
2032
            };
221✔
2033
            auto sub_res = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
17✔
2034
            CHECK(sub_res.is_ok());
17!
2035
            CHECK(realm->get_active_subscription_set().version() == 1);
17!
2036
            CHECK(realm->get_latest_subscription_set().version() == 1);
17!
2037

8✔
2038
            CppContext c(realm);
17✔
2039
            int64_t pk = 0;
17✔
2040
            auto add_point = [&](GeoPoint p) {
136✔
2041
                Object::create(
136✔
2042
                    c, realm, "restaurant",
136✔
2043
                    std::any(AnyDict{
136✔
2044
                        {"_id", ++pk},
136✔
2045
                        {"queryable_str_field", "synced"s},
136✔
2046
                        {"location", AnyDict{{"type", "Point"s},
136✔
2047
                                             {"coordinates", std::vector<std::any>{p.longitude, p.latitude}}}}}));
136✔
2048
            };
136✔
2049
            std::vector<GeoPoint> points = {
17✔
2050
                GeoPoint{-74.006, 40.712800000000001},            // New York city
17✔
2051
                GeoPoint{12.568300000000001, 55.676099999999998}, // Copenhagen
17✔
2052
                GeoPoint{12.082599999999999, 55.628},             // ragnarok, Roskilde
17✔
2053
                GeoPoint{-180.1, -90.1},                          // invalid
17✔
2054
                GeoPoint{0, 90},                                  // north pole
17✔
2055
                GeoPoint{-82.68193, 84.74653},                    // northern point that falls within a box later
17✔
2056
                GeoPoint{82.55243, 84.54981}, // another northern point, but on the other side of the pole
17✔
2057
                GeoPoint{2129, 89},           // invalid
17✔
2058
            };
17✔
2059
            constexpr size_t invalids_to_be_compensated = 2; // 4, 8
17✔
2060
            realm->begin_transaction();
17✔
2061
            for (auto& point : points) {
136✔
2062
                add_point(point);
136✔
2063
            }
136✔
2064
            realm->commit_transaction();
17✔
2065
            const auto& error = error_pf.future.get();
17✔
2066
            REQUIRE(!error.is_fatal);
17!
2067
            REQUIRE(error.status == ErrorCodes::SyncCompensatingWrite);
17!
2068
            REQUIRE(error.compensating_writes_info.size() == invalids_to_be_compensated);
17!
2069
            REQUIRE_THAT(error.compensating_writes_info[0].reason,
17✔
2070
                         Catch::Matchers::ContainsSubstring("in table \"restaurant\" will corrupt geojson data"));
17✔
2071
            REQUIRE_THAT(error.compensating_writes_info[1].reason,
17✔
2072
                         Catch::Matchers::ContainsSubstring("in table \"restaurant\" will corrupt geojson data"));
17✔
2073

8✔
2074
            {
17✔
2075
                auto table = realm->read_group().get_table("class_restaurant");
17✔
2076
                CHECK(table->size() == points.size());
17!
2077
                Obj obj = table->get_object_with_primary_key(Mixed{1});
17✔
2078
                REQUIRE(obj);
17!
2079
                Geospatial geo = obj.get<Geospatial>("location");
17✔
2080
                REQUIRE(geo.get_type_string() == "Point");
17!
2081
                REQUIRE(geo.get_type() == Geospatial::Type::Point);
17!
2082
                GeoPoint point = geo.get<GeoPoint>();
17✔
2083
                REQUIRE(point.longitude == points[0].longitude);
17!
2084
                REQUIRE(point.latitude == points[0].latitude);
17!
2085
                REQUIRE(!point.get_altitude());
17!
2086
                ColKey location_col = table->get_column_key("location");
17✔
2087
                auto run_query_locally = [&table, &location_col](Geospatial bounds) -> size_t {
221✔
2088
                    Query query = table->column<Link>(location_col).geo_within(Geospatial(bounds));
221✔
2089
                    return query.find_all().size();
221✔
2090
                };
221✔
2091
                auto run_query_as_flx = [&](Geospatial bounds) -> size_t {
119✔
2092
                    size_t num_objects = 0;
119✔
2093
                    harness->do_with_new_realm([&](SharedRealm realm) {
119✔
2094
                        auto subs =
119✔
2095
                            create_subscription(realm, "class_restaurant", "location", [&](Query q, ColKey c) {
119✔
2096
                                return q.get_table()->column<Link>(c).geo_within(Geospatial(bounds));
119✔
2097
                            });
119✔
2098
                        auto sub_res =
119✔
2099
                            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get_no_throw();
119✔
2100
                        CHECK(sub_res.is_ok());
119!
2101
                        CHECK(realm->get_active_subscription_set().version() == 1);
119!
2102
                        realm->refresh();
119✔
2103
                        num_objects = realm->get_class("restaurant").num_objects();
119✔
2104
                    });
119✔
2105
                    return num_objects;
119✔
2106
                };
119✔
2107

8✔
2108
                reset_utils::wait_for_num_objects_in_atlas(harness->app()->current_user(),
17✔
2109
                                                           harness->session().app_session(), "restaurant",
17✔
2110
                                                           points.size() - invalids_to_be_compensated);
17✔
2111

8✔
2112
                {
17✔
2113
                    GeoPolygon bounds{
17✔
2114
                        {{GeoPoint{-80, 40.7128}, GeoPoint{20, 60}, GeoPoint{20, 20}, GeoPoint{-80, 40.7128}}}};
17✔
2115
                    size_t local_matches = run_query_locally(bounds);
17✔
2116
                    size_t server_results = run_query_on_server(make_polygon_filter(bounds));
17✔
2117
                    size_t flx_results = run_query_as_flx(bounds);
17✔
2118
                    CHECK(flx_results == local_matches);
17!
2119
                    CHECK(server_results == local_matches);
17!
2120
                }
17✔
2121
                {
17✔
2122
                    GeoCircle circle{.5, GeoPoint{0, 90}};
17✔
2123
                    size_t local_matches = run_query_locally(circle);
17✔
2124
                    size_t server_results = run_query_on_server(make_circle_filter(circle));
17✔
2125
                    size_t flx_results = run_query_as_flx(circle);
17✔
2126
                    CHECK(server_results == local_matches);
17!
2127
                    CHECK(flx_results == local_matches);
17!
2128
                }
17✔
2129
                { // a ring with 3 points without a matching begin/end is an error
17✔
2130
                    GeoPolygon open_bounds{{{GeoPoint{-80, 40.7128}, GeoPoint{20, 60}, GeoPoint{20, 20}}}};
17✔
2131
                    CHECK_THROWS_WITH(run_query_locally(open_bounds),
17✔
2132
                                      "Invalid region in GEOWITHIN query for parameter 'GeoPolygon({[-80, 40.7128], "
17✔
2133
                                      "[20, 60], [20, 20]})': 'Ring is not closed, first vertex 'GeoPoint([-80, "
17✔
2134
                                      "40.7128])' does not equal last vertex 'GeoPoint([20, 20])''");
17✔
2135
                    run_query_on_server(make_polygon_filter(open_bounds), "(BadValue) Loop is not closed");
17✔
2136
                }
17✔
2137
                {
17✔
2138
                    GeoCircle circle = GeoCircle::from_kms(10, GeoPoint{-180.1, -90.1});
17✔
2139
                    CHECK_THROWS_WITH(run_query_locally(circle),
17✔
2140
                                      "Invalid region in GEOWITHIN query for parameter 'GeoCircle([-180.1, -90.1], "
17✔
2141
                                      "0.00156787)': 'Longitude/latitude is out of bounds, lng: -180.1 lat: -90.1'");
17✔
2142
                    run_query_on_server(make_circle_filter(circle), "(BadValue) longitude/latitude is out of bounds");
17✔
2143
                }
17✔
2144
                {
17✔
2145
                    GeoCircle circle = GeoCircle::from_kms(-1, GeoPoint{0, 0});
17✔
2146
                    CHECK_THROWS_WITH(run_query_locally(circle),
17✔
2147
                                      "Invalid region in GEOWITHIN query for parameter 'GeoCircle([0, 0], "
17✔
2148
                                      "-0.000156787)': 'The radius of a circle must be a non-negative number'");
17✔
2149
                    run_query_on_server(make_circle_filter(circle),
17✔
2150
                                        "(BadValue) radius must be a non-negative number");
17✔
2151
                }
17✔
2152
                {
17✔
2153
                    // This box is from Gershøj to CPH airport. It includes CPH and Ragnarok but not NYC.
8✔
2154
                    std::vector<Geospatial> valid_box_variations = {
17✔
2155
                        GeoBox{GeoPoint{11.97575, 55.71601},
17✔
2156
                               GeoPoint{12.64773, 55.61211}}, // Gershøj, CPH Airport (Top Left, Bottom Right)
17✔
2157
                        GeoBox{GeoPoint{12.64773, 55.61211},
17✔
2158
                               GeoPoint{11.97575, 55.71601}}, // CPH Airport, Gershøj (Bottom Right, Top Left)
17✔
2159
                        GeoBox{GeoPoint{12.64773, 55.71601},
17✔
2160
                               GeoPoint{11.97575, 55.61211}}, // Upper Right, Bottom Left
17✔
2161
                        GeoBox{GeoPoint{11.97575, 55.61211},
17✔
2162
                               GeoPoint{12.64773, 55.71601}}, // Bottom Left, Upper Right
17✔
2163
                    };
17✔
2164
                    constexpr size_t expected_results = 2;
17✔
2165
                    for (auto& geo : valid_box_variations) {
68✔
2166
                        size_t local_matches = run_query_locally(geo);
68✔
2167
                        size_t server_matches =
68✔
2168
                            run_query_on_server(make_polygon_filter(geo.get<GeoBox>().to_polygon()));
68✔
2169
                        size_t flx_matches = run_query_as_flx(geo);
68✔
2170
                        CHECK(local_matches == expected_results);
68!
2171
                        CHECK(server_matches == expected_results);
68!
2172
                        CHECK(flx_matches == expected_results);
68!
2173
                    }
68✔
2174
                    std::vector<Geospatial> invalid_boxes = {
17✔
2175
                        GeoBox{GeoPoint{11.97575, 55.71601}, GeoPoint{11.97575, 55.71601}}, // same point twice
17✔
2176
                        GeoBox{GeoPoint{11.97575, 55.71601},
17✔
2177
                               GeoPoint{11.97575, 57.0}}, // two points on the same longitude
17✔
2178
                        GeoBox{GeoPoint{11.97575, 55.71601},
17✔
2179
                               GeoPoint{12, 55.71601}}, // two points on the same latitude
17✔
2180
                    };
17✔
2181
                    for (auto& geo : invalid_boxes) {
51✔
2182
                        REQUIRE_THROWS_CONTAINING(run_query_locally(geo),
51✔
2183
                                                  "Invalid region in GEOWITHIN query for parameter 'GeoPolygon");
51✔
2184
                        run_query_on_server(make_polygon_filter(geo.get<GeoBox>().to_polygon()),
51✔
2185
                                            "(BadValue) Loop must have at least 3 different vertices");
51✔
2186
                    }
51✔
2187
                }
17✔
2188
                { // a box region that wraps the north pole. It contains the north pole point
17✔
2189
                    // and two others, one each on distinct sides of the globe.
8✔
2190
                    constexpr double lat = 82.83799;
17✔
2191
                    Geospatial north_pole_box =
17✔
2192
                        GeoPolygon{{{GeoPoint{-78.33951, lat}, GeoPoint{-90.33951, lat}, GeoPoint{90.33951, lat},
17✔
2193
                                     GeoPoint{78.33951, lat}, GeoPoint{-78.33951, lat}}}};
17✔
2194
                    constexpr size_t num_matching_points = 3;
17✔
2195
                    size_t local_matches = run_query_locally(north_pole_box);
17✔
2196
                    size_t server_matches =
17✔
2197
                        run_query_on_server(make_polygon_filter(north_pole_box.get<GeoPolygon>()));
17✔
2198
                    size_t flx_matches = run_query_as_flx(north_pole_box);
17✔
2199
                    CHECK(local_matches == num_matching_points);
17!
2200
                    CHECK(server_matches == num_matching_points);
17!
2201
                    CHECK(flx_matches == num_matching_points);
17!
2202
                }
17✔
2203
            }
17✔
2204
        });
17✔
2205
    }
17✔
2206

24✔
2207
    // Add new sections before this
24✔
2208
    SECTION("teardown") {
51✔
2209
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
17✔
2210
        harness.reset();
17✔
2211
    }
17✔
2212
}
51✔
2213
#endif // REALM_ENABLE_GEOSPATIAL
2214

2215
// Interrupts the bootstrap of subscription set version 1 part-way through, checks that the subscription
// store on disk is left in the Bootstrapping state, and then verifies that reopening the realm lets the
// bootstrap finish with every object returned by fill_large_array_schema() present locally.
TEST_CASE("flx: interrupted bootstrap restarts/recovers on reconnect", "[sync][flx][bootstrap][baas]") {
    FLXSyncTestHarness harness("flx_bootstrap_reconnect", {g_large_array_schema, {"queryable_int_field"}});

    std::vector<ObjectId> obj_ids_at_end = fill_large_array_schema(harness);
    SyncTestFile interrupted_realm_config(harness.app()->current_user(), harness.schema(),
                                          SyncConfig::FLXSyncEnabled{});

    // Phase 1: open the realm with an event hook that closes the session in the middle of the bootstrap.
    {
        auto [interrupted_promise, interrupted_future] = util::make_promise_future<void>();

        // Install the hook on a copy of the config (with its own SyncConfig) so that
        // interrupted_realm_config, which is reused further down, stays hook-free.
        Realm::Config hooked_config = interrupted_realm_config;
        hooked_config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);

        // NOTE(review): the promise is wrapped in a shared_ptr, presumably so the hook stays copyable.
        auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
        hooked_config.sync_config->on_sync_client_event_hook =
            [promise = std::move(shared_promise), seen_version_one = false](
                std::weak_ptr<SyncSession> weak_session,
                const SyncClientHookData& data) mutable -> SyncClientHookAction {
            // Only download messages are interesting here.
            if (data.event != SyncClientHookEvent::DownloadMessageReceived) {
                return SyncClientHookAction::NoAction;
            }

            auto session = weak_session.lock();
            if (!session) {
                return SyncClientHookAction::NoAction;
            }

            // Ignore everything that belongs to query version 0.
            if (data.query_version == 0) {
                return SyncClientHookAction::NoAction;
            }
            // Let the first download message for query version 1 through untouched; only act once at
            // least one such message has already been seen.
            if (data.query_version == 1 && !std::exchange(seen_version_one, true)) {
                return SyncClientHookAction::NoAction;
            }

            REQUIRE(data.query_version == 1);
            REQUIRE(data.batch_state == sync::DownloadBatchState::MoreToCome);
            auto hook_latest_subs = session->get_flx_subscription_store()->get_latest();
            REQUIRE(hook_latest_subs.version() == 1);
            REQUIRE(hook_latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping);

            // Close the session mid-bootstrap and let the test body know that it happened.
            session->close();
            promise->emplace_value();

            return SyncClientHookAction::TriggerReconnect;
        };

        auto bootstrapping_realm = Realm::get_shared_realm(hooked_config);
        {
            // Subscribe to every TopLevel object, which starts the bootstrap for version 1.
            auto mut_subs = bootstrapping_realm->get_latest_subscription_set().make_mutable_copy();
            auto top_level = bootstrapping_realm->read_group().get_table("class_TopLevel");
            mut_subs.insert_or_assign(Query(top_level));
            mut_subs.commit();
        }

        interrupted_future.get();
        bootstrapping_realm->sync_session()->shutdown_and_wait();
    }

    _impl::RealmCoordinator::assert_no_open_realms();

    // Phase 2: inspect the file directly and confirm the interrupted state was persisted.
    {
        DBOptions db_options;
        db_options.encryption_key = test_util::crypt_key();
        auto db = DB::create(sync::make_client_replication(), interrupted_realm_config.path, db_options);
        auto sub_store = sync::SubscriptionStore::create(db);

        auto version_info = sub_store->get_version_info();
        REQUIRE(version_info.active == 0);
        REQUIRE(version_info.latest == 1);

        auto persisted_subs = sub_store->get_latest();
        REQUIRE(persisted_subs.state() == sync::SubscriptionSet::State::Bootstrapping);
        REQUIRE(persisted_subs.size() == 1);
        REQUIRE(persisted_subs.at(0).object_class_name == "TopLevel");
    }

    // Phase 3: reopen without the hook and wait for the bootstrap to complete.
    auto realm = Realm::get_shared_realm(interrupted_realm_config);
    auto table = realm->read_group().get_table("class_TopLevel");
    realm->get_latest_subscription_set().get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
    wait_for_advance(*realm);

    REQUIRE(table->size() == obj_ids_at_end.size());
    for (const auto& expected_id : obj_ids_at_end) {
        REQUIRE(table->find_primary_key(Mixed{expected_id}));
    }

    auto active_subs = realm->get_active_subscription_set();
    auto latest_subs = realm->get_latest_subscription_set();
    REQUIRE(active_subs.version() == latest_subs.version());
    REQUIRE(active_subs.version() == static_cast<int64_t>(1));
}
17✔
2298

2299
// With dev mode enabled and an empty server-side schema, a first client subscribes to TopLevel, creates
// two objects and uploads them. A second client then subscribes with a filter on queryable_int_field and
// is expected to receive both objects.
TEST_CASE("flx: dev mode uploads schema before query change", "[sync][flx][query][baas]") {
    FLXSyncTestHarness::ServerSchema server_schema;
    auto default_schema = FLXSyncTestHarness::default_server_schema();
    // Keep the default queryable fields, but start the server with no schema and dev mode turned on.
    server_schema.queryable_fields = default_schema.queryable_fields;
    server_schema.dev_mode_enabled = true;
    server_schema.schema = Schema{};

    FLXSyncTestHarness harness("flx_dev_mode", server_schema);
    auto foo_obj_id = ObjectId::gen();
    auto bar_obj_id = ObjectId::gen();

    // First client: subscribe to all TopLevel objects, create "foo" and "bar", and wait for the upload.
    auto create_and_upload = [&](SharedRealm writer) {
        auto top_level = writer->read_group().get_table("class_TopLevel");
        auto mut_subs = writer->get_latest_subscription_set().make_mutable_copy();
        mut_subs.insert_or_assign(Query(top_level));
        mut_subs.commit();

        CppContext ctx(writer);
        writer->begin_transaction();
        Object::create(ctx, writer, "TopLevel",
                       std::any(AnyDict{{"_id", foo_obj_id},
                                        {"queryable_str_field", "foo"s},
                                        {"queryable_int_field", static_cast<int64_t>(5)},
                                        {"non_queryable_field", "non queryable 1"s}}));
        Object::create(ctx, writer, "TopLevel",
                       std::any(AnyDict{{"_id", bar_obj_id},
                                        {"queryable_str_field", "bar"s},
                                        {"queryable_int_field", static_cast<int64_t>(10)},
                                        {"non_queryable_field", "non queryable 2"s}}));
        writer->commit_transaction();

        wait_for_upload(*writer);
    };
    harness.do_with_new_realm(create_and_upload, default_schema.schema);

    // Second client: subscribe to objects with queryable_int_field >= 5 and expect both objects to arrive.
    auto query_and_verify = [&](SharedRealm reader) {
        auto top_level = reader->read_group().get_table("class_TopLevel");
        auto int_col = top_level->get_column_key("queryable_int_field");

        auto mut_subs = reader->get_latest_subscription_set().make_mutable_copy();
        mut_subs.insert_or_assign(Query(top_level).greater_equal(int_col, static_cast<int64_t>(5)));
        auto committed_subs = mut_subs.commit();
        committed_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
        wait_for_download(*reader);
        Results all_objects(reader, top_level);

        reader->refresh();
        CHECK(all_objects.size() == 2);
        CHECK(top_level->get_object_with_primary_key(Mixed{foo_obj_id}).is_valid());
        CHECK(top_level->get_object_with_primary_key(Mixed{bar_obj_id}).is_valid());
    };
    harness.do_with_new_realm(query_and_verify, default_schema.schema);
}
17✔
2354

2355
// This is a test case for the server's fix for RCORE-969
2356
TEST_CASE("flx: change-of-query history divergence", "[sync][flx][query][baas]") {
17✔
2357
    FLXSyncTestHarness harness("flx_coq_divergence");
17✔
2358

8✔
2359
    // first we create an object on the server and upload it.
8✔
2360
    auto foo_obj_id = ObjectId::gen();
17✔
2361
    harness.load_initial_data([&](SharedRealm realm) {
17✔
2362
        CppContext c(realm);
17✔
2363
        Object::create(c, realm, "TopLevel",
17✔
2364
                       std::any(AnyDict{{"_id", foo_obj_id},
17✔
2365
                                        {"queryable_str_field", "foo"s},
17✔
2366
                                        {"queryable_int_field", static_cast<int64_t>(5)},
17✔
2367
                                        {"non_queryable_field", "created as initial data seed"s}}));
17✔
2368
    });
17✔
2369

8✔
2370
    // Now create another realm and wait for it to be fully synchronized with bootstrap version zero. i.e.
8✔
2371
    // our progress counters should be past the history entry containing the object created above.
8✔
2372
    auto test_file_config = harness.make_test_file();
17✔
2373
    auto realm = Realm::get_shared_realm(test_file_config);
17✔
2374
    auto table = realm->read_group().get_table("class_TopLevel");
17✔
2375
    auto queryable_str_field = table->get_column_key("queryable_str_field");
17✔
2376

8✔
2377
    realm->get_latest_subscription_set().get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
17✔
2378
    wait_for_upload(*realm);
17✔
2379
    wait_for_download(*realm);
17✔
2380

8✔
2381
    // Now disconnect the sync session
8✔
2382
    realm->sync_session()->pause();
17✔
2383

8✔
2384
    // And move the "foo" object created above into view and create a different diverging copy of it locally.
8✔
2385
    auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
17✔
2386
    mut_subs.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
17✔
2387
    auto subs = mut_subs.commit();
17✔
2388

8✔
2389
    realm->begin_transaction();
17✔
2390
    CppContext c(realm);
17✔
2391
    Object::create(c, realm, "TopLevel",
17✔
2392
                   std::any(AnyDict{{"_id", foo_obj_id},
17✔
2393
                                    {"queryable_str_field", "foo"s},
17✔
2394
                                    {"queryable_int_field", static_cast<int64_t>(10)},
17✔
2395
                                    {"non_queryable_field", "created locally"s}}));
17✔
2396
    realm->commit_transaction();
17✔
2397

8✔
2398
    // Reconnect the sync session and wait for the subscription that moved "foo" into view to be fully synchronized.
8✔
2399
    realm->sync_session()->resume();
17✔
2400
    wait_for_upload(*realm);
17✔
2401
    wait_for_download(*realm);
17✔
2402
    subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
17✔
2403

8✔
2404
    wait_for_advance(*realm);
17✔
2405

8✔
2406
    // The bootstrap should have erase/re-created our object and we should have the version from the server
8✔
2407
    // locally.
8✔
2408
    auto obj = Object::get_for_primary_key(c, realm, "TopLevel", std::any{foo_obj_id});
17✔
2409
    REQUIRE(obj.get_obj().get<int64_t>("queryable_int_field") == 5);
17!
2410
    REQUIRE(obj.get_obj().get<StringData>("non_queryable_field") == "created as initial data seed");
17!
2411

8✔
2412
    // Likewise, if we create a new realm and download all the objects, we should see the initial server version
8✔
2413
    // in the new realm rather than the "created locally" one.
8✔
2414
    harness.load_initial_data([&](SharedRealm realm) {
17✔
2415
        CppContext c(realm);
17✔
2416

8✔
2417
        auto obj = Object::get_for_primary_key(c, realm, "TopLevel", std::any{foo_obj_id});
17✔
2418
        REQUIRE(obj.get_obj().get<int64_t>("queryable_int_field") == 5);
17!
2419
        REQUIRE(obj.get_obj().get<StringData>("non_queryable_field") == "created as initial data seed");
17!
2420
    });
17✔
2421
}
17✔
2422

2423
// Checks that several subscription-set updates and object writes made while the sync session is paused
// all take effect once the session is resumed and fully synchronized. At the end only the "bar" object
// is expected to remain in view.
TEST_CASE("flx: writes work offline", "[sync][flx][baas]") {
    FLXSyncTestHarness harness("flx_offline_writes");

    harness.do_with_new_realm([&](SharedRealm realm) {
        auto sync_session = realm->sync_session();
        auto table = realm->read_group().get_table("class_TopLevel");
        auto queryable_str_field = table->get_column_key("queryable_str_field");
        auto queryable_int_field = table->get_column_key("queryable_int_field");

        // Start out subscribed to every TopLevel object.
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
        new_query.insert_or_assign(Query(table));
        new_query.commit();

        auto foo_obj_id = ObjectId::gen();
        auto bar_obj_id = ObjectId::gen();

        CppContext c(realm);
        realm->begin_transaction();
        Object::create(c, realm, "TopLevel",
                       std::any(AnyDict{{"_id", foo_obj_id},
                                        {"queryable_str_field", "foo"s},
                                        {"queryable_int_field", static_cast<int64_t>(5)},
                                        {"non_queryable_field", "non queryable 1"s}}));
        Object::create(c, realm, "TopLevel",
                       std::any(AnyDict{{"_id", bar_obj_id},
                                        {"queryable_str_field", "bar"s},
                                        {"queryable_int_field", static_cast<int64_t>(10)},
                                        {"non_queryable_field", "non queryable 2"s}}));
        realm->commit_transaction();

        // Get both objects to the server before going offline.
        wait_for_upload(*realm);
        wait_for_download(*realm);
        sync_session->pause();

        // Make it so the subscriptions only match the "foo" object
        {
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
            mut_subs.clear();
            mut_subs.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
            mut_subs.commit();
        }

        // Make foo so that it will match the next subscription update. This checks whether you can do
        // multiple subscription set updates offline and that the last one eventually takes effect when
        // you come back online and fully synchronize.
        {
            realm->begin_transaction();
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
            foo_obj.set<int64_t>(queryable_int_field, 15);
            realm->commit_transaction();
        }

        // Update our subscriptions so that both foo/bar will be included
        {
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
            mut_subs.clear();
            mut_subs.insert_or_assign(Query(table).greater_equal(queryable_int_field, static_cast<int64_t>(10)));
            mut_subs.commit();
        }

        // Make foo out of view for the current subscription.
        {
            realm->begin_transaction();
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
            foo_obj.set<int64_t>(queryable_int_field, 0);
            realm->commit_transaction();
        }

        // Come back online and let everything synchronize.
        sync_session->resume();
        wait_for_upload(*realm);
        wait_for_download(*realm);

        // Only "bar" (queryable_int_field == 10) still matches the final subscription.
        realm->refresh();
        Results results(realm, table);
        CHECK(results.size() == 1);
        CHECK(table->get_object_with_primary_key({bar_obj_id}).is_valid());
    });
}
17✔
2502

2503
// Checks that several subscription-set updates and object writes can be made back to back, without
// waiting for each subscription change to synchronize, and that the last subscription set is the one in
// effect after a full upload/download. At the end only the "bar" object is expected to remain in view.
TEST_CASE("flx: writes work without waiting for sync", "[sync][flx][baas]") {
    FLXSyncTestHarness harness("flx_offline_writes");

    harness.do_with_new_realm([&](SharedRealm realm) {
        auto table = realm->read_group().get_table("class_TopLevel");
        auto queryable_str_field = table->get_column_key("queryable_str_field");
        auto queryable_int_field = table->get_column_key("queryable_int_field");

        // Start out subscribed to every TopLevel object.
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
        new_query.insert_or_assign(Query(table));
        new_query.commit();

        auto foo_obj_id = ObjectId::gen();
        auto bar_obj_id = ObjectId::gen();

        CppContext c(realm);
        realm->begin_transaction();
        Object::create(c, realm, "TopLevel",
                       std::any(AnyDict{{"_id", foo_obj_id},
                                        {"queryable_str_field", "foo"s},
                                        {"queryable_int_field", static_cast<int64_t>(5)},
                                        {"non_queryable_field", "non queryable 1"s}}));
        Object::create(c, realm, "TopLevel",
                       std::any(AnyDict{{"_id", bar_obj_id},
                                        {"queryable_str_field", "bar"s},
                                        {"queryable_int_field", static_cast<int64_t>(10)},
                                        {"non_queryable_field", "non queryable 2"s}}));
        realm->commit_transaction();

        wait_for_upload(*realm);

        // Make it so the subscriptions only match the "foo" object
        {
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
            mut_subs.clear();
            mut_subs.insert_or_assign(Query(table).equal(queryable_str_field, "foo"));
            mut_subs.commit();
        }

        // Make foo so that it will match the next subscription update. This checks whether you can do
        // multiple subscription set updates without waiting and that the last one eventually takes effect when
        // you fully synchronize.
        {
            realm->begin_transaction();
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
            foo_obj.set<int64_t>(queryable_int_field, 15);
            realm->commit_transaction();
        }

        // Update our subscriptions so that both foo/bar will be included
        {
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
            mut_subs.clear();
            mut_subs.insert_or_assign(Query(table).greater_equal(queryable_int_field, static_cast<int64_t>(10)));
            mut_subs.commit();
        }

        // Make foo out-of-view for the current subscription.
        {
            realm->begin_transaction();
            auto foo_obj = table->get_object_with_primary_key(Mixed{foo_obj_id});
            foo_obj.set<int64_t>(queryable_int_field, 0);
            realm->commit_transaction();
        }

        wait_for_upload(*realm);
        wait_for_download(*realm);

        // Only "bar" (queryable_int_field == 10) still matches the final subscription.
        realm->refresh();
        Results results(realm, table);
        CHECK(results.size() == 1);
        Obj obj = results.get(0);
        CHECK(obj.get_primary_key().get_object_id() == bar_obj_id);
        CHECK(table->get_object_with_primary_key({bar_obj_id}).is_valid());
    });
}
17✔
2580

2581
// Pins the current sync protocol version and the websocket sub-protocol prefixes used for PBS and FLX.
TEST_CASE("flx: verify websocket protocol number and prefixes", "[sync][protocol]") {
    // Bump the expected number whenever the protocol version is intentionally changed; this guards
    // against the current protocol version changing unexpectedly.
    REQUIRE(sync::get_current_protocol_version() == 12);
    // Protocol V8 switched the separator from '/' to '#' to support the Web SDK.
    REQUIRE(sync::get_pbs_websocket_protocol_prefix() == "com.mongodb.realm-sync#");
    REQUIRE(sync::get_flx_websocket_protocol_prefix() == "com.mongodb.realm-query-sync#");
}
17✔
2589

2590
// TODO: remote-baas: This test fails consistently with Windows remote baas server - to be fixed in RCORE-1674
2591
#ifndef _WIN32
2592
// Commits a subscription, closes the realm, and checks that a realm reopened from the same config still
// has that subscription and that it can reach the Complete state.
TEST_CASE("flx: subscriptions persist after closing/reopening", "[sync][flx][baas]") {
    FLXSyncTestHarness harness("flx_bad_query");
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});

    // First open: add a subscription covering every TopLevel object, then close the realm.
    {
        auto first_realm = Realm::get_shared_realm(config);
        auto pending_subs = first_realm->get_latest_subscription_set().make_mutable_copy();
        auto top_level = first_realm->read_group().get_table("class_TopLevel");
        pending_subs.insert_or_assign(Query(top_level));
        pending_subs.commit();
        first_realm->close();
    }

    // Second open: the subscription must still be there and must be able to complete.
    {
        auto reopened_realm = Realm::get_shared_realm(config);
        auto persisted_subs = reopened_realm->get_latest_subscription_set();
        CHECK(persisted_subs.size() == 1);
        persisted_subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
    }
}
17✔
2611
#endif
2612

2613
TEST_CASE("flx: no subscription store created for PBS app", "[sync][flx][baas]") {
    // Opens a realm against an app built from minimal_app_config() without enabling flexible sync and
    // verifies that no subscription store exists and that the subscription accessors throw.
    auto app_config = minimal_app_config("flx_connect_as_pbs", g_minimal_schema);
    TestAppSession session(create_app(app_config));
    SyncTestFile config(session.app(), bson::Bson{}, g_minimal_schema);

    auto realm = Realm::get_shared_realm(config);
    // Both wait helpers are expected to return false here.
    CHECK_FALSE(wait_for_download(*realm));
    CHECK_FALSE(wait_for_upload(*realm));

    auto subscription_store = realm->sync_session()->get_flx_subscription_store();
    CHECK(!subscription_store);

    CHECK_THROWS_AS(realm->get_active_subscription_set(), IllegalOperation);
    CHECK_THROWS_AS(realm->get_latest_subscription_set(), IllegalOperation);
}
17✔
2627

2628
TEST_CASE("flx: connect to FLX as PBS returns an error", "[sync][flx][baas]") {
    // Opens a realm without the FLXSyncEnabled tag against the app created by the FLX harness and
    // expects the sync error handler to report WrongSyncType with the ApplicationBug action.
    FLXSyncTestHarness harness("connect_to_flx_as_pbs");
    SyncTestFile config(harness.app(), bson::Bson{}, harness.schema());

    // Written by the error handler and polled by the wait below, so both sides take the mutex.
    std::mutex error_mutex;
    util::Optional<SyncError> reported_error;
    config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError err) {
        std::lock_guard guard(error_mutex);
        reported_error = std::move(err);
    };

    auto realm = Realm::get_shared_realm(config);
    auto error_was_reported = [&] {
        std::lock_guard guard(error_mutex);
        return static_cast<bool>(reported_error);
    };
    timed_wait_for(error_was_reported);

    CHECK(reported_error->status == ErrorCodes::WrongSyncType);
    CHECK(reported_error->server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
}
17✔
2646

2647
TEST_CASE("flx: connect to FLX with partition value returns an error", "[sync][flx][protocol][baas]") {
    // A flexible sync config that also carries a partition value must be rejected when the realm is
    // opened.
    FLXSyncTestHarness harness("connect_to_flx_as_pbs");
    auto user = harness.app()->current_user();
    SyncTestFile config(user, harness.schema(), SyncConfig::FLXSyncEnabled{});

    auto& sync_config = *config.sync_config;
    sync_config.partition_value = "\"foobar\"";

    REQUIRE_EXCEPTION(Realm::get_shared_realm(config), IllegalCombination,
                      "Cannot specify a partition value when flexible sync is enabled");
}
17✔
2655

2656
TEST_CASE("flx: connect to PBS as FLX returns an error", "[sync][flx][protocol][baas]") {
    // Opens a realm with flexible sync enabled against an app built from minimal_app_config() and
    // expects the sync error handler to report WrongSyncType with the ApplicationBug action.
    auto app_config = minimal_app_config("flx_connect_as_pbs", g_minimal_schema);
    TestAppSession session(create_app(app_config));
    auto app = session.app();
    auto user = app->current_user();

    SyncTestFile config(user, g_minimal_schema, SyncConfig::FLXSyncEnabled{});

    // Written by the error handler and polled by the wait below, so both sides take the mutex.
    std::mutex error_mutex;
    util::Optional<SyncError> reported_error;
    config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError err) {
        std::lock_guard guard(error_mutex);
        reported_error = std::move(err);
    };

    auto realm = Realm::get_shared_realm(config);
    auto error_was_reported = [&] {
        std::lock_guard guard(error_mutex);
        return static_cast<bool>(reported_error);
    };
    timed_wait_for(error_was_reported);

    CHECK(reported_error->status == ErrorCodes::WrongSyncType);
    CHECK(reported_error->server_requests_action == sync::ProtocolErrorInfo::Action::ApplicationBug);
}
17✔
2679

2680
TEST_CASE("flx: commit subscription while refreshing the access token", "[sync][flx][token][baas]") {
    auto transport = std::make_shared<HookedTransport>();
    FLXSyncTestHarness harness("flx_wait_access_token2", FLXSyncTestHarness::default_server_schema(), transport);
    auto app = harness.app();
    std::shared_ptr<SyncUser> user = app->current_user();
    REQUIRE(user);
    REQUIRE_FALSE(user->access_token_refresh_required());

    // Install a fake access token that expired 30 seconds ago. This will trigger a refresh initiated
    // by the client.
    using namespace std::chrono_literals;
    const auto expired_at = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now() - 30s);
    user->update_access_token(encode_fake_jwt("fake_access_token", expired_at));
    REQUIRE(user->access_token_refresh_required());

    bool seen_waiting_for_access_token = false;
    // Commit a subscription set while there is no sync session.
    // A session is created when the access token is refreshed.
    transport->request_hook = [&](const Request&) {
        auto hook_user = app->current_user();
        REQUIRE(hook_user);
        for (auto& session : hook_user->all_sessions()) {
            if (session->state() != SyncSession::State::WaitingForAccessToken) {
                continue;
            }
            // Only a single session is expected to be observed in this state.
            REQUIRE(!seen_waiting_for_access_token);
            seen_waiting_for_access_token = true;

            auto store = session->get_flx_subscription_store();
            REQUIRE(store);
            auto new_subs = store->get_latest().make_mutable_copy();
            new_subs.commit();
        }
        return std::nullopt;
    };

    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
    // This triggers the token refresh.
    auto r = Realm::get_shared_realm(config);
    REQUIRE(seen_waiting_for_access_token);
}
17✔
2718

1!
2719
TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][bootstrap][baas]") {
65✔
2720
    // Exception type thrown from a sync client hook in this test; what() returns a fixed message
    // that the test later looks for in the reported sync error.
    struct NovelException : std::exception {
        const char* what() const noexcept override
        {
            static constexpr const char* message = "Oh no, a really weird exception happened!";
            return message;
        }
    };
65✔
2726

33✔
2727
    FLXSyncTestHarness harness("flx_bootstrap_batching", {g_large_array_schema, {"queryable_int_field"}});
68✔
2728

32✔
2729
    std::vector<ObjectId> obj_ids_at_end = fill_large_array_schema(harness);
68✔
2730
    SyncTestFile interrupted_realm_config(harness.app()->current_user(), harness.schema(),
64✔
2731
                                          SyncConfig::FLXSyncEnabled{});
68✔
2732

36✔
2733
    // Verifies the on-disk state of a realm whose bootstrap was interrupted: class_TopLevel exists but
    // is empty, and subscription version 1 (a single TopLevel subscription) is the latest version and is
    // still Bootstrapping while the active version is 0.
    auto check_interrupted_state = [&](const DBRef& db) {
        auto read_tr = db->start_read();
        auto top_level_table = read_tr->get_table("class_TopLevel");
        REQUIRE(top_level_table);
        REQUIRE(top_level_table->is_empty());

        auto store = sync::SubscriptionStore::create(db);
        auto versions = store->get_version_info();
        REQUIRE(versions.latest == 1);
        REQUIRE(versions.active == 0);

        auto latest = store->get_latest();
        REQUIRE(latest.state() == sync::SubscriptionSet::State::Bootstrapping);
        REQUIRE(latest.size() == 1);
        REQUIRE(latest.at(0).object_class_name == "TopLevel");
    };
68!
2748

36!
2749
    auto mutate_realm = [&] {
52✔
2750
        harness.load_initial_data([&](SharedRealm realm) {
32✔
2751
            auto table = realm->read_group().get_table("class_TopLevel");
34✔
2752
            Results res(realm, Query(table).greater(table->get_column_key("queryable_int_field"), int64_t(10)));
34✔
2753
            REQUIRE(res.size() == 2);
34!
2754
            res.clear();
34✔
2755
        });
34!
2756
    };
34✔
2757

34✔
2758
    SECTION("unknown exception occurs during bootstrap application on session startup") {
        // Step 1: start bootstrapping query version 1 and close the session as soon as the last
        // bootstrap message has been processed, skipping the rest of the handling (EarlyReturn).
        {
            auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
            Realm::Config config = interrupted_realm_config;
            // Work on a private copy of the sync config so that the hook installed here does not end
            // up on interrupted_realm_config, which is reused below.
            config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
            auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
            config.sync_config->on_sync_client_event_hook =
                [promise = std::move(shared_promise)](std::weak_ptr<SyncSession> weak_session,
                                                      const SyncClientHookData& data) mutable {
                    if (data.event != SyncClientHookEvent::BootstrapMessageProcessed) {
                        return SyncClientHookAction::NoAction;
                    }
                    auto session = weak_session.lock();
                    if (!session) {
                        return SyncClientHookAction::NoAction;
                    }

                    if (data.query_version == 1 && data.batch_state == sync::DownloadBatchState::LastInBatch) {
                        session->close();
                        promise->emplace_value();
                        return SyncClientHookAction::EarlyReturn;
                    }
                    return SyncClientHookAction::NoAction;
                };
            auto realm = Realm::get_shared_realm(config);
            {
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
                auto table = realm->read_group().get_table("class_TopLevel");
                mut_subs.insert_or_assign(Query(table));
                mut_subs.commit();
            }

            // Block until the hook has closed the session, then tear everything down.
            interrupted.get();
            realm->sync_session()->shutdown_and_wait();
            realm->close();
        }

        _impl::RealmCoordinator::assert_no_open_realms();

        // Step 2: open up the realm without the sync client attached and verify that the realm got
        // interrupted in the state we expected it to be in.
        {
            DBOptions options;
            options.encryption_key = test_util::crypt_key();
            auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
            auto logger = util::Logger::get_default_logger();
            sync::PendingBootstrapStore bootstrap_store(realm, *logger);
            REQUIRE(bootstrap_store.has_pending());
            auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16);
            REQUIRE(pending_batch.query_version == 1);
            REQUIRE(pending_batch.progress);

            check_interrupted_state(realm);
        }

        // Step 3: reopen the realm with a hook that throws when the pending bootstrap is about to be
        // processed, and capture the first error passed to the error handler.
        auto error_pf = util::make_promise_future<SyncError>();
        interrupted_realm_config.sync_config->error_handler =
            [promise = std::make_shared<util::Promise<SyncError>>(std::move(error_pf.promise))](
                std::shared_ptr<SyncSession>, SyncError error) {
                promise->emplace_value(std::move(error));
            };

        interrupted_realm_config.sync_config->on_sync_client_event_hook =
            [&, download_message_received = false](std::weak_ptr<SyncSession>,
                                                   const SyncClientHookData& data) mutable -> SyncClientHookAction {
                if (data.event == SyncClientHookEvent::DownloadMessageReceived) {
                    download_message_received = true;
                }
                if (data.event != SyncClientHookEvent::BootstrapBatchAboutToProcess) {
                    return SyncClientHookAction::NoAction;
                }

                // The stored bootstrap has to be processed before any download message is received.
                REQUIRE(!download_message_received);
                // Always throws, so no return statement is needed (or reachable) after this point.
                throw NovelException{};
            };

        auto realm = Realm::get_shared_realm(interrupted_realm_config);
        const auto& error = error_pf.future.get();
        // The exception must surface as a non-fatal warning carrying the exception's message.
        REQUIRE(!error.is_fatal);
        REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::Warning);
        REQUIRE(error.status == ErrorCodes::UnknownError);
        REQUIRE_THAT(error.status.reason(),
                     Catch::Matchers::ContainsSubstring("Oh no, a really weird exception happened!"));
    }
17✔
2843

33✔
2844
    SECTION("exception occurs during bootstrap application") {
        // The error injected by the hook below; the error handler must receive exactly this status.
        Status error_status(ErrorCodes::OutOfMemory, "no more memory!");
        {
            Realm::Config config = interrupted_realm_config;
            // Work on a private copy of the sync config so that the hook and error handler installed
            // here do not end up on interrupted_realm_config, which is reused below.
            config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
            // error_status is captured by reference; it lives until the end of the SECTION and the
            // session is shut down before this scope ends.
            config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_session,
                                                                const SyncClientHookData& data) {
                if (data.event != SyncClientHookEvent::BootstrapBatchAboutToProcess) {
                    return SyncClientHookAction::NoAction;
                }
                auto session = weak_session.lock();
                if (!session) {
                    return SyncClientHookAction::NoAction;
                }

                if (data.query_version == 1 && data.batch_state == sync::DownloadBatchState::MoreToCome) {
                    throw sync::IntegrationException(error_status);
                }
                return SyncClientHookAction::NoAction;
            };
            auto error_pf = util::make_promise_future<SyncError>();
            config.sync_config->error_handler =
                [promise = std::make_shared<util::Promise<SyncError>>(std::move(error_pf.promise))](
                    std::shared_ptr<SyncSession>, SyncError error) {
                    promise->emplace_value(std::move(error));
                };

            auto realm = Realm::get_shared_realm(config);
            {
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
                auto table = realm->read_group().get_table("class_TopLevel");
                mut_subs.insert_or_assign(Query(table));
                mut_subs.commit();
            }

            // Wait for the injected failure to be reported, then tear everything down.
            auto error = error_pf.future.get();
            REQUIRE(error.status.reason() == error_status.reason());
            REQUIRE(error.status == error_status);
            realm->sync_session()->shutdown_and_wait();
            realm->close();
        }

        _impl::RealmCoordinator::assert_no_open_realms();

        // Open up the realm without the sync client attached and verify that the realm got interrupted in the state
        // we expected it to be in.
        {
            DBOptions options;
            options.encryption_key = test_util::crypt_key();
            auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
            // Same logger the other sections of this test case pass to PendingBootstrapStore.
            auto logger = util::Logger::get_default_logger();
            sync::PendingBootstrapStore bootstrap_store(realm, *logger);
            REQUIRE(bootstrap_store.has_pending());
            auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16);
            REQUIRE(pending_batch.query_version == 1);
            REQUIRE(pending_batch.progress);

            check_interrupted_state(realm);
        }

        // Reopen the realm with the unhooked config and wait for the subscription to complete; every
        // object created by fill_large_array_schema() must then be present.
        auto realm = Realm::get_shared_realm(interrupted_realm_config);
        auto table = realm->read_group().get_table("class_TopLevel");
        realm->get_latest_subscription_set()
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
            .get();

        wait_for_advance(*realm);

        REQUIRE(table->size() == obj_ids_at_end.size());
        for (auto& id : obj_ids_at_end) {
            REQUIRE(table->find_primary_key(Mixed{id}));
        }
    }
21!
2919

37✔
2920
    SECTION("interrupted before final bootstrap message") {
        // Step 1: start bootstrapping query version 1 and force-close the session right after a
        // bootstrap message that is not the last one in its batch has been processed.
        {
            auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
            Realm::Config hooked_config = interrupted_realm_config;
            // Private copy of the sync config so the hook is not installed on interrupted_realm_config.
            hooked_config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
            auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
            hooked_config.sync_config->on_sync_client_event_hook =
                [promise = std::move(shared_promise)](std::weak_ptr<SyncSession> weak_session,
                                                      const SyncClientHookData& data) mutable {
                    if (data.event == SyncClientHookEvent::BootstrapMessageProcessed) {
                        if (auto session = weak_session.lock()) {
                            const bool more_to_come = data.batch_state == sync::DownloadBatchState::MoreToCome;
                            if (data.query_version == 1 && more_to_come) {
                                session->force_close();
                                promise->emplace_value();
                                return SyncClientHookAction::TriggerReconnect;
                            }
                        }
                    }
                    return SyncClientHookAction::NoAction;
                };

            auto hooked_realm = Realm::get_shared_realm(hooked_config);
            {
                auto new_subs = hooked_realm->get_latest_subscription_set().make_mutable_copy();
                auto top_level = hooked_realm->read_group().get_table("class_TopLevel");
                new_subs.insert_or_assign(Query(top_level));
                new_subs.commit();
            }

            // Block until the hook has interrupted the bootstrap, then tear everything down.
            interrupted.get();
            hooked_realm->sync_session()->shutdown_and_wait();
            hooked_realm->close();
        }

        _impl::RealmCoordinator::assert_no_open_realms();

        // Step 2: open up the realm without the sync client attached and verify that the realm got
        // interrupted in the state we expected it to be in.
        {
            constexpr size_t max_peek_bytes = 1024 * 1024 * 16;

            DBOptions db_options;
            db_options.encryption_key = test_util::crypt_key();
            auto db = DB::create(sync::make_client_replication(), interrupted_realm_config.path, db_options);
            auto logger = util::Logger::get_default_logger();
            sync::PendingBootstrapStore pending_store(db, *logger);
            REQUIRE(pending_store.has_pending());

            auto batch = pending_store.peek_pending(max_peek_bytes);
            REQUIRE(batch.query_version == 1);
            REQUIRE(!batch.progress);
            REQUIRE(batch.remaining_changesets == 0);
            REQUIRE(batch.changesets.size() == 1);

            check_interrupted_state(db);
        }

        // Now we'll open a different realm and make some changes that would leave orphan objects on the client
        // if the bootstrap batches weren't being cached until lastInBatch were true.
        mutate_realm();

        // Finally re-open the realm whose bootstrap we interrupted and just wait for it to finish downloading.
        auto realm = Realm::get_shared_realm(interrupted_realm_config);
        auto table = realm->read_group().get_table("class_TopLevel");
        auto subscription_complete =
            realm->get_latest_subscription_set().get_state_change_notification(sync::SubscriptionSet::State::Complete);
        subscription_complete.get();

        wait_for_advance(*realm);

        // Only the first three object ids are expected to remain after mutate_realm().
        auto expected_obj_ids = util::Span<ObjectId>(obj_ids_at_end).sub_span(0, 3);
        REQUIRE(table->size() == expected_obj_ids.size());
        for (auto& expected_id : expected_obj_ids) {
            REQUIRE(table->find_primary_key(Mixed{expected_id}));
        }
    }
19!
2996

35✔
2997
    SECTION("interrupted after final bootstrap message before processing") {
65✔
2998
        {
16✔
2999
            auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
20✔
3000
            Realm::Config config = interrupted_realm_config;
17✔
3001
            config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
17✔
3002
            auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
17✔
3003
            config.sync_config->on_sync_client_event_hook =
17✔
3004
                [promise = std::move(shared_promise)](std::weak_ptr<SyncSession> weak_session,
17✔
3005
                                                      const SyncClientHookData& data) mutable {
337✔
3006
                    if (data.event != SyncClientHookEvent::BootstrapMessageProcessed) {
337✔
3007
                        return SyncClientHookAction::NoAction;
245✔
3008
                    }
245✔
3009
                    auto session = weak_session.lock();
126✔
3010
                    if (!session) {
126✔
3011
                        return SyncClientHookAction::NoAction;
7✔
3012
                    }
7✔
3013

56✔
3014
                    if (data.query_version == 1 && data.batch_state == sync::DownloadBatchState::LastInBatch) {
112✔
3015
                        session->force_close();
16✔
3016
                        promise->emplace_value();
23✔
3017
                        return SyncClientHookAction::TriggerReconnect;
17✔
3018
                    }
17✔
3019
                    return SyncClientHookAction::NoAction;
97✔
3020
                };
97✔
3021
            auto realm = Realm::get_shared_realm(config);
22✔
3022
            {
22✔
3023
                auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
17✔
3024
                auto table = realm->read_group().get_table("class_TopLevel");
17✔
3025
                mut_subs.insert_or_assign(Query(table));
17✔
3026
                mut_subs.commit();
17✔
3027
            }
17✔
3028

9✔
3029
            interrupted.get();
17✔
3030
            realm->sync_session()->shutdown_and_wait();
16✔
3031
            realm->close();
17✔
3032
        }
17✔
3033

9✔
3034
        _impl::RealmCoordinator::assert_no_open_realms();
17✔
3035

8✔
3036
        // Open up the realm without the sync client attached and verify that the realm got interrupted in the state
9✔
3037
        // we expected it to be in.
8✔
3038
        {
16✔
3039
            DBOptions options;
16✔
3040
            options.encryption_key = test_util::crypt_key();
17✔
3041
            auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
17✔
3042
            auto logger = util::Logger::get_default_logger();
17✔
3043
            sync::PendingBootstrapStore bootstrap_store(realm, *logger);
17✔
3044
            REQUIRE(bootstrap_store.has_pending());
17!
3045
            auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16);
17✔
3046
            REQUIRE(pending_batch.query_version == 1);
17!
3047
            REQUIRE(static_cast<bool>(pending_batch.progress));
17!
3048
            REQUIRE(pending_batch.remaining_changesets == 0);
17!
3049
            REQUIRE(pending_batch.changesets.size() == 6);
17!
3050

9!
3051
            check_interrupted_state(realm);
17!
3052
        }
16✔
3053

9✔
3054
        // Now we'll open a different realm and make some changes that would leave orphan objects on the client
9✔
3055
        // if the bootstrap batches weren't being cached until lastInBatch were true.
8✔
3056
        mutate_realm();
16✔
3057

8✔
3058
        auto [saw_valid_state_promise, saw_valid_state_future] = util::make_promise_future<void>();
17✔
3059
        auto shared_saw_valid_state_promise =
16✔
3060
            std::make_shared<decltype(saw_valid_state_promise)>(std::move(saw_valid_state_promise));
17✔
3061
        // This hook will let us check what the state of the realm is before it's integrated any new download
9✔
3062
        // messages from the server. This should be the full 5 object bootstrap that was received before we
9✔
3063
        // called mutate_realm().
8✔
3064
        interrupted_realm_config.sync_config->on_sync_client_event_hook =
16✔
3065
            [&, promise = std::move(shared_saw_valid_state_promise)](std::weak_ptr<SyncSession> weak_session,
16✔
3066
                                                                     const SyncClientHookData& data) {
273✔
3067
                if (data.event != SyncClientHookEvent::DownloadMessageReceived) {
273✔
3068
                    return SyncClientHookAction::NoAction;
273✔
3069
                }
273✔
3070
                auto session = weak_session.lock();
32✔
3071
                if (!session) {
32✔
3072
                    return SyncClientHookAction::NoAction;
1✔
3073
                }
1✔
3074

8✔
3075
                if (data.query_version != 1 || data.batch_state == sync::DownloadBatchState::MoreToCome) {
16✔
3076
                    return SyncClientHookAction::NoAction;
3077
                }
1✔
3078

8✔
3079
                auto latest_sub_set = session->get_flx_subscription_store()->get_latest();
16✔
3080
                auto active_sub_set = session->get_flx_subscription_store()->get_active();
16✔
3081
                auto version_info = session->get_flx_subscription_store()->get_version_info();
17✔
3082
                REQUIRE(version_info.pending_mark == active_sub_set.version());
17!
3083
                REQUIRE(version_info.active == active_sub_set.version());
17!
3084
                REQUIRE(version_info.latest == latest_sub_set.version());
17!
3085
                REQUIRE(latest_sub_set.version() == active_sub_set.version());
17!
3086
                REQUIRE(active_sub_set.state() == sync::SubscriptionSet::State::AwaitingMark);
17!
3087

9!
3088
                auto db = SyncSession::OnlyForTesting::get_db(*session);
17!
3089
                auto tr = db->start_read();
16✔
3090

9✔
3091
                auto table = tr->get_table("class_TopLevel");
17✔
3092
                REQUIRE(table->size() == obj_ids_at_end.size());
16!
3093
                for (auto& id : obj_ids_at_end) {
81✔
3094
                    REQUIRE(table->find_primary_key(Mixed{id}));
81!
3095
                }
85✔
3096

13!
3097
                promise->emplace_value();
21✔
3098
                return SyncClientHookAction::NoAction;
16✔
3099
            };
17✔
3100

9✔
3101
        // Finally re-open the realm whose bootstrap we interrupted and just wait for it to finish downloading.
9✔
3102
        auto realm = Realm::get_shared_realm(interrupted_realm_config);
16✔
3103
        saw_valid_state_future.get();
16✔
3104
        auto table = realm->read_group().get_table("class_TopLevel");
17✔
3105
        realm->get_latest_subscription_set()
17✔
3106
            .get_state_change_notification(sync::SubscriptionSet::State::Complete)
17✔
3107
            .get();
17✔
3108

9✔
3109
        wait_for_advance(*realm);
17✔
3110
        auto expected_obj_ids = util::Span<ObjectId>(obj_ids_at_end).sub_span(0, 3);
16✔
3111

9✔
3112
        // After we've downloaded all the mutations there should only by 3 objects left.
9✔
3113
        REQUIRE(table->size() == expected_obj_ids.size());
16!
3114
        for (auto& id : expected_obj_ids) {
48✔
3115
            REQUIRE(table->find_primary_key(Mixed{id}));
49!
3116
        }
51✔
3117
    }
19!
3118
}
67✔
3119

1✔
3120
// Check that a document with the given id is present and has the expected fields
4✔
3121
static void check_document(const std::vector<bson::BsonDocument>& documents, ObjectId id,
3122
                           std::initializer_list<std::pair<const char*, bson::Bson>> fields)
3123
{
3,424✔
3124
    auto it = std::find_if(documents.begin(), documents.end(), [&](auto&& doc) {
344,768✔
3125
        auto val = doc.find("_id");
344,982✔
3126
        REQUIRE(val);
366,316!
3127
        return *val == id;
366,316✔
3128
    });
366,316!
3129
    REQUIRE(it != documents.end());
24,972!
3130
    auto& doc = *it;
24,972✔
3131
    for (auto& [name, expected_value] : fields) {
3,686✔
3132
        auto val = doc.find(name);
3,686✔
3133
        REQUIRE(val);
3,689✔
3134

1,953✔
3135
        // bson documents are ordered  but Realm dictionaries aren't, so the
1,953!
3136
        // document might validly be in a different order than we expected and
1,736✔
3137
        // we need to do a comparison that doesn't check order.
1,736✔
3138
        if (expected_value.type() == bson::Bson::Type::Document) {
3,472✔
3139
            REQUIRE(static_cast<const bson::BsonDocument&>(*val) ==
64!
3140
                    static_cast<const bson::BsonDocument&>(expected_value));
281✔
3141
        }
68!
3142
        else {
3,412✔
3143
            REQUIRE(*val == expected_value);
3,412!
3144
        }
3,621✔
3145
    }
3,685!
3146
}
3,637✔
3147

217✔
3148
TEST_CASE("flx: data ingest", "[sync][flx][data ingest][baas]") {
390✔
3149
    using namespace ::realm::bson;
176✔
3150

99✔
3151
    static auto server_schema = [] {
107✔
3152
        FLXSyncTestHarness::ServerSchema server_schema;
16✔
3153
        server_schema.queryable_fields = {"queryable_str_field"};
17✔
3154
        server_schema.schema = {
17✔
3155
            {"Asymmetric",
17✔
3156
             ObjectSchema::ObjectType::TopLevelAsymmetric,
17✔
3157
             {
17✔
3158
                 {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
17✔
3159
                 {"location", PropertyType::String | PropertyType::Nullable},
17✔
3160
                 {"embedded obj", PropertyType::Object | PropertyType::Nullable, "Embedded"},
17✔
3161
                 {"embedded list", PropertyType::Object | PropertyType::Array, "Embedded"},
17✔
3162
                 {"embedded dictionary", PropertyType::Object | PropertyType::Nullable | PropertyType::Dictionary,
17✔
3163
                  "Embedded"},
17✔
3164
                 {"link obj", PropertyType::Object | PropertyType::Nullable, "TopLevel"},
17✔
3165
                 {"link list", PropertyType::Object | PropertyType::Array, "TopLevel"},
17✔
3166
                 {"link dictionary", PropertyType::Object | PropertyType::Nullable | PropertyType::Dictionary,
17✔
3167
                  "TopLevel"},
17✔
3168
             }},
17✔
3169
            {"Embedded", ObjectSchema::ObjectType::Embedded, {{"value", PropertyType::String}}},
17✔
3170
            {"TopLevel",
17✔
3171
             {
17✔
3172
                 {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
17✔
3173
                 {"value", PropertyType::Int},
17✔
3174
             }},
17✔
3175
        };
17✔
3176
        return server_schema;
17✔
3177
    }();
17✔
3178
    static auto harness = std::make_unique<FLXSyncTestHarness>("asymmetric_sync", server_schema);
177✔
3179

89✔
3180
    // We reuse a single app for each section, so tests will see the documents
99✔
3181
    // created by previous tests and we need to add those documents to the count
88✔
3182
    // we're waiting for
88✔
3183
    static std::unordered_map<std::string, size_t> previous_count;
176✔
3184
    auto get_documents = [&](const char* name, size_t expected_count) {
160✔
3185
        auto& count = previous_count[name];
155✔
3186
        auto documents =
153✔
3187
            harness->session().get_documents(*harness->app()->current_user(), name, count + expected_count);
153✔
3188
        count = documents.size();
153✔
3189
        return documents;
153✔
3190
    };
153✔
3191

97✔
3192
    SECTION("basic object construction") {
185✔
3193
        auto foo_obj_id = ObjectId::gen();
16✔
3194
        auto bar_obj_id = ObjectId::gen();
27✔
3195
        harness->do_with_new_realm([&](SharedRealm realm) {
17✔
3196
            realm->begin_transaction();
17✔
3197
            CppContext c(realm);
17✔
3198
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "foo"s}}));
17✔
3199
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", bar_obj_id}, {"location", "bar"s}}));
17✔
3200
            realm->commit_transaction();
17✔
3201

9✔
3202
            auto documents = get_documents("Asymmetric", 2);
17✔
3203
            check_document(documents, foo_obj_id, {{"location", "foo"}});
16✔
3204
            check_document(documents, bar_obj_id, {{"location", "bar"}});
17✔
3205
        });
17✔
3206

9✔
3207
        harness->do_with_new_realm([&](SharedRealm realm) {
17✔
3208
            wait_for_download(*realm);
16✔
3209

9✔
3210
            auto table = realm->read_group().get_table("class_Asymmetric");
17✔
3211
            REQUIRE(table->size() == 0);
16!
3212
            // Cannot query asymmetric tables.
9✔
3213
            CHECK_THROWS_AS(Query(table), LogicError);
17✔
3214
        });
16✔
3215
    }
17✔
3216

89✔
3217
    SECTION("do not allow objects with same key within the same transaction") {
177✔
3218
        auto foo_obj_id = ObjectId::gen();
16✔
3219
        harness->do_with_new_realm([&](SharedRealm realm) {
27✔
3220
            realm->begin_transaction();
17✔
3221
            CppContext c(realm);
17✔
3222
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "foo"s}}));
17✔
3223
            CHECK_THROWS_WITH(
17✔
3224
                Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", foo_obj_id}, {"location", "bar"s}})),
17✔
3225
                "Attempting to create an object of type 'Asymmetric' with an existing primary key value 'not "
17✔
3226
                "implemented'");
17✔
3227
            realm->commit_transaction();
17✔
3228

9✔
3229
            auto documents = get_documents("Asymmetric", 1);
17✔
3230
            check_document(documents, foo_obj_id, {{"location", "foo"}});
16✔
3231
        });
17✔
3232
    }
17✔
3233

89✔
3234
    SECTION("create multiple objects - separate commits") {
177✔
3235
        harness->do_with_new_realm([&](SharedRealm realm) {
16✔
3236
            CppContext c(realm);
27✔
3237
            std::vector<ObjectId> obj_ids;
17✔
3238
            for (int i = 0; i < 100; ++i) {
1,617✔
3239
                realm->begin_transaction();
1,601✔
3240
                obj_ids.push_back(ObjectId::gen());
1,701✔
3241
                Object::create(c, realm, "Asymmetric",
1,700✔
3242
                               std::any(AnyDict{
1,700✔
3243
                                   {"_id", obj_ids.back()},
1,700✔
3244
                                   {"location", util::format("foo_%1", i)},
1,700✔
3245
                               }));
1,700✔
3246
                realm->commit_transaction();
1,700✔
3247
            }
1,700✔
3248

108✔
3249
            wait_for_upload(*realm);
116✔
3250
            wait_for_download(*realm);
16✔
3251

9✔
3252
            auto table = realm->read_group().get_table("class_Asymmetric");
17✔
3253
            REQUIRE(table->size() == 0);
16!
3254

9✔
3255
            auto documents = get_documents("Asymmetric", 100);
17!
3256
            for (int i = 0; i < 100; ++i) {
1,616✔
3257
                check_document(documents, obj_ids[i], {{"location", util::format("foo_%1", i)}});
1,601✔
3258
            }
1,701✔
3259
        });
116✔
3260
    }
116✔
3261

89✔
3262
    SECTION("create multiple objects - same commit") {
177✔
3263
        harness->do_with_new_realm([&](SharedRealm realm) {
16✔
3264
            CppContext c(realm);
27✔
3265
            realm->begin_transaction();
17✔
3266
            std::vector<ObjectId> obj_ids;
17✔
3267
            for (int i = 0; i < 100; ++i) {
1,617✔
3268
                obj_ids.push_back(ObjectId::gen());
1,601✔
3269
                Object::create(c, realm, "Asymmetric",
1,701✔
3270
                               std::any(AnyDict{
1,700✔
3271
                                   {"_id", obj_ids.back()},
1,700✔
3272
                                   {"location", util::format("bar_%1", i)},
1,700✔
3273
                               }));
1,700✔
3274
            }
1,700✔
3275
            realm->commit_transaction();
116✔
3276

108✔
3277
            wait_for_upload(*realm);
17✔
3278
            wait_for_download(*realm);
16✔
3279

9✔
3280
            auto table = realm->read_group().get_table("class_Asymmetric");
17✔
3281
            REQUIRE(table->size() == 0);
16!
3282

9✔
3283
            auto documents = get_documents("Asymmetric", 100);
17!
3284
            for (int i = 0; i < 100; ++i) {
1,616✔
3285
                check_document(documents, obj_ids[i], {{"location", util::format("bar_%1", i)}});
1,601✔
3286
            }
1,701✔
3287
        });
116✔
3288
    }
116✔
3289

89✔
3290
    SECTION("open with schema mismatch on IsAsymmetric") {
177✔
3291
        auto schema = server_schema.schema;
16✔
3292
        schema.find("Asymmetric")->table_type = ObjectSchema::ObjectType::TopLevel;
27✔
3293

9✔
3294
        harness->do_with_new_user([&](std::shared_ptr<SyncUser> user) {
17✔
3295
            SyncTestFile config(user, schema, SyncConfig::FLXSyncEnabled{});
16✔
3296
            auto [error_promise, error_future] = util::make_promise_future<SyncError>();
17✔
3297
            auto error_count = 0;
17✔
3298
            auto err_handler = [promise = util::CopyablePromiseHolder(std::move(error_promise)),
17✔
3299
                                &error_count](std::shared_ptr<SyncSession>, SyncError err) mutable {
33✔
3300
                ++error_count;
33✔
3301
                if (error_count == 1) {
34✔
3302
                    // Bad changeset detected by the client.
10✔
3303
                    CHECK(err.status == ErrorCodes::BadChangeset);
18✔
3304
                }
16✔
3305
                else if (error_count == 2) {
17✔
3306
                    // Server asking for a client reset.
9✔
3307
                    CHECK(err.status == ErrorCodes::SyncClientResetRequired);
17✔
3308
                    CHECK(err.is_client_reset_requested());
16!
3309
                    promise.get_promise().emplace_value(std::move(err));
17!
3310
                }
17!
3311
            };
33✔
3312

9✔
3313
            config.sync_config->error_handler = err_handler;
18✔
3314
            auto realm = Realm::get_shared_realm(config);
16✔
3315

9✔
3316
            auto err = error_future.get();
17✔
3317
            CHECK(error_count == 2);
16!
3318
        });
17✔
3319
    }
17!
3320

89✔
3321
    SECTION("basic embedded object construction") {
177✔
3322
        harness->do_with_new_realm([&](SharedRealm realm) {
16✔
3323
            auto obj_id = ObjectId::gen();
27✔
3324
            realm->begin_transaction();
17✔
3325
            CppContext c(realm);
17✔
3326
            Object::create(c, realm, "Asymmetric",
17✔
3327
                           std::any(AnyDict{
17✔
3328
                               {"_id", obj_id},
17✔
3329
                               {"embedded obj", AnyDict{{"value", "foo"s}}},
17✔
3330
                           }));
17✔
3331
            realm->commit_transaction();
17✔
3332
            wait_for_upload(*realm);
17✔
3333

9✔
3334
            auto documents = get_documents("Asymmetric", 1);
17✔
3335
            check_document(documents, obj_id, {{"embedded obj", BsonDocument{{"value", "foo"}}}});
16✔
3336
        });
17✔
3337

9✔
3338
        harness->do_with_new_realm([&](SharedRealm realm) {
17✔
3339
            wait_for_download(*realm);
16✔
3340

9✔
3341
            auto table = realm->read_group().get_table("class_Asymmetric");
17✔
3342
            REQUIRE(table->size() == 0);
16!
3343
        });
17✔
3344
    }
17!
3345

89✔
3346
    SECTION("replace embedded object") {
177✔
3347
        harness->do_with_new_realm([&](SharedRealm realm) {
16✔
3348
            CppContext c(realm);
27✔
3349
            auto foo_obj_id = ObjectId::gen();
17✔
3350

9✔
3351
            realm->begin_transaction();
17✔
3352
            Object::create(c, realm, "Asymmetric",
16✔
3353
                           std::any(AnyDict{
17✔
3354
                               {"_id", foo_obj_id},
17✔
3355
                               {"embedded obj", AnyDict{{"value", "foo"s}}},
17✔
3356
                           }));
17✔
3357
            realm->commit_transaction();
17✔
3358

9✔
3359
            // Update embedded field to `null`. The server discards this write
9✔
3360
            // as asymmetric sync can only create new objects.
8✔
3361
            realm->begin_transaction();
16✔
3362
            Object::create(c, realm, "Asymmetric",
16✔
3363
                           std::any(AnyDict{
17✔
3364
                               {"_id", foo_obj_id},
17✔
3365
                               {"embedded obj", std::any()},
17✔
3366
                           }));
17✔
3367
            realm->commit_transaction();
17✔
3368

9✔
3369
            // create a second object so that we can know when the translator
9✔
3370
            // has processed everything
8✔
3371
            realm->begin_transaction();
16✔
3372
            Object::create(c, realm, "Asymmetric", std::any(AnyDict{{"_id", ObjectId::gen()}, {}}));
16✔
3373
            realm->commit_transaction();
17✔
3374

9✔
3375
            wait_for_upload(*realm);
17✔
3376
            wait_for_download(*realm);
16✔
3377

9✔
3378
            auto table = realm->read_group().get_table("class_Asymmetric");
17✔
3379
            REQUIRE(table->size() == 0);
16!
3380

9✔
3381
            auto documents = get_documents("Asymmetric", 2);
17!
3382
            check_document(documents, foo_obj_id, {{"embedded obj", BsonDocument{{"value", "foo"}}}});
16✔
3383
        });
17✔
3384
    }
17✔
3385

89✔
3386
    SECTION("embedded collections") {
177✔
3387
        harness->do_with_new_realm([&](SharedRealm realm) {
16✔
3388
            CppContext c(realm);
27✔
3389
            auto obj_id = ObjectId::gen();
17✔
3390

9✔
3391
            realm->begin_transaction();
17✔
3392
            Object::create(c, realm, "Asymmetric",
16✔
3393
                           std::any(AnyDict{
17✔
3394
                               {"_id", obj_id},
17✔
3395
                               {"embedded list", AnyVector{AnyDict{{"value", "foo"s}}, AnyDict{{"value", "bar"s}}}},
17✔
3396
                               {"embedded dictionary",
17✔
3397
                                AnyDict{
17✔
3398
                                    {"key1", AnyDict{{"value", "foo"s}}},
17✔
3399
                                    {"key2", AnyDict{{"value", "bar"s}}},
17✔
3400
                                }},
17✔
3401
                           }));
17✔
3402
            realm->commit_transaction();
17✔
3403

9✔
3404
            auto documents = get_documents("Asymmetric", 1);
17✔
3405
            check_document(
16✔
3406
                documents, obj_id,
17✔
3407
                {
17✔
3408
                    {"embedded list", BsonArray{BsonDocument{{"value", "foo"}}, BsonDocument{{"value", "bar"}}}},
17✔
3409
                    {"embedded dictionary",
17✔
3410
                     BsonDocument{
17✔
3411
                         {"key1", BsonDocument{{"value", "foo"}}},
17✔
3412
                         {"key2", BsonDocument{{"value", "bar"}}},
17✔
3413
                     }},
17✔
3414
                });
17✔
3415
        });
17✔
3416
    }
17✔
3417

89✔
3418
    SECTION("asymmetric table not allowed in PBS") {
177✔
3419
        Schema schema{
16✔
3420
            {"Asymmetric2",
27✔
3421
             ObjectSchema::ObjectType::TopLevelAsymmetric,
17✔
3422
             {
17✔
3423
                 {"_id", PropertyType::Int, Property::IsPrimary{true}},
17✔
3424
                 {"location", PropertyType::Int},
17✔
3425
                 {"reading", PropertyType::Int},
17✔
3426
             }},
17✔
3427
        };
17✔
3428

9✔
3429
        SyncTestFile config(harness->app(), Bson{}, schema);
17✔
3430
        REQUIRE_EXCEPTION(
16✔
3431
            Realm::get_shared_realm(config), SchemaValidationFailed,
17✔
3432
            Catch::Matchers::ContainsSubstring("Asymmetric table 'Asymmetric2' not allowed in partition based sync"));
17✔
3433
    }
17✔
3434

89✔
3435
    SECTION("links to top-level objects") {
177✔
3436
        harness->do_with_new_realm([&](SharedRealm realm) {
16✔
3437
            subscribe_to_all_and_bootstrap(*realm);
27✔
3438

9✔
3439
            ObjectId obj_id = ObjectId::gen();
17✔
3440
            std::array<ObjectId, 5> target_obj_ids;
16✔
3441
            for (auto& id : target_obj_ids) {
81✔
3442
                id = ObjectId::gen();
81✔
3443
            }
85✔
3444

13✔
3445
            realm->begin_transaction();
21✔
3446
            CppContext c(realm);
16✔
3447
            Object::create(c, realm, "Asymmetric",
17✔
3448
                           std::any(AnyDict{
17✔
3449
                               {"_id", obj_id},
17✔
3450
                               {"link obj", AnyDict{{"_id", target_obj_ids[0]}, {"value", INT64_C(10)}}},
17✔
3451
                               {"link list",
17✔
3452
                                AnyVector{
17✔
3453
                                    AnyDict{{"_id", target_obj_ids[1]}, {"value", INT64_C(11)}},
17✔
3454
                                    AnyDict{{"_id", target_obj_ids[2]}, {"value", INT64_C(12)}},
17✔
3455
                                }},
17✔
3456
                               {"link dictionary",
17✔
3457
                                AnyDict{
17✔
3458
                                    {"key1", AnyDict{{"_id", target_obj_ids[3]}, {"value", INT64_C(13)}}},
17✔
3459
                                    {"key2", AnyDict{{"_id", target_obj_ids[4]}, {"value", INT64_C(14)}}},
17✔
3460
                                }},
17✔
3461
                           }));
17✔
3462
            realm->commit_transaction();
17✔
3463
            wait_for_upload(*realm);
17✔
3464

9✔
3465
            auto docs1 = get_documents("Asymmetric", 1);
17✔
3466
            check_document(docs1, obj_id,
16✔
3467
                           {{"link obj", target_obj_ids[0]},
17✔
3468
                            {"link list", BsonArray{{target_obj_ids[1], target_obj_ids[2]}}},
17✔
3469
                            {
17✔
3470
                                "link dictionary",
17✔
3471
                                BsonDocument{
17✔
3472
                                    {"key1", target_obj_ids[3]},
17✔
3473
                                    {"key2", target_obj_ids[4]},
17✔
3474
                                },
17✔
3475
                            }});
17✔
3476

9✔
3477
            auto docs2 = get_documents("TopLevel", 5);
17✔
3478
            for (int64_t i = 0; i < 5; ++i) {
96✔
3479
                check_document(docs2, target_obj_ids[i], {{"value", 10 + i}});
81✔
3480
            }
86✔
3481
        });
21✔
3482
    }
21✔
3483

89✔
3484
    // Add any new test sections above this point
89✔
3485

88✔
3486
    SECTION("teardown") {
176✔
3487
        harness.reset();
16✔
3488
    }
27✔
3489
}
177✔
3490

1✔
3491
// Creates asymmetric objects against an app that starts with an empty server
// schema and has dev mode enabled, then checks that both objects show up as
// documents in the "Asymmetric" collection.
TEST_CASE("flx: data ingest - dev mode", "[sync][flx][data ingest][baas]") {
    FLXSyncTestHarness::ServerSchema server_schema;
    server_schema.dev_mode_enabled = true;
    server_schema.schema = Schema{};

    // Schema known only to the client; it is passed to the realm below.
    Schema client_schema{{"Asymmetric",
                          ObjectSchema::ObjectType::TopLevelAsymmetric,
                          {
                              {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
                              {"location", PropertyType::String | PropertyType::Nullable},
                          }},
                         {"TopLevel",
                          {
                              {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
                              {"queryable_str_field", PropertyType::String | PropertyType::Nullable},
                          }}};

    FLXSyncTestHarness harness("asymmetric_sync", server_schema);

    auto foo_id = ObjectId::gen();
    auto bar_id = ObjectId::gen();

    harness.do_with_new_realm(
        [&](SharedRealm realm) {
            CppContext ctx(realm);
            auto create_at = [&](const ObjectId& id, std::string location) {
                Object::create(ctx, realm, "Asymmetric",
                               std::any(AnyDict{{"_id", id}, {"location", std::move(location)}}));
            };

            realm->begin_transaction();
            create_at(foo_id, "foo"s);
            create_at(bar_id, "bar"s);
            realm->commit_transaction();

            auto uploaded = harness.session().get_documents(*realm->config().sync_config->user, "Asymmetric", 2);
            check_document(uploaded, foo_id, {{"location", "foo"}});
            check_document(uploaded, bar_id, {{"location", "bar"}});
        },
        client_schema);
}
17✔
3527

1✔
3528
// Uploads an asymmetric object using a role whose document filters deny
// writes, and waits for the client event hook to observe a write_not_allowed
// error message carrying compensating write information.
TEST_CASE("flx: data ingest - write not allowed", "[sync][flx][data ingest][baas]") {
    AppCreateConfig::ServiceRoleDocumentFilters doc_filters;
    doc_filters.read = true;
    doc_filters.write = false;

    AppCreateConfig::ServiceRole role;
    role.name = "asymmetric_write_perms";
    role.document_filters = doc_filters;
    role.insert_filter = true;
    role.delete_filter = true;
    role.read = true;
    role.write = true;

    Schema schema({
        {"Asymmetric",
         ObjectSchema::ObjectType::TopLevelAsymmetric,
         {
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
             {"location", PropertyType::String | PropertyType::Nullable},
             {"embedded_obj", PropertyType::Object | PropertyType::Nullable, "Embedded"},
         }},
        {"Embedded",
         ObjectSchema::ObjectType::Embedded,
         {
             {"value", PropertyType::String | PropertyType::Nullable},
         }},
    });
    FLXSyncTestHarness::ServerSchema server_schema{schema, {}, {role}};
    FLXSyncTestHarness harness("asymmetric_sync", server_schema);

    auto [error_promise, error_future] = util::make_promise_future<void>();
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
    config.sync_config->on_sync_client_event_hook =
        [promise = util::CopyablePromiseHolder(std::move(error_promise))](
            std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& data) mutable {
            // Only error messages are of interest here.
            if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
                return SyncClientHookAction::NoAction;
            }
            auto strong_session = weak_session.lock();
            REQUIRE(strong_session);

            const auto protocol_error = static_cast<sync::ProtocolError>(data.error_info->raw_error_code);

            // This error is let through untouched; keep waiting for the
            // write_not_allowed error.
            if (protocol_error == sync::ProtocolError::initial_sync_not_completed) {
                return SyncClientHookAction::NoAction;
            }

            REQUIRE(protocol_error == sync::ProtocolError::write_not_allowed);
            REQUIRE_FALSE(data.error_info->compensating_write_server_version.has_value());
            REQUIRE_FALSE(data.error_info->compensating_writes.empty());
            promise.get_promise().emplace_value();

            return SyncClientHookAction::EarlyReturn;
        };

    auto realm = Realm::get_shared_realm(config);

    // Create an asymmetric object and upload it to the server.
    {
        realm->begin_transaction();
        CppContext ctx(realm);
        Object::create(ctx, realm, "Asymmetric",
                       std::any(AnyDict{{"_id", ObjectId::gen()}, {"embedded_obj", AnyDict{{"value", "foo"s}}}}));
        realm->commit_transaction();
    }

    // Blocks until the hook has seen the expected error.
    error_future.get();
    realm->close();
}
16✔
3598

1✔
3599
// Simulates an integration error while bootstrapping and checks that the
// session reports exactly two errors: first a bad changeset, then a client
// reset request.
TEST_CASE("flx: send client error", "[sync][flx][baas]") {
    FLXSyncTestHarness harness("flx_client_error");

    // An integration error is simulated while bootstrapping.
    // This results in the client sending an error message to the server.
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
    config.sync_config->simulate_integration_error = true;

    auto [error_promise, error_future] = util::make_promise_future<SyncError>();
    auto error_count = 0;
    auto on_error = [promise = util::CopyablePromiseHolder(std::move(error_promise)),
                     &error_count](std::shared_ptr<SyncSession>, SyncError err) mutable {
        switch (++error_count) {
            case 1:
                // Bad changeset detected by the client.
                CHECK(err.status == ErrorCodes::BadChangeset);
                break;
            case 2:
                // Server asking for a client reset.
                CHECK(err.status == ErrorCodes::SyncClientResetRequired);
                CHECK(err.is_client_reset_requested());
                promise.get_promise().emplace_value(std::move(err));
                break;
            default:
                break;
        }
    };

    config.sync_config->error_handler = on_error;
    auto realm = Realm::get_shared_realm(config);

    // Subscribe to every TopLevel object so that a bootstrap is triggered.
    auto top_level = realm->read_group().get_table("class_TopLevel");
    auto subscriptions = realm->get_latest_subscription_set().make_mutable_copy();
    subscriptions.insert_or_assign(Query(top_level));
    subscriptions.commit();

    // Blocks until the handler has seen the second error.
    auto reset_error = error_future.get();
    CHECK(error_count == 2);
}
16✔
3634

1✔
3635
TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][bootstrap][baas]") {
49!
3636
    FLXSyncTestHarness harness("bootstrap_full_sync");
49✔
3637

24✔
3638
    auto setup_subs = [](SharedRealm& realm) {
99✔
3639
        auto table = realm->read_group().get_table("class_TopLevel");
99✔
3640
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
96✔
3641
        new_query.clear();
102✔
3642
        auto col = table->get_column_key("queryable_str_field");
102✔
3643
        new_query.insert_or_assign(Query(table).equal(col, StringData("bar")).Or().equal(col, StringData("bizz")));
102✔
3644
        return new_query.commit();
102✔
3645
    };
102✔
3646

30✔
3647
    auto bar_obj_id = ObjectId::gen();
54✔
3648
    auto bizz_obj_id = ObjectId::gen();
54✔
3649
    auto setup_and_poison_cache = [&] {
48✔
3650
        harness.load_initial_data([&](SharedRealm realm) {
51✔
3651
            CppContext c(realm);
51✔
3652
            Object::create(c, realm, "TopLevel",
51✔
3653
                           std::any(AnyDict{{"_id", bar_obj_id},
51✔
3654
                                            {"queryable_str_field", std::string{"bar"}},
51✔
3655
                                            {"queryable_int_field", static_cast<int64_t>(10)},
51✔
3656
                                            {"non_queryable_field", std::string{"non queryable 2"}}}));
51✔
3657
        });
51✔
3658

27✔
3659
        harness.do_with_new_realm([&](SharedRealm realm) {
51✔
3660
            // first set a subscription to force the creation/caching of a broker snapshot on the server.
27✔
3661
            setup_subs(realm).get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
48✔
3662
            wait_for_advance(*realm);
51✔
3663
            auto table = realm->read_group().get_table("class_TopLevel");
48✔
3664
            REQUIRE(table->find_primary_key(bar_obj_id));
51!
3665

27✔
3666
            // Then create an object that won't be in the cached snapshot - this is the object that if we didn't
27✔
3667
            // wait for a MARK message to come back, we'd miss it in our results.
27!
3668
            CppContext c(realm);
48✔
3669
            realm->begin_transaction();
48✔
3670
            Object::create(c, realm, "TopLevel",
48✔
3671
                           std::any(AnyDict{{"_id", bizz_obj_id},
51✔
3672
                                            {"queryable_str_field", std::string{"bizz"}},
51✔
3673
                                            {"queryable_int_field", static_cast<int64_t>(15)},
51✔
3674
                                            {"non_queryable_field", std::string{"non queryable 3"}}}));
51✔
3675
            realm->commit_transaction();
51✔
3676
            wait_for_upload(*realm);
51✔
3677
        });
51✔
3678
    };
51✔
3679

27✔
3680
    SECTION("regular subscription change") {
51✔
3681
        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
19✔
3682
        std::atomic<bool> saw_truncated_bootstrap{false};
16✔
3683
        triggered_config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_sess,
19✔
3684
                                                                      const SyncClientHookData& data) {
257✔
3685
            auto sess = weak_sess.lock();
257✔
3686
            if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
257✔
3687
                return SyncClientHookAction::NoAction;
256✔
3688
            }
256✔
3689

24✔
3690
            auto latest_subs = sess->get_flx_subscription_store()->get_latest();
31✔
3691
            REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
31!
3692
            REQUIRE(data.num_changesets == 1);
16!
3693
            auto db = SyncSession::OnlyForTesting::get_db(*sess);
17✔
3694
            auto read_tr = db->start_read();
17!
3695
            auto table = read_tr->get_table("class_TopLevel");
17!
3696
            REQUIRE(table->find_primary_key(bar_obj_id));
17!
3697
            REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
17!
3698
            saw_truncated_bootstrap.store(true);
17✔
3699

9!
3700
            return SyncClientHookAction::NoAction;
17!
3701
        };
17✔
3702
        auto problem_realm = Realm::get_shared_realm(triggered_config);
16✔
3703

9✔
3704
        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
9✔
3705
        // on the server should have no new history entries, and then pause the router so it doesn't get any of
9✔
3706
        // the changes we're about to create.
8✔
3707
        wait_for_upload(*problem_realm);
16✔
3708
        wait_for_download(*problem_realm);
16✔
3709

8✔
3710
        nlohmann::json command_request = {
17✔
3711
            {"command", "PAUSE_ROUTER_SESSION"},
17✔
3712
        };
16✔
3713
        auto resp_body =
17✔
3714
            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
17✔
3715
                .get();
17✔
3716
        REQUIRE(resp_body == "{}");
17!
3717

9✔
3718
        // Put some data into the server, this will be the data that will be in the broker cache.
9✔
3719
        setup_and_poison_cache();
17!
3720

8✔
3721
        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
8✔
3722
        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
9✔
3723
        // processed.
8✔
3724
        setup_subs(problem_realm).get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
16✔
3725
        wait_for_advance(*problem_realm);
16✔
3726

8✔
3727
        REQUIRE(saw_truncated_bootstrap.load());
17!
3728
        auto table = problem_realm->read_group().get_table("class_TopLevel");
17✔
3729
        REQUIRE(table->find_primary_key(bar_obj_id));
16!
3730
        REQUIRE(table->find_primary_key(bizz_obj_id));
17!
3731
    }
17✔
3732

25!
3733
// TODO: remote-baas: This test fails intermittently with Windows remote baas server - to be fixed in RCORE-1674
25!
3734
#ifndef _WIN32
49✔
3735
    SECTION("disconnect between bootstrap and mark") {
48✔
3736
        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
16✔
3737
        auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
19✔
3738
        triggered_config.sync_config->on_sync_client_event_hook =
19✔
3739
            [promise = util::CopyablePromiseHolder(std::move(interrupted_promise)), &bizz_obj_id,
17✔
3740
             &bar_obj_id](std::weak_ptr<SyncSession> weak_sess, const SyncClientHookData& data) mutable {
289✔
3741
                auto sess = weak_sess.lock();
289✔
3742
                if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
289✔
3743
                    return SyncClientHookAction::NoAction;
290✔
3744
                }
290✔
3745

26✔
3746
                auto latest_subs = sess->get_flx_subscription_store()->get_latest();
33✔
3747
                REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
33!
3748
                REQUIRE(data.num_changesets == 1);
16!
3749
                auto db = SyncSession::OnlyForTesting::get_db(*sess);
17✔
3750
                auto read_tr = db->start_read();
17!
3751
                auto table = read_tr->get_table("class_TopLevel");
17!
3752
                REQUIRE(table->find_primary_key(bar_obj_id));
17!
3753
                REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
17!
3754

9✔
3755
                sess->pause();
17!
3756
                promise.get_promise().emplace_value();
17!
3757
                return SyncClientHookAction::NoAction;
16✔
3758
            };
17✔
3759
        auto problem_realm = Realm::get_shared_realm(triggered_config);
17✔
3760

9✔
3761
        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
9✔
3762
        // on the server should have no new history entries, and then pause the router so it doesn't get any of
9✔
3763
        // the changes we're about to create.
8✔
3764
        wait_for_upload(*problem_realm);
16✔
3765
        wait_for_download(*problem_realm);
16✔
3766

8✔
3767
        nlohmann::json command_request = {
17✔
3768
            {"command", "PAUSE_ROUTER_SESSION"},
17✔
3769
        };
16✔
3770
        auto resp_body =
17✔
3771
            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
17✔
3772
                .get();
17✔
3773
        REQUIRE(resp_body == "{}");
17!
3774

9✔
3775
        // Put some data into the server, this will be the data that will be in the broker cache.
9✔
3776
        setup_and_poison_cache();
17!
3777

8✔
3778
        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
8✔
3779
        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
9✔
3780
        // processed.
8✔
3781
        auto sub_set = setup_subs(problem_realm);
16✔
3782
        auto sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
16✔
3783

8✔
3784
        interrupted.get();
17✔
3785
        problem_realm->sync_session()->shutdown_and_wait();
17✔
3786
        REQUIRE(sub_complete_future.is_ready());
16!
3787
        sub_set.refresh();
17✔
3788
        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::AwaitingMark);
17!
3789

9!
3790
        sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
17✔
3791
        problem_realm->sync_session()->resume();
17!
3792
        sub_complete_future.get();
16✔
3793
        wait_for_advance(*problem_realm);
17✔
3794

9✔
3795
        sub_set.refresh();
17✔
3796
        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::Complete);
17!
3797
        auto table = problem_realm->read_group().get_table("class_TopLevel");
16✔
3798
        REQUIRE(table->find_primary_key(bar_obj_id));
17!
3799
        REQUIRE(table->find_primary_key(bizz_obj_id));
17!
3800
    }
17✔
3801
#endif
49!
3802
    SECTION("error/suspend between bootstrap and mark") {
49!
3803
        SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
17✔
3804
        triggered_config.sync_config->on_sync_client_event_hook =
19✔
3805
            [&bizz_obj_id, &bar_obj_id](std::weak_ptr<SyncSession> weak_sess, const SyncClientHookData& data) {
275✔
3806
                auto sess = weak_sess.lock();
273✔
3807
                if (!sess || data.event != SyncClientHookEvent::BootstrapProcessed || data.query_version != 1) {
273✔
3808
                    return SyncClientHookAction::NoAction;
273✔
3809
                }
273✔
3810

25✔
3811
                auto latest_subs = sess->get_flx_subscription_store()->get_latest();
32✔
3812
                REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::AwaitingMark);
32!
3813
                REQUIRE(data.num_changesets == 1);
16!
3814
                auto db = SyncSession::OnlyForTesting::get_db(*sess);
17✔
3815
                auto read_tr = db->start_read();
17!
3816
                auto table = read_tr->get_table("class_TopLevel");
17!
3817
                REQUIRE(table->find_primary_key(bar_obj_id));
17!
3818
                REQUIRE_FALSE(table->find_primary_key(bizz_obj_id));
17!
3819

9✔
3820
                return SyncClientHookAction::TriggerReconnect;
17!
3821
            };
17!
3822
        auto problem_realm = Realm::get_shared_realm(triggered_config);
16✔
3823

9✔
3824
        // Setup the problem realm by waiting for it to be fully synchronized with an empty query, so the router
9✔
3825
        // on the server should have no new history entries, and then pause the router so it doesn't get any of
9✔
3826
        // the changes we're about to create.
8✔
3827
        wait_for_upload(*problem_realm);
16✔
3828
        wait_for_download(*problem_realm);
16✔
3829

8✔
3830
        nlohmann::json command_request = {
17✔
3831
            {"command", "PAUSE_ROUTER_SESSION"},
17✔
3832
        };
16✔
3833
        auto resp_body =
17✔
3834
            SyncSession::OnlyForTesting::send_test_command(*problem_realm->sync_session(), command_request.dump())
17✔
3835
                .get();
17✔
3836
        REQUIRE(resp_body == "{}");
17!
3837

9✔
3838
        // Put some data into the server, this will be the data that will be in the broker cache.
9✔
3839
        setup_and_poison_cache();
17!
3840

8✔
3841
        // Setup queries on the problem realm to bootstrap from the cached object. Bootstrapping will also resume
8✔
3842
        // the router, so all we need to do is wait for the subscription set to be complete and notifications to be
9✔
3843
        // processed.
8✔
3844
        auto sub_set = setup_subs(problem_realm);
16✔
3845
        auto sub_complete_future = sub_set.get_state_change_notification(sync::SubscriptionSet::State::Complete);
16✔
3846

8✔
3847
        sub_complete_future.get();
17✔
3848
        wait_for_advance(*problem_realm);
17✔
3849

8✔
3850
        sub_set.refresh();
17✔
3851
        REQUIRE(sub_set.state() == sync::SubscriptionSet::State::Complete);
17!
3852
        auto table = problem_realm->read_group().get_table("class_TopLevel");
16✔
3853
        REQUIRE(table->find_primary_key(bar_obj_id));
17!
3854
        REQUIRE(table->find_primary_key(bizz_obj_id));
17!
3855
    }
17✔
3856
}
49!
3857

1!
3858
TEST_CASE("flx: convert flx sync realm to bundled realm", "[app][flx][baas]") {
97✔
3859
    static auto foo_obj_id = ObjectId::gen();
99✔
3860
    static auto bar_obj_id = ObjectId::gen();
96✔
3861
    static auto bizz_obj_id = ObjectId::gen();
102✔
3862
    static std::optional<FLXSyncTestHarness> harness;
102✔
3863
    if (!harness) {
102✔
3864
        harness.emplace("bundled_flx_realms");
22✔
3865
        harness->load_initial_data([&](SharedRealm realm) {
22✔
3866
            CppContext c(realm);
22✔
3867
            Object::create(c, realm, "TopLevel",
17✔
3868
                           std::any(AnyDict{{"_id", foo_obj_id},
17✔
3869
                                            {"queryable_str_field", "foo"s},
17✔
3870
                                            {"queryable_int_field", static_cast<int64_t>(5)},
17✔
3871
                                            {"non_queryable_field", "non queryable 1"s}}));
17✔
3872
            Object::create(c, realm, "TopLevel",
17✔
3873
                           std::any(AnyDict{{"_id", bar_obj_id},
17✔
3874
                                            {"queryable_str_field", "bar"s},
17✔
3875
                                            {"queryable_int_field", static_cast<int64_t>(10)},
17✔
3876
                                            {"non_queryable_field", "non queryable 2"s}}));
17✔
3877
        });
17✔
3878
    }
17✔
3879

49✔
3880
    SECTION("flx to flx (should succeed)") {
97✔
3881
        create_user_and_log_in(harness->app());
17✔
3882
        SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
16✔
3883
        harness->do_with_new_realm([&](SharedRealm realm) {
22✔
3884
            auto table = realm->read_group().get_table("class_TopLevel");
17✔
3885
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
17✔
3886
            mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
17✔
3887
            auto subs = std::move(mut_subs).commit();
17✔
3888

9✔
3889
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
17✔
3890
            wait_for_advance(*realm);
17✔
3891

8✔
3892
            realm->convert(target_config);
17✔
3893
        });
17✔
3894

8✔
3895
        auto target_realm = Realm::get_shared_realm(target_config);
17✔
3896

9✔
3897
        target_realm->begin_transaction();
16✔
3898
        CppContext c(target_realm);
17✔
3899
        Object::create(c, target_realm, "TopLevel",
16✔
3900
                       std::any(AnyDict{{"_id", bizz_obj_id},
17✔
3901
                                        {"queryable_str_field", "bizz"s},
17✔
3902
                                        {"queryable_int_field", static_cast<int64_t>(15)},
17✔
3903
                                        {"non_queryable_field", "non queryable 3"s}}));
17✔
3904
        target_realm->commit_transaction();
17✔
3905

9✔
3906
        wait_for_upload(*target_realm);
17✔
3907
        wait_for_download(*target_realm);
17✔
3908

8✔
3909
        auto latest_subs = target_realm->get_active_subscription_set();
17✔
3910
        auto table = target_realm->read_group().get_table("class_TopLevel");
17✔
3911
        REQUIRE(latest_subs.size() == 1);
16!
3912
        REQUIRE(latest_subs.at(0).object_class_name == "TopLevel");
17!
3913
        REQUIRE(latest_subs.at(0).query_string ==
17!
3914
                Query(table).greater(table->get_column_key("queryable_int_field"), 5).get_description());
17!
3915

9!
3916
        REQUIRE(table->size() == 2);
17!
3917
        REQUIRE(table->find_primary_key(bar_obj_id));
17!
3918
        REQUIRE(table->find_primary_key(bizz_obj_id));
16!
3919
        REQUIRE_FALSE(table->find_primary_key(foo_obj_id));
17!
3920
    }
17!
3921

49!
3922
    SECTION("flx to local (should succeed)") {
97!
3923
        TestFile target_config;
17✔
3924

8✔
3925
        harness->do_with_new_realm([&](SharedRealm realm) {
22✔
3926
            auto table = realm->read_group().get_table("class_TopLevel");
17✔
3927
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
16✔
3928
            mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
17✔
3929
            auto subs = std::move(mut_subs).commit();
17✔
3930

9✔
3931
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
17✔
3932
            wait_for_advance(*realm);
17✔
3933

8✔
3934
            target_config.schema = realm->schema();
17✔
3935
            target_config.schema_version = realm->schema_version();
17✔
3936
            realm->convert(target_config);
16✔
3937
        });
17✔
3938

9✔
3939
        auto target_realm = Realm::get_shared_realm(target_config);
17✔
3940
        REQUIRE_THROWS(target_realm->get_active_subscription_set());
17✔
3941

8✔
3942
        auto table = target_realm->read_group().get_table("class_TopLevel");
17✔
3943
        REQUIRE(table->size() == 2);
17✔
3944
        REQUIRE(table->find_primary_key(bar_obj_id));
16!
3945
        REQUIRE(table->find_primary_key(bizz_obj_id));
17!
3946
        REQUIRE_FALSE(table->find_primary_key(foo_obj_id));
17!
3947
    }
17!
3948

49!
3949
    SECTION("flx to pbs (should fail to convert)") {
97!
3950
        create_user_and_log_in(harness->app());
17✔
3951
        SyncTestFile target_config(harness->app()->current_user(), "12345"s, harness->schema());
16✔
3952
        harness->do_with_new_realm([&](SharedRealm realm) {
22✔
3953
            auto table = realm->read_group().get_table("class_TopLevel");
17✔
3954
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
17✔
3955
            mut_subs.insert_or_assign(Query(table).greater(table->get_column_key("queryable_int_field"), 5));
17✔
3956
            auto subs = std::move(mut_subs).commit();
17✔
3957

9✔
3958
            subs.get_state_change_notification(sync::SubscriptionSet::State::Complete).get();
17✔
3959
            wait_for_advance(*realm);
17✔
3960

8✔
3961
            REQUIRE_THROWS(realm->convert(target_config));
17✔
3962
        });
17✔
3963
    }
16✔
3964

49✔
3965
    SECTION("pbs to flx (should fail to convert)") {
97✔
3966
        create_user_and_log_in(harness->app());
17✔
3967
        SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
16✔
3968

14✔
3969
        auto pbs_app_config = minimal_app_config("pbs_to_flx_convert", harness->schema());
17✔
3970

9✔
3971
        TestAppSession pbs_app_session(create_app(pbs_app_config));
16✔
3972
        SyncTestFile source_config(pbs_app_session.app()->current_user(), "54321"s, pbs_app_config.schema);
17✔
3973
        auto realm = Realm::get_shared_realm(source_config);
16✔
3974

9✔
3975
        realm->begin_transaction();
17✔
3976
        CppContext c(realm);
17✔
3977
        Object::create(c, realm, "TopLevel",
16✔
3978
                       std::any(AnyDict{{"_id", foo_obj_id},
17✔
3979
                                        {"queryable_str_field", "foo"s},
17✔
3980
                                        {"queryable_int_field", static_cast<int64_t>(5)},
17✔
3981
                                        {"non_queryable_field", "non queryable 1"s}}));
17✔
3982
        realm->commit_transaction();
17✔
3983

9✔
3984
        REQUIRE_THROWS(realm->convert(target_config));
17✔
3985
    }
17✔
3986

48✔
3987
    SECTION("local to flx (should fail to convert)") {
97✔
3988
        TestFile source_config;
17✔
3989
        source_config.schema = harness->schema();
16✔
3990
        source_config.schema_version = 1;
22✔
3991

9✔
3992
        auto realm = Realm::get_shared_realm(source_config);
17✔
3993
        auto foo_obj_id = ObjectId::gen();
17✔
3994

8✔
3995
        realm->begin_transaction();
17✔
3996
        CppContext c(realm);
17✔
3997
        Object::create(c, realm, "TopLevel",
16✔
3998
                       std::any(AnyDict{{"_id", foo_obj_id},
17✔
3999
                                        {"queryable_str_field", "foo"s},
17✔
4000
                                        {"queryable_int_field", static_cast<int64_t>(5)},
17✔
4001
                                        {"non_queryable_field", "non queryable 1"s}}));
17✔
4002
        realm->commit_transaction();
17✔
4003

9✔
4004
        create_user_and_log_in(harness->app());
17✔
4005
        SyncTestFile target_config(harness->app()->current_user(), harness->schema(), SyncConfig::FLXSyncEnabled{});
17✔
4006

8✔
4007
        REQUIRE_THROWS(realm->convert(target_config));
17✔
4008
    }
17✔
4009

48✔
4010
    // Add new sections before this
49✔
4011
    SECTION("teardown") {
97✔
4012
        harness->app()->sync_manager()->wait_for_sessions_to_terminate();
16✔
4013
        harness.reset();
16✔
4014
    }
22✔
4015
}
97✔
4016

1✔
4017
TEST_CASE("flx: compensating write errors get re-sent across sessions", "[sync][flx][compensating write][baas]") {
17✔
4018
    AppCreateConfig::ServiceRole role;
22✔
4019
    role.name = "compensating_write_perms";
16✔
4020

9✔
4021
    AppCreateConfig::ServiceRoleDocumentFilters doc_filters;
17✔
4022
    doc_filters.read = true;
17✔
4023
    doc_filters.write =
16✔
4024
        nlohmann::json{{"queryable_str_field", nlohmann::json{{"$in", nlohmann::json::array({"foo", "bar"})}}}};
17✔
4025
    role.document_filters = doc_filters;
17✔
4026

9✔
4027
    role.insert_filter = true;
17✔
4028
    role.delete_filter = true;
17✔
4029
    role.read = true;
16✔
4030
    role.write = true;
17✔
4031
    FLXSyncTestHarness::ServerSchema server_schema{
17✔
4032
        g_simple_embedded_obj_schema, {"queryable_str_field", "queryable_int_field"}, {role}};
17✔
4033
    FLXSyncTestHarness::Config harness_config("flx_bad_query", server_schema);
17✔
4034
    harness_config.reconnect_mode = ReconnectMode::testing;
17✔
4035
    FLXSyncTestHarness harness(std::move(harness_config));
17✔
4036

9✔
4037
    auto test_obj_id_1 = ObjectId::gen();
17✔
4038
    auto test_obj_id_2 = ObjectId::gen();
17✔
4039

8✔
4040
    create_user_and_log_in(harness.app());
17✔
4041
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
17✔
4042

8✔
4043
    {
17✔
4044
        auto error_received_pf = util::make_promise_future<void>();
17✔
4045
        config.sync_config->on_sync_client_event_hook =
16✔
4046
            [promise = util::CopyablePromiseHolder(std::move(error_received_pf.promise))](
17✔
4047
                std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& data) mutable {
241✔
4048
                if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
241✔
4049
                    return SyncClientHookAction::NoAction;
225✔
4050
                }
239✔
4051
                auto session = weak_session.lock();
31✔
4052
                REQUIRE(session);
30!
4053

22✔
4054
                auto error_code = sync::ProtocolError(data.error_info->raw_error_code);
17✔
4055

9!
4056
                if (error_code == sync::ProtocolError::initial_sync_not_completed) {
16✔
4057
                    return SyncClientHookAction::NoAction;
1✔
4058
                }
4059

9✔
4060
                REQUIRE(error_code == sync::ProtocolError::compensating_write);
16!
4061
                REQUIRE_FALSE(data.error_info->compensating_writes.empty());
16!
4062
                promise.get_promise().emplace_value();
16✔
4063

9!
4064
                return SyncClientHookAction::TriggerReconnect;
17!
4065
            };
17✔
4066

8✔
4067
        auto realm = Realm::get_shared_realm(config);
17✔
4068
        auto table = realm->read_group().get_table("class_TopLevel");
17✔
4069
        auto queryable_str_field = table->get_column_key("queryable_str_field");
16✔
4070
        auto new_query = realm->get_latest_subscription_set().make_mutable_copy();
17✔
4071
        new_query.insert_or_assign(Query(table).equal(queryable_str_field, "bizz"));
17✔
4072
        std::move(new_query).commit();
17✔
4073

9✔
4074
        wait_for_upload(*realm);
17✔
4075
        wait_for_download(*realm);
17✔
4076

8✔
4077
        CppContext c(realm);
17✔
4078
        realm->begin_transaction();
17✔
4079
        Object::create(c, realm, "TopLevel",
16✔
4080
                       util::Any(AnyDict{
17✔
4081
                           {"_id", test_obj_id_1},
17✔
4082
                           {"queryable_str_field", std::string{"foo"}},
17✔
4083
                       }));
17✔
4084
        realm->commit_transaction();
17✔
4085

9✔
4086
        realm->begin_transaction();
17✔
4087
        Object::create(c, realm, "TopLevel",
17✔
4088
                       util::Any(AnyDict{
16✔
4089
                           {"_id", test_obj_id_2},
17✔
4090
                           {"queryable_str_field", std::string{"baz"}},
17✔
4091
                       }));
17✔
4092
        realm->commit_transaction();
17✔
4093

9✔
4094
        error_received_pf.future.get();
17✔
4095
        realm->sync_session()->shutdown_and_wait();
17✔
4096
        config.sync_config->on_sync_client_event_hook = {};
16✔
4097
    }
17✔
4098

9✔
4099
    _impl::RealmCoordinator::clear_all_caches();
17✔
4100

9✔
4101
    std::mutex errors_mutex;
16✔
4102
    std::condition_variable new_compensating_write;
17✔
4103
    std::vector<std::pair<ObjectId, sync::version_type>> error_to_download_version;
16✔
4104
    std::vector<sync::CompensatingWriteErrorInfo> compensating_writes;
17✔
4105
    sync::version_type download_version;
17✔
4106

9✔
4107
    config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_session,
17✔
4108
                                                        const SyncClientHookData& data) mutable {
135✔
4109
        auto session = weak_session.lock();
134✔
4110
        if (!session) {
135✔
4111
            return SyncClientHookAction::NoAction;
8✔
4112
        }
8✔
4113

78✔
4114
        if (data.event != SyncClientHookEvent::ErrorMessageReceived) {
134✔
4115
            if (data.event == SyncClientHookEvent::DownloadMessageReceived) {
102✔
4116
                download_version = data.progress.download.server_version;
35✔
4117
            }
43✔
4118

60✔
4119
            return SyncClientHookAction::NoAction;
104✔
4120
        }
104✔
4121

16✔
4122
        auto error_code = sync::ProtocolError(data.error_info->raw_error_code);
38✔
4123
        REQUIRE(error_code == sync::ProtocolError::compensating_write);
38!
4124
        REQUIRE(!data.error_info->compensating_writes.empty());
32!
4125
        std::lock_guard<std::mutex> lk(errors_mutex);
34✔
4126
        for (const auto& compensating_write : data.error_info->compensating_writes) {
34✔
4127
            error_to_download_version.emplace_back(compensating_write.primary_key.get_object_id(),
34!
4128
                                                   *data.error_info->compensating_write_server_version);
34✔
4129
        }
34✔
4130

18✔
4131
        return SyncClientHookAction::NoAction;
34✔
4132
    };
34✔
4133

8✔
4134
    config.sync_config->error_handler = [&](std::shared_ptr<SyncSession>, SyncError error) {
34✔
4135
        std::unique_lock<std::mutex> lk(errors_mutex);
34✔
4136
        REQUIRE(error.status == ErrorCodes::SyncCompensatingWrite);
32!
4137
        for (const auto& compensating_write : error.compensating_writes_info) {
34✔
4138
            auto tracked_error = std::find_if(error_to_download_version.begin(), error_to_download_version.end(),
34✔
4139
                                              [&](const auto& pair) {
50!
4140
                                                  return pair.first == compensating_write.primary_key.get_object_id();
50✔
4141
                                              });
50✔
4142
            REQUIRE(tracked_error != error_to_download_version.end());
35!
4143
            CHECK(tracked_error->second <= download_version);
35!
4144
            compensating_writes.push_back(compensating_write);
35✔
4145
        }
34!
4146
        new_compensating_write.notify_one();
34!
4147
    };
34✔
4148

10✔
4149
    auto realm = Realm::get_shared_realm(config);
18✔
4150

10✔
4151
    wait_for_upload(*realm);
16✔
4152
    wait_for_download(*realm);
17✔
4153

8✔
4154
    std::unique_lock<std::mutex> lk(errors_mutex);
17✔
4155
    new_compensating_write.wait_for(lk, std::chrono::seconds(30), [&] {
17✔
4156
        return compensating_writes.size() == 2;
16✔
4157
    });
17✔
4158

9✔
4159
    REQUIRE(compensating_writes.size() == 2);
17!
4160
    auto& write_info = compensating_writes[0];
17✔
4161
    CHECK(write_info.primary_key.is_type(type_ObjectId));
16!
4162
    CHECK(write_info.primary_key.get_object_id() == test_obj_id_1);
17!
4163
    CHECK(write_info.object_name == "TopLevel");
17!
4164
    CHECK_THAT(write_info.reason, Catch::Matchers::ContainsSubstring("object is outside of the current query view"));
17!
4165

9!
4166
    write_info = compensating_writes[1];
17!
4167
    REQUIRE(write_info.primary_key.is_type(type_ObjectId));
17!
4168
    REQUIRE(write_info.primary_key.get_object_id() == test_obj_id_2);
16!
4169
    REQUIRE(write_info.object_name == "TopLevel");
17!
4170
    REQUIRE(write_info.reason ==
17!
4171
            util::format("write to ObjectID(\"%1\") in table \"TopLevel\" not allowed", test_obj_id_2));
17!
4172
    auto top_level_table = realm->read_group().get_table("class_TopLevel");
17!
4173
    REQUIRE(top_level_table->is_empty());
17!
4174
}
17✔
4175

1✔
4176
// Verifies that a local write started while a multi-batch bootstrap is being integrated is only committed
// after the bootstrap has finished, i.e. the user commit gets the version right after the bootstrap version.
//
// The client event hook spawns a writer thread when the first `MoreToCome` download batch is integrated. That
// thread opens a write transaction and waits to commit until the hook has seen the final batch, recorded the
// version at that point and closed the session.
TEST_CASE("flx: bootstrap changesets are applied continuously", "[sync][flx][bootstrap][baas]") {
    FLXSyncTestHarness harness("flx_bootstrap_ordering", {g_large_array_schema, {"queryable_int_field"}});
    fill_large_array_schema(harness);

    std::unique_ptr<std::thread> th;
    sync::version_type user_commit_version = UINT_FAST64_MAX;
    sync::version_type bootstrap_version = UINT_FAST64_MAX;
    SharedRealm realm;
    std::condition_variable cv;
    std::mutex mutex;
    bool allow_to_commit = false;

    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
    auto [interrupted_promise, interrupted] = util::make_promise_future<void>();
    auto shared_promise = std::make_shared<util::Promise<void>>(std::move(interrupted_promise));
    config.sync_config->on_sync_client_event_hook =
        [promise = std::move(shared_promise), &th, &realm, &user_commit_version, &bootstrap_version, &cv, &mutex,
         &allow_to_commit](std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& data) {
            // Only integrated download messages for the new subscription (query version != 0) are relevant.
            if (data.query_version == 0) {
                return SyncClientHookAction::NoAction;
            }
            if (data.event != SyncClientHookEvent::DownloadMessageIntegrated) {
                return SyncClientHookAction::NoAction;
            }
            auto session = weak_session.lock();
            if (!session) {
                return SyncClientHookAction::NoAction;
            }
            if (data.batch_state != sync::DownloadBatchState::MoreToCome) {
                // Read version after bootstrap is done.
                auto db = TestHelper::get_db(realm);
                ReadTransaction rt(db);
                bootstrap_version = rt.get_version();
                {
                    std::lock_guard<std::mutex> lock(mutex);
                    allow_to_commit = true;
                }
                cv.notify_one();
                session->force_close();
                promise->emplace_value();
                return SyncClientHookAction::NoAction;
            }

            // The writer thread is started once, on the first intermediate batch.
            if (th) {
                return SyncClientHookAction::NoAction;
            }

            auto func = [&] {
                // Attempt to commit a local change after the first bootstrap batch was committed.
                auto db = TestHelper::get_db(realm);
                WriteTransaction wt(db);
                TableRef table = wt.get_table("class_TopLevel");
                table->create_object_with_primary_key(ObjectId::gen());
                {
                    std::unique_lock<std::mutex> lock(mutex);
                    // Wait to commit until we read the final bootstrap version.
                    cv.wait(lock, [&] {
                        return allow_to_commit;
                    });
                }
                user_commit_version = wt.commit();
            };
            th = std::make_unique<std::thread>(std::move(func));

            return SyncClientHookAction::NoAction;
        };

    realm = Realm::get_shared_realm(config);
    auto table = realm->read_group().get_table("class_TopLevel");
    Query query(table);
    {
        auto new_subs = realm->get_latest_subscription_set().make_mutable_copy();
        new_subs.insert_or_assign(query);
        std::move(new_subs).commit();
    }
    interrupted.get();
    // The writer thread only exists if the bootstrap was delivered in more than one batch; fail the test
    // cleanly instead of dereferencing a null pointer if that did not happen.
    REQUIRE(th);
    th->join();

    // The user commit is the last one.
    CHECK(user_commit_version == bootstrap_version + 1);
}
16✔
4257

4258
TEST_CASE("flx: open realm + register subscription callback while bootstrapping",
1!
4259
          "[sync][flx][bootstrap][async open][baas]") {
97✔
4260
    FLXSyncTestHarness harness("flx_bootstrap_and_subscribe");
96✔
4261
    auto foo_obj_id = ObjectId::gen();
96✔
4262
    harness.load_initial_data([&](SharedRealm realm) {
102✔
4263
        CppContext c(realm);
102✔
4264
        Object::create(c, realm, "TopLevel",
102✔
4265
                       std::any(AnyDict{{"_id", foo_obj_id},
102✔
4266
                                        {"queryable_str_field", "foo"s},
102✔
4267
                                        {"queryable_int_field", static_cast<int64_t>(5)},
102✔
4268
                                        {"non_queryable_field", "created as initial data seed"s}}));
102✔
4269
    });
102✔
4270
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
102✔
4271

54✔
4272
    std::atomic<bool> subscription_invoked = false;
102✔
4273
    auto subscription_pf = util::make_promise_future<bool>();
102✔
4274
    // create a subscription to commit when realm is open for the first time or asked to rerun on open
48✔
4275
    auto init_subscription_callback_with_promise =
102✔
4276
        [&, promise_holder = util::CopyablePromiseHolder(std::move(subscription_pf.promise))](
102✔
4277
            std::shared_ptr<Realm> realm) mutable {
80✔
4278
            REQUIRE(realm);
70!
4279
            auto table = realm->read_group().get_table("class_TopLevel");
70✔
4280
            Query query(table);
68✔
4281
            auto subscription = realm->get_latest_subscription_set();
68!
4282
            auto mutable_subscription = subscription.make_mutable_copy();
68✔
4283
            mutable_subscription.insert_or_assign(query);
68✔
4284
            auto promise = promise_holder.get_promise();
68✔
4285
            mutable_subscription.commit();
68✔
4286
            subscription_invoked = true;
68✔
4287
            promise.emplace_value(true);
68✔
4288
        };
68✔
4289
    // verify that the subscription has changed the database
52✔
4290
    auto verify_subscription = [](SharedRealm realm) {
100✔
4291
        REQUIRE(realm);
100!
4292
        auto table_ref = realm->read_group().get_table("class_TopLevel");
96✔
4293
        REQUIRE(table_ref);
102!
4294
        REQUIRE(table_ref->get_column_count() == 4);
102!
4295
        REQUIRE(table_ref->get_column_key("_id"));
102!
4296
        REQUIRE(table_ref->get_column_key("queryable_str_field"));
102!
4297
        REQUIRE(table_ref->get_column_key("queryable_int_field"));
102!
4298
        REQUIRE(table_ref->get_column_key("non_queryable_field"));
102!
4299
        REQUIRE(table_ref->size() == 1);
102!
4300
        auto str_col = table_ref->get_column_key("queryable_str_field");
102!
4301
        REQUIRE(table_ref->get_object(0).get<String>(str_col) == "foo");
102!
4302
        return true;
102!
4303
    };
102✔
4304

54!
4305
    SECTION("Sync open") {
102✔
4306
        // sync open with subscription callback. Subscription will be run, since this is the first time that realm is
14✔
4307
        // opened
8✔
4308
        subscription_invoked = false;
22✔
4309
        config.sync_config->subscription_initializer = init_subscription_callback_with_promise;
16✔
4310
        auto realm = Realm::get_shared_realm(config);
16✔
4311
        REQUIRE(subscription_pf.future.get());
17!
4312
        auto sb = realm->get_latest_subscription_set();
17✔
4313
        auto future = sb.get_state_change_notification(realm::sync::SubscriptionSet::State::Complete);
17✔
4314
        auto state = future.get();
17!
4315
        REQUIRE(state == realm::sync::SubscriptionSet::State::Complete);
17!
4316
        realm->refresh(); // refresh is needed otherwise table_ref->size() would be 0
17✔
4317
        REQUIRE(verify_subscription(realm));
17!
4318
    }
17!
4319

49✔
4320
    SECTION("Sync Open + Async Open") {
97!
4321
        {
17✔
4322
            subscription_invoked = false;
16✔
4323
            config.sync_config->subscription_initializer = init_subscription_callback_with_promise;
22✔
4324
            auto realm = Realm::get_shared_realm(config);
17✔
4325
            REQUIRE(subscription_pf.future.get());
17!
4326
            auto sb = realm->get_latest_subscription_set();
17✔
4327
            auto future = sb.get_state_change_notification(realm::sync::SubscriptionSet::State::Complete);
17✔
4328
            auto state = future.get();
17!
4329
            REQUIRE(state == realm::sync::SubscriptionSet::State::Complete);
17!
4330
            realm->refresh(); // refresh is needed otherwise table_ref->size() would be 0
17✔
4331
            REQUIRE(verify_subscription(realm));
17!
4332
        }
17!
4333
        {
17✔
4334
            auto subscription_pf_async = util::make_promise_future<bool>();
17!
4335
            auto init_subscription_asyc_callback =
17✔
4336
                [promise_holder_async = util::CopyablePromiseHolder(std::move(subscription_pf_async.promise))](
17✔
4337
                    std::shared_ptr<Realm> realm) mutable {
17✔
4338
                    REQUIRE(realm);
17!
4339
                    auto table = realm->read_group().get_table("class_TopLevel");
17✔
4340
                    Query query(table);
17✔
4341
                    auto subscription = realm->get_latest_subscription_set();
17!
4342
                    auto mutable_subscription = subscription.make_mutable_copy();
17✔
4343
                    mutable_subscription.insert_or_assign(query);
17✔
4344
                    auto promise = promise_holder_async.get_promise();
17✔
4345
                    mutable_subscription.commit();
17✔
4346
                    promise.emplace_value(true);
17✔
4347
                };
17✔
4348
            auto open_realm_pf = util::make_promise_future<bool>();
17✔
4349
            auto open_realm_completed_callback =
17✔
4350
                [&, promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
17✔
4351
                    ThreadSafeReference ref, std::exception_ptr err) mutable {
17✔
4352
                    auto promise = promise_holder.get_promise();
17✔
4353
                    if (err)
17✔
4354
                        promise.emplace_value(false);
1✔
4355
                    else
17✔
4356
                        promise.emplace_value(verify_subscription(Realm::get_shared_realm(std::move(ref))));
17✔
4357
                };
16✔
4358

9✔
4359
            config.sync_config->subscription_initializer = init_subscription_asyc_callback;
17✔
4360
            config.sync_config->rerun_init_subscription_on_open = true;
17✔
4361
            auto async_open = Realm::get_synchronized_realm(config);
16✔
4362
            async_open->start(open_realm_completed_callback);
17✔
4363
            REQUIRE(open_realm_pf.future.get());
17!
4364
            REQUIRE(subscription_pf_async.future.get());
17!
4365
            config.sync_config->rerun_init_subscription_on_open = false;
17✔
4366
            auto realm = Realm::get_shared_realm(config);
17!
4367
            REQUIRE(realm->get_latest_subscription_set().version() == 2);
17!
4368
            REQUIRE(realm->get_active_subscription_set().version() == 2);
17!
4369
        }
17✔
4370
    }
17!
4371

49!
4372
    SECTION("Async open") {
97✔
4373
        SECTION("Initial async open with no rerun on open set") {
65✔
4374
            // subscription will be run since this is the first time we are opening the realm file.
16✔
4375
            auto open_realm_pf = util::make_promise_future<bool>();
38✔
4376
            auto open_realm_completed_callback =
36✔
4377
                [&, promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
32✔
4378
                    ThreadSafeReference ref, std::exception_ptr err) mutable {
34✔
4379
                    auto promise = promise_holder.get_promise();
34✔
4380
                    if (err)
34✔
4381
                        promise.emplace_value(false);
2✔
4382
                    else
34✔
4383
                        promise.emplace_value(verify_subscription(Realm::get_shared_realm(std::move(ref))));
34✔
4384
                };
32✔
4385

18✔
4386
            config.sync_config->subscription_initializer = init_subscription_callback_with_promise;
34✔
4387
            auto async_open = Realm::get_synchronized_realm(config);
34✔
4388
            async_open->start(open_realm_completed_callback);
32✔
4389
            REQUIRE(open_realm_pf.future.get());
34!
4390
            REQUIRE(subscription_pf.future.get());
34!
4391

18✔
4392
            SECTION("rerun on open = false. Subscription not run") {
34!
4393
                subscription_invoked = false;
18!
4394
                auto async_open = Realm::get_synchronized_realm(config);
16✔
4395
                auto open_realm_pf = util::make_promise_future<bool>();
18✔
4396
                auto open_realm_completed_callback =
17✔
4397
                    [promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
17✔
4398
                        ThreadSafeReference, std::exception_ptr) mutable {
17✔
4399
                        // no need to verify if the subscription has changed the db, since it has not run as we test
9✔
4400
                        // below
9✔
4401
                        promise_holder.get_promise().emplace_value(true);
17✔
4402
                    };
16✔
4403
                async_open->start(open_realm_completed_callback);
16✔
4404
                REQUIRE(open_realm_pf.future.get());
17!
4405
                REQUIRE_FALSE(subscription_invoked.load());
17!
4406
            }
17✔
4407

17!
4408
            SECTION("rerun on open = true. Subscription not run cause realm already opened once") {
33!
4409
                subscription_invoked = false;
17✔
4410
                auto realm = Realm::get_shared_realm(config);
16✔
4411
                auto init_subscription = [&subscription_invoked](std::shared_ptr<Realm> realm) mutable {
10✔
4412
                    REQUIRE(realm);
1!
4413
                    auto table = realm->read_group().get_table("class_TopLevel");
1✔
UNCOV
4414
                    Query query(table);
×
4415
                    auto subscription = realm->get_latest_subscription_set();
×
4416
                    auto mutable_subscription = subscription.make_mutable_copy();
×
4417
                    mutable_subscription.insert_or_assign(query);
×
4418
                    mutable_subscription.commit();
×
4419
                    subscription_invoked.store(true);
×
4420
                };
×
4421
                config.sync_config->rerun_init_subscription_on_open = true;
16✔
4422
                config.sync_config->subscription_initializer = init_subscription;
16✔
4423
                auto async_open = Realm::get_synchronized_realm(config);
16✔
4424
                auto open_realm_pf = util::make_promise_future<bool>();
17✔
4425
                auto open_realm_completed_callback =
17✔
4426
                    [promise_holder = util::CopyablePromiseHolder(std::move(open_realm_pf.promise))](
17✔
4427
                        ThreadSafeReference, std::exception_ptr) mutable {
17✔
4428
                        // no need to verify if the subscription has changed the db, since it has not run as we test
9✔
4429
                        // below
9✔
4430
                        promise_holder.get_promise().emplace_value(true);
17✔
4431
                    };
16✔
4432
                async_open->start(open_realm_completed_callback);
16✔
4433
                REQUIRE(open_realm_pf.future.get());
17!
4434
                REQUIRE_FALSE(subscription_invoked.load());
17!
4435
                REQUIRE(realm->get_latest_subscription_set().version() == 1);
17!
4436
                REQUIRE(realm->get_active_subscription_set().version() == 1);
17!
4437
            }
17!
4438
        }
33!
4439

33!
4440
        SECTION("rerun on open set for multiple async open tasks (subscription runs only once)") {
65✔
4441
            auto init_subscription = [](std::shared_ptr<Realm> realm) mutable {
66✔
4442
                REQUIRE(realm);
64!
4443
                auto table = realm->read_group().get_table("class_TopLevel");
68✔
4444
                Query query(table);
68✔
4445
                auto subscription = realm->get_latest_subscription_set();
68!
4446
                auto mutable_subscription = subscription.make_mutable_copy();
68✔
4447
                mutable_subscription.insert_or_assign(query);
68✔
4448
                mutable_subscription.commit();
68✔
4449
            };
68✔
4450

20✔
4451
            auto open_task1_pf = util::make_promise_future<SharedRealm>();
36✔
4452
            auto open_task2_pf = util::make_promise_future<SharedRealm>();
36✔
4453
            auto open_callback1 = [promise_holder = util::CopyablePromiseHolder(std::move(open_task1_pf.promise))](
32✔
4454
                                      ThreadSafeReference ref, std::exception_ptr err) mutable {
34✔
4455
                REQUIRE_FALSE(err);
34!
4456
                auto realm = Realm::get_shared_realm(std::move(ref));
34✔
4457
                REQUIRE(realm);
34!
4458
                promise_holder.get_promise().emplace_value(realm);
34!
4459
            };
34✔
4460
            auto open_callback2 = [promise_holder = util::CopyablePromiseHolder(std::move(open_task2_pf.promise))](
34!
4461
                                      ThreadSafeReference ref, std::exception_ptr err) mutable {
34✔
4462
                REQUIRE_FALSE(err);
34!
4463
                auto realm = Realm::get_shared_realm(std::move(ref));
34✔
4464
                REQUIRE(realm);
34!
4465
                promise_holder.get_promise().emplace_value(realm);
34!
4466
            };
34✔
4467

18!
4468
            config.sync_config->rerun_init_subscription_on_open = true;
34✔
4469
            config.sync_config->subscription_initializer = init_subscription;
34✔
4470

16✔
4471
            SECTION("Realm was already created, but we want to rerun on first open using multiple tasks") {
34✔
4472
                {
18✔
4473
                    subscription_invoked = false;
16✔
4474
                    auto realm = Realm::get_shared_realm(config);
18✔
4475
                    auto sb = realm->get_latest_subscription_set();
17✔
4476
                    auto future = sb.get_state_change_notification(realm::sync::SubscriptionSet::State::Complete);
17✔
4477
                    auto state = future.get();
17✔
4478
                    REQUIRE(state == realm::sync::SubscriptionSet::State::Complete);
17!
4479
                    realm->refresh(); // refresh is needed otherwise table_ref->size() would be 0
17✔
4480
                    REQUIRE(verify_subscription(realm));
17!
4481
                    REQUIRE(realm->get_latest_subscription_set().version() == 1);
17!
4482
                    REQUIRE(realm->get_active_subscription_set().version() == 1);
17!
4483
                }
17!
4484

9!
4485
                auto async_open_task1 = Realm::get_synchronized_realm(config);
17!
4486
                auto async_open_task2 = Realm::get_synchronized_realm(config);
17✔
4487
                async_open_task1->start(open_callback1);
16✔
4488
                async_open_task2->start(open_callback2);
17✔
4489

9✔
4490
                auto realm1 = open_task1_pf.future.get();
17✔
4491
                auto realm2 = open_task2_pf.future.get();
17✔
4492

8✔
4493
                const auto version_expected = 2;
17✔
4494
                auto r1_latest = realm1->get_latest_subscription_set().version();
17✔
4495
                auto r1_active = realm1->get_active_subscription_set().version();
16✔
4496
                REQUIRE(realm2->get_latest_subscription_set().version() == r1_latest);
17!
4497
                REQUIRE(realm2->get_active_subscription_set().version() == r1_active);
17!
4498
                REQUIRE(r1_latest == version_expected);
17!
4499
                REQUIRE(r1_active == version_expected);
17!
4500
            }
17!
4501
            SECTION("First time realm is created but opened via open async. Both tasks could run the subscription") {
33!
4502
                auto async_open_task1 = Realm::get_synchronized_realm(config);
17!
4503
                auto async_open_task2 = Realm::get_synchronized_realm(config);
17✔
4504
                async_open_task1->start(open_callback1);
18✔
4505
                async_open_task2->start(open_callback2);
17✔
4506
                auto realm1 = open_task1_pf.future.get();
17✔
4507
                auto realm2 = open_task2_pf.future.get();
17✔
4508

9✔
4509
                auto r1_latest = realm1->get_latest_subscription_set().version();
17✔
4510
                auto r1_active = realm1->get_active_subscription_set().version();
17✔
4511
                REQUIRE(realm2->get_latest_subscription_set().version() == r1_latest);
16!
4512
                REQUIRE(realm2->get_active_subscription_set().version() == r1_active);
17!
4513
                // the callback may be run twice, if task1 is the first task to open realm
9✔
4514
                // but it is scheduled after task2, which has opened realm later but
9!
4515
                // by the time it runs, subscription version is equal to 0 (realm creation).
9!
4516
                // This can only happen the first time that realm is created. All the other times
8✔
4517
                // the init_sb callback is guaranteed to run once.
8✔
4518
                REQUIRE(r1_latest >= 1);
16!
4519
                REQUIRE(r1_latest <= 2);
16!
4520
                REQUIRE(r1_active >= 1);
16!
4521
                REQUIRE(r1_active <= 2);
17!
4522
            }
17!
4523
        }
33!
4524
    }
65!
4525
}
97✔
4526
// Forces a client reset on the first download message received while a realm is being opened through
// an async open task, then checks that both client-reset notifications fire and that the subscription
// initializer was invoked.
TEST_CASE("flx sync: Client reset during async open", "[sync][flx][client reset][async open][baas]") {
    FLXSyncTestHarness harness("flx_bootstrap_reset");
    auto foo_obj_id = ObjectId::gen();
    std::atomic<bool> subscription_invoked = false;

    // Seed the server with a single TopLevel object.
    harness.load_initial_data([&](SharedRealm seed_realm) {
        CppContext context(seed_realm);
        Object::create(context, seed_realm, "TopLevel",
                       std::any(AnyDict{{"_id", foo_obj_id},
                                        {"queryable_str_field", "foo"s},
                                        {"queryable_int_field", static_cast<int64_t>(5)},
                                        {"non_queryable_field", "created as initial data seed"s}}));
    });
    SyncTestFile realm_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});

    // Subscription initializer: subscribes to every TopLevel object and records that it ran. The flag
    // is raised before the commit, exactly as the test expects to observe it afterwards.
    auto subscription_callback = [&](std::shared_ptr<Realm> opened_realm) {
        REQUIRE(opened_realm);
        auto top_level = opened_realm->read_group().get_table("class_TopLevel");
        auto pending_subs = opened_realm->get_latest_subscription_set().make_mutable_copy();
        pending_subs.insert_or_assign(Query(top_level));
        subscription_invoked = true;
        pending_subs.commit();
    };

    realm_config.sync_config->client_resync_mode = ClientResyncMode::Recover;
    realm_config.sync_config->subscription_initializer = subscription_callback;

    bool client_reset_triggered = false;
    realm_config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr<SyncSession> weak_session,
                                                              const SyncClientHookData& hook_data) {
        const auto session = weak_session.lock();
        // React only once, and only to a download message received by the session of this test's realm file.
        const bool ignore_event = !session || session->path() != realm_config.path ||
                                  hook_data.event != SyncClientHookEvent::DownloadMessageReceived ||
                                  client_reset_triggered;
        if (ignore_event) {
            return SyncClientHookAction::NoAction;
        }

        client_reset_triggered = true;
        reset_utils::trigger_client_reset(harness.session().app_session(), *session);
        return SyncClientHookAction::SuspendWithRetryableError;
    };

    // Each client-reset notification checks the schema version and then signals the test thread.
    auto before_callback_called = util::make_promise_future<void>();
    realm_config.sync_config->notify_before_client_reset = [&](std::shared_ptr<Realm> before_realm) {
        CHECK(before_realm->schema_version() == 0);
        before_callback_called.promise.emplace_value();
    };

    auto after_callback_called = util::make_promise_future<void>();
    realm_config.sync_config->notify_after_client_reset = [&](std::shared_ptr<Realm> after_realm,
                                                              ThreadSafeReference, bool) {
        CHECK(after_realm->schema_version() == 0);
        after_callback_called.promise.emplace_value();
    };

    auto realm_task = Realm::get_synchronized_realm(realm_config);
    auto realm_pf = util::make_promise_future<SharedRealm>();
    realm_task->start([&](ThreadSafeReference ref, std::exception_ptr open_error) {
        // Hand either the opened realm or the failure (converted to a status) to the waiting test thread.
        try {
            if (open_error) {
                std::rethrow_exception(open_error);
            }
            realm_pf.promise.emplace_value(Realm::get_shared_realm(std::move(ref)));
        }
        catch (...) {
            realm_pf.promise.set_error(exception_to_status());
        }
    });

    // Block until the realm is open and both client-reset notifications were delivered.
    auto realm = realm_pf.future.get();
    before_callback_called.future.get();
    after_callback_called.future.get();
    REQUIRE(subscription_invoked.load());
}
17✔
4610

1✔
4611
// Test that resending pending subscription sets does not cause any inconsistencies in the progress cursors.
1!
4612
TEST_CASE("flx sync: resend pending subscriptions when reconnecting", "[sync][flx][baas]") {
17✔
4613
    FLXSyncTestHarness harness("flx_pending_subscriptions", {g_large_array_schema, {"queryable_int_field"}});
16✔
4614

8✔
4615
    std::vector<ObjectId> obj_ids_at_end = fill_large_array_schema(harness);
17✔
4616
    SyncTestFile interrupted_realm_config(harness.app()->current_user(), harness.schema(),
17✔
4617
                                          SyncConfig::FLXSyncEnabled{});
16✔
4618

9✔
4619
    {
17✔
4620
        auto pf = util::make_promise_future<void>();
17✔
4621
        Realm::Config config = interrupted_realm_config;
16✔
4622
        config.sync_config = std::make_shared<SyncConfig>(*interrupted_realm_config.sync_config);
17✔
4623
        config.sync_config->on_sync_client_event_hook =
17✔
4624
            [promise = util::CopyablePromiseHolder(std::move(pf.promise))](std::weak_ptr<SyncSession> weak_session,
17✔
4625
                                                                           const SyncClientHookData& data) mutable {
545✔
4626
                if (data.event != SyncClientHookEvent::BootstrapMessageProcessed &&
545✔
4627
                    data.event != SyncClientHookEvent::BootstrapProcessed) {
489✔
4628
                    return SyncClientHookAction::NoAction;
434✔
4629
                }
434✔
4630
                auto session = weak_session.lock();
171✔
4631
                if (!session) {
169✔
4632
                    return SyncClientHookAction::NoAction;
25✔
4633
                }
9✔
4634
                if (data.query_version != 1) {
153✔
4635
                    return SyncClientHookAction::NoAction;
32✔
4636
                }
32✔
4637

65✔
4638
                // Commit a subscriptions set whenever a bootstrap message is received for query version 1.
58✔
4639
                if (data.event == SyncClientHookEvent::BootstrapMessageProcessed) {
114✔
4640
                    auto latest_subs = session->get_flx_subscription_store()->get_latest().make_mutable_copy();
96✔
4641
                    latest_subs.commit();
96✔
4642
                    return SyncClientHookAction::NoAction;
103✔
4643
                }
102✔
4644
                // At least one subscription set was created.
14✔
4645
                CHECK(session->get_flx_subscription_store()->get_latest().version() > 1);
22!
4646
                promise.get_promise().emplace_value();
22✔
4647
                // Reconnect once query version 1 is bootstrapped.
8✔
4648
                return SyncClientHookAction::TriggerReconnect;
17!
4649
            };
17✔
4650

8✔
4651
        auto realm = Realm::get_shared_realm(config);
17✔
4652
        {
17✔
4653
            auto mut_subs = realm->get_latest_subscription_set().make_mutable_copy();
16✔
4654
            auto table = realm->read_group().get_table("class_TopLevel");
17✔
4655
            mut_subs.insert_or_assign(Query(table));
17✔
4656
            mut_subs.commit();
17✔
4657
        }
17✔
4658
        pf.future.get();
17✔
4659
        realm->sync_session()->shutdown_and_wait();
17✔
4660
        realm->close();
17✔
4661
    }
17✔
4662

9✔
4663
    _impl::RealmCoordinator::assert_no_open_realms();
17✔
4664

9✔
4665
    // Check at least one subscription set needs to be resent.
8✔
4666
    {
17✔
4667
        DBOptions options;
16✔
4668
        options.encryption_key = test_util::crypt_key();
16✔
4669
        auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options);
17✔
4670
        auto sub_store = sync::SubscriptionStore::create(realm);
17✔
4671
        auto version_info = sub_store->get_version_info();
17✔
4672
        REQUIRE(version_info.latest > version_info.active);
17!
4673
    }
17✔
4674

9✔
4675
    // Resend the pending subscriptions.
9!
4676
    auto realm = Realm::get_shared_realm(interrupted_realm_config);
17✔
4677
    wait_for_upload(*realm);
16✔
4678
    wait_for_download(*realm);
16✔
4679
}
17✔
4680

1✔
4681
// Checks that a pending subscription state-change notification is completed with an error both when
// the sync session is paused and when the session reports an error after a client-side schema change
// is uploaded.
TEST_CASE("flx: fatal errors and session becoming inactive cancel pending waits", "[sync][flx][baas]") {
    std::vector<ObjectSchema> schema{
        {"TopLevel",
         {
             {"_id", PropertyType::ObjectId, Property::IsPrimary{true}},
             {"queryable_int_field", PropertyType::Int | PropertyType::Nullable},
         }},
    };

    FLXSyncTestHarness harness("flx_cancel_pending_waits", {schema, {"queryable_int_field"}});
    SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});

    // The notification must have failed, and its reason must be one of the two expected messages:
    // the session became inactive, or the server rejected the schema change.
    auto check_status = [](auto status) {
        CHECK(!status.is_ok());
        std::string reason = status.get_status().reason();
        const bool session_inactive = reason.find("Sync session became inactive") != std::string::npos;
        const bool schema_change_rejected =
            reason.find("Invalid schema change (UPLOAD): non-breaking schema change: adding \"Int\" column at field "
                        "\"other_col\" in schema \"TopLevel\", schema changes from clients are restricted when "
                        "developer mode is disabled") != std::string::npos;
        if (!session_inactive && !schema_change_rejected) {
            FAIL(reason);
        }
    };

    // Commits a subscription covering every TopLevel object and returns the committed set.
    auto create_subscription = [](auto target_realm) -> realm::sync::SubscriptionSet {
        auto pending_subs = target_realm->get_latest_subscription_set().make_mutable_copy();
        auto top_level = target_realm->read_group().get_table("class_TopLevel");
        pending_subs.insert_or_assign(Query(top_level));
        return pending_subs.commit();
    };

    auto [error_promise, error_future] = util::make_promise_future<void>();
    config.sync_config->error_handler = [error_signal = util::CopyablePromiseHolder(std::move(error_promise))](
                                            std::shared_ptr<SyncSession>, SyncError) mutable {
        error_signal.get_promise().emplace_value();
    };

    auto realm = Realm::get_shared_realm(config);
    wait_for_download(*realm);

    // First wait: pausing the session must cancel the pending notification.
    auto subs = create_subscription(realm);
    auto subs_future = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete);

    realm->sync_session()->pause();
    auto state = subs_future.get_no_throw();
    check_status(state);

    // Second wait: register an upload-completion waiter, then add a column on the client only.
    auto [upload_done_promise, upload_done_future] = util::make_promise_future<void>();
    realm->sync_session()->wait_for_upload_completion([upload_signal = std::move(upload_done_promise)](auto) mutable {
        upload_signal.emplace_value();
    });
    schema[0].persisted_properties.push_back({"other_col", PropertyType::Int | PropertyType::Nullable});
    realm->update_schema(schema);

    subs = create_subscription(realm);
    subs_future = subs.get_state_change_notification(sync::SubscriptionSet::State::Complete);

    harness.load_initial_data([&](SharedRealm seed_realm) {
        CppContext context(seed_realm);
        Object::create(context, seed_realm, "TopLevel",
                       std::any(AnyDict{{"_id", ObjectId::gen()},
                                        {"queryable_int_field", static_cast<int64_t>(5)},
                                        {"other_col", static_cast<int64_t>(42)}}));
    });

    realm->sync_session()->resume();
    upload_done_future.get();
    error_future.get();
    state = subs_future.get_no_throw();
    check_status(state);
}
17✔
4753

1✔
4754
// Pauses and immediately resumes the sync session when the first download message is integrated,
// then checks that exactly two download messages were integrated and that the active subscription
// set is version 0 in the Complete state.
TEST_CASE("flx: pause and resume bootstrapping at query version 0", "[sync][flx][baas]") {
    FLXSyncTestHarness harness("flx_pause_resume_bootstrap");
    SyncTestFile triggered_config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{});
    auto [finished_promise, finished_future] = util::make_promise_future<void>();
    std::mutex counter_mutex;
    int integrated_messages = 0;
    triggered_config.sync_config->on_sync_client_event_hook =
        [finished = util::CopyablePromiseHolder(std::move(finished_promise)), &integrated_messages,
         &counter_mutex](std::weak_ptr<SyncSession> weak_session, const SyncClientHookData& hook_data) mutable {
            const auto session = weak_session.lock();
            if (!session) {
                return SyncClientHookAction::NoAction;
            }
            if (hook_data.event != SyncClientHookEvent::DownloadMessageIntegrated) {
                return SyncClientHookAction::NoAction;
            }

            std::lock_guard<std::mutex> guard(counter_mutex);
            if (integrated_messages == 0) {
                // First integrated message (the bootstrap): bounce the session.
                session->pause();
                session->resume();
            }
            else {
                // A later integrated message (the empty download received by the resumed session):
                // let the test thread continue.
                finished.get_promise().emplace_value();
            }
            // Counted last, while the lock is still held, so the test thread observes the final value.
            ++integrated_messages;
            return SyncClientHookAction::NoAction;
        };

    auto realm = Realm::get_shared_realm(triggered_config);
    finished_future.get();
    // Held for the remainder of the test, as any later hook invocation would block on it.
    std::lock_guard<std::mutex> guard(counter_mutex);
    CHECK(integrated_messages == 2);
    auto active_subs = realm->sync_session()->get_flx_subscription_store()->get_active();
    REQUIRE(active_subs.version() == 0);
    REQUIRE(active_subs.state() == sync::SubscriptionSet::State::Complete);
}
17✔
4790

1!
4791
} // namespace realm::app
1!
4792

1✔
4793
#endif // REALM_ENABLE_AUTH_TESTS
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc