• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1817

04 Nov 2023 12:29AM UTC coverage: 91.695% (+0.04%) from 91.66%
1817

push

Evergreen

web-flow
Use a single write transaction for DiscardLocal client resets on FLX realms (#7110)

Updating the subscription store in a separate write transaction from the
recovery means that we temporarily commit an invalid state. If the application
crashes between committing the client reset diff and updating the subscription
store, the next launch of the application would try to use the now-invalid
pending subscriptions that should have been discarded.

92122 of 168844 branches covered (0.0%)

141 of 146 new or added lines in 7 files covered. (96.58%)

59 existing lines in 12 files now uncovered.

230819 of 251726 relevant lines covered (91.69%)

6481779.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.41
/src/realm/sync/subscriptions.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2021 Realm, Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "realm/sync/subscriptions.hpp"
20

21
#include "external/json/json.hpp"
22

23
#include "realm/data_type.hpp"
24
#include "realm/keys.hpp"
25
#include "realm/list.hpp"
26
#include "realm/sort_descriptor.hpp"
27
#include "realm/sync/noinst/sync_metadata_schema.hpp"
28
#include "realm/table.hpp"
29
#include "realm/table_view.hpp"
30
#include "realm/transaction.hpp"
31
#include "realm/util/flat_map.hpp"
32

33
#include <algorithm>
34
#include <initializer_list>
35
#include <stdexcept>
36

37
namespace realm::sync {
38
namespace {
39
// Schema version history:
//   v2: Initial public beta.

// Version of the flexible-sync metadata schema written and expected by this file.
constexpr int c_flx_schema_version = 2;

// Names of the two metadata tables.
constexpr std::string_view c_flx_subscription_sets_table{"flx_subscription_sets"};
constexpr std::string_view c_flx_subscriptions_table{"flx_subscriptions"};

// Column names of the subscription-set table.
constexpr std::string_view c_flx_sub_sets_state_field{"state"};
constexpr std::string_view c_flx_sub_sets_version_field{"version"};
constexpr std::string_view c_flx_sub_sets_error_str_field{"error"};
constexpr std::string_view c_flx_sub_sets_subscriptions_field{"subscriptions"};
constexpr std::string_view c_flx_sub_sets_snapshot_version_field{"snapshot_version"};

// Column names of the (embedded) subscription table.
constexpr std::string_view c_flx_sub_id_field{"id"};
constexpr std::string_view c_flx_sub_created_at_field{"created_at"};
constexpr std::string_view c_flx_sub_updated_at_field{"updated_at"};
constexpr std::string_view c_flx_sub_name_field{"name"};
constexpr std::string_view c_flx_sub_object_class_field{"object_class"};
constexpr std::string_view c_flx_sub_query_str_field{"query"};
58

59
using OptionalString = util::Optional<std::string>;
60

61
// On-disk encoding of a subscription set's state. The numeric values are what gets written to the
// "state" column, so they must stay stable across releases.
enum class SubscriptionStateForStorage : int64_t {
    // Persisted locally, not yet acknowledged by the server.
    Pending = 1,
    // The server is currently sending the initial state for this subscription set.
    Bootstrapping = 2,
    // This is the active subscription set currently being synchronized with the server.
    Complete = 3,
    // The server failed to process this subscription set. Check error_str() for details.
    Error = 4,
    // The last bootstrap message has been received; the client is waiting for a mark message before
    // the subscription set counts as fully caught up to history.
    AwaitingMark = 6,
};
74

75
// Translates the integer read from the "state" column into the in-memory SubscriptionSet::State.
// Throws std::runtime_error when the stored value is not one of the known storage states.
SubscriptionSet::State state_from_storage(int64_t value)
{
    using State = SubscriptionSet::State;
    using Stored = SubscriptionStateForStorage;

    const auto stored = static_cast<Stored>(value);
    if (stored == Stored::Pending) {
        return State::Pending;
    }
    if (stored == Stored::Bootstrapping) {
        return State::Bootstrapping;
    }
    if (stored == Stored::AwaitingMark) {
        return State::AwaitingMark;
    }
    if (stored == Stored::Complete) {
        return State::Complete;
    }
    if (stored == Stored::Error) {
        return State::Error;
    }
    throw std::runtime_error(util::format("Invalid state for SubscriptionSet stored on disk: %1", value));
}
9,414✔
92

93
// Translates an in-memory state into the integer persisted in the "state" column.
// Only Pending, Bootstrapping, AwaitingMark, Complete and Error have a storage form; any other
// state reaching this function is treated as a programming error.
int64_t state_to_storage(SubscriptionSet::State state)
{
    using State = SubscriptionSet::State;
    using Stored = SubscriptionStateForStorage;
    const auto encode = [](Stored stored) {
        return static_cast<int64_t>(stored);
    };

    switch (state) {
        case State::Error:
            return encode(Stored::Error);
        case State::Complete:
            return encode(Stored::Complete);
        case State::AwaitingMark:
            return encode(Stored::AwaitingMark);
        case State::Bootstrapping:
            return encode(Stored::Bootstrapping);
        case State::Pending:
            return encode(Stored::Pending);
        default:
            break;
    }
    REALM_UNREACHABLE();
}
28,014✔
110

111
// Maps a state to its rank in the subscription-set lifecycle so that two states can be compared
// with ordinary integer comparison (a later stage has a larger rank).
size_t state_to_order(SubscriptionSet::State needle)
{
    using State = SubscriptionSet::State;
    // The index of a state in this table is its rank.
    constexpr State lifecycle[] = {
        State::Uncommitted, State::Pending, State::Bootstrapping, State::AwaitingMark,
        State::Complete,    State::Error,   State::Superseded,
    };

    size_t rank = 0;
    for (State candidate : lifecycle) {
        if (candidate == needle) {
            return rank;
        }
        ++rank;
    }
    REALM_UNREACHABLE();
}
×
132

133
} // namespace
134

135
Subscription::Subscription(const SubscriptionStore* parent, Obj obj)
136
    : id(obj.get<ObjectId>(parent->m_sub_id))
137
    , created_at(obj.get<Timestamp>(parent->m_sub_created_at))
138
    , updated_at(obj.get<Timestamp>(parent->m_sub_updated_at))
139
    , name(obj.is_null(parent->m_sub_name) ? OptionalString(util::none)
140
                                           : OptionalString{obj.get<String>(parent->m_sub_name)})
141
    , object_class_name(obj.get<String>(parent->m_sub_object_class_name))
142
    , query_string(obj.get<String>(parent->m_sub_query_str))
143
{
7,466✔
144
}
7,466✔
145

146
// Creates a brand-new subscription: a freshly generated id, the current wall-clock time as the
// creation timestamp, and the supplied name/class/query moved into place.
Subscription::Subscription(util::Optional<std::string> name, std::string object_class_name, std::string query_str)
    : id(ObjectId::gen())
    , created_at(std::chrono::system_clock::now())
    // NOTE(review): copying created_at here assumes it is declared before updated_at in the class.
    , updated_at(created_at)
    , name(std::move(name))
    , object_class_name(std::move(object_class_name))
    , query_string(std::move(query_str))
{
}
942✔
155

156

157
// Wraps the subscription-set object `obj` as seen by transaction `tr`.
// The set's version is the object's integer primary key. Unless this instance is being created as
// the basis of a mutable copy, its state and subscriptions are loaded from `obj` immediately.
SubscriptionSet::SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, const Transaction& tr, const Obj& obj,
                                 MakingMutableCopy making_mutable_copy)
    : m_mgr(mgr)
    , m_cur_version(tr.get_version())
    , m_version(obj.get_primary_key().get_int())
    , m_obj_key(obj.get_key())
{
    REALM_ASSERT(obj.is_valid());

    if (!making_mutable_copy) {
        load_from_database(obj);
    }
}
10,696✔
169

170
// Creates a placeholder for a subscription set that has already been superseded: only the owning
// store and the version are recorded, and the state is fixed to Superseded.
SubscriptionSet::SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, int64_t version, SupersededTag)
    : m_mgr(mgr)
    , m_version(version)
    , m_state(State::Superseded)
{
}
8✔
176

177
void SubscriptionSet::load_from_database(const Obj& obj)
178
{
9,414✔
179
    auto mgr = get_flx_subscription_store(); // Throws
9,414✔
180

4,710✔
181
    m_state = state_from_storage(obj.get<int64_t>(mgr->m_sub_set_state));
9,414✔
182
    m_error_str = obj.get<String>(mgr->m_sub_set_error_str);
9,414✔
183
    m_snapshot_version = static_cast<DB::version_type>(obj.get<int64_t>(mgr->m_sub_set_snapshot_version));
9,414✔
184
    auto sub_list = obj.get_linklist(mgr->m_sub_set_subscriptions);
9,414✔
185
    m_subs.clear();
9,414✔
186
    for (size_t idx = 0; idx < sub_list.size(); ++idx) {
16,880✔
187
        m_subs.push_back(Subscription(mgr.get(), sub_list.get_object(idx)));
7,466✔
188
    }
7,466✔
189
}
9,414✔
190

191
std::shared_ptr<const SubscriptionStore> SubscriptionSet::get_flx_subscription_store() const
192
{
19,664✔
193
    if (auto mgr = m_mgr.lock()) {
19,664✔
194
        return mgr;
19,660✔
195
    }
19,660✔
196
    throw std::logic_error("Active SubscriptionSet without a SubscriptionStore");
4✔
197
}
4✔
198

199
int64_t SubscriptionSet::version() const
200
{
16,010✔
201
    return m_version;
16,010✔
202
}
16,010✔
203

204
// The database snapshot version recorded for this subscription set.
DB::version_type SubscriptionSet::snapshot_version() const
{
    return m_snapshot_version;
}
1,278✔
208

209
// The state of this subscription set as of when it was loaded or last updated in memory.
SubscriptionSet::State SubscriptionSet::state() const
{
    return m_state;
}
10,038✔
213

214
// The error message attached to this set, or a default-constructed StringData when there is none.
// The returned view refers to storage owned by this SubscriptionSet.
StringData SubscriptionSet::error_str() const
{
    if (!m_error_str.empty()) {
        return m_error_str;
    }
    return StringData{};
}
28✔
221

222
size_t SubscriptionSet::size() const
223
{
296✔
224
    return m_subs.size();
296✔
225
}
296✔
226

227
// Returns the subscription at `index`, delegating bounds handling to the container's at().
const Subscription& SubscriptionSet::at(size_t index) const
{
    return m_subs.at(index);
}
28✔
231

232
// Iterator to the first subscription in the set.
SubscriptionSet::const_iterator SubscriptionSet::begin() const
{
    return m_subs.begin();
}
3,720✔
236

237
// Past-the-end iterator for the subscriptions in the set.
SubscriptionSet::const_iterator SubscriptionSet::end() const
{
    return m_subs.end();
}
4,706✔
241

242
// Looks up a subscription by name. Returns a pointer to the first match, or nullptr if no
// subscription in the set carries that name.
const Subscription* SubscriptionSet::find(StringData name) const
{
    const auto match = std::find_if(begin(), end(), [&](const Subscription& sub) {
        return sub.name == name;
    });
    if (match == end()) {
        return nullptr;
    }
    return &*match;
}
64✔
250

251
// Looks up a subscription by query. A subscription matches when both its object class name and
// its query string equal those derived from `query`. Returns nullptr when nothing matches.
const Subscription* SubscriptionSet::find(const Query& query) const
{
    const auto wanted_query = query.get_description();
    const auto wanted_class = Group::table_name_to_class_name(query.get_table()->get_name());

    const auto match = std::find_if(begin(), end(), [&](const Subscription& sub) {
        return sub.object_class_name == wanted_class && sub.query_string == wanted_query;
    });
    if (match == end()) {
        return nullptr;
    }
    return &*match;
}
48✔
261

262
// Creates an editable subscription set bound to transaction `tr` and backing object `obj`.
// The base class is initialised from `*tr` and `obj` before either is moved into the members,
// and the state observed at construction is remembered in m_old_state.
MutableSubscriptionSet::MutableSubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, TransactionRef tr, Obj obj,
                                               MakingMutableCopy making_mutable_copy)
    : SubscriptionSet(mgr, *tr, obj, making_mutable_copy)
    , m_tr(std::move(tr))
    , m_obj(std::move(obj))
    , m_old_state(state())
{
}
3,440✔
270

271
void MutableSubscriptionSet::check_is_mutable() const
272
{
4,460✔
273
    if (m_tr->get_transact_stage() != DB::transact_Writing) {
4,460✔
274
        throw WrongTransactionState("Not a write transaction");
12✔
275
    }
12✔
276
}
4,460✔
277

278
// This uses the 'swap and pop' idiom to run in constant time.
279
// The iterator returned is:
280
//  1. end(), if the last subscription is removed
281
//  2. same iterator it is passed (but pointing to the last subscription in set), otherwise
282
MutableSubscriptionSet::iterator MutableSubscriptionSet::erase(const_iterator it)
283
{
56✔
284
    check_is_mutable();
56✔
285
    REALM_ASSERT(it != end());
56✔
286
    if (it == std::prev(m_subs.end())) {
56✔
287
        m_subs.pop_back();
24✔
288
        return end();
24✔
289
    }
24✔
290
    auto back = std::prev(m_subs.end());
32✔
291
    // const_iterator to iterator in constant time (See https://stackoverflow.com/a/10669041)
16✔
292
    auto iterator = m_subs.erase(it, it);
32✔
293
    std::swap(*iterator, *back);
32✔
294
    m_subs.pop_back();
32✔
295
    return iterator;
32✔
296
}
32✔
297

298
// Removes the subscription with the given name. Returns true if one was found and removed.
// Throws WrongTransactionState when not in a write transaction.
bool MutableSubscriptionSet::erase(StringData name)
{
    check_is_mutable();

    const Subscription* found = find(name);
    if (found == nullptr) {
        return false;
    }
    // Turn the element pointer back into an iterator via its offset from the first element.
    const auto offset = found - &m_subs.front();
    erase(m_subs.begin() + offset);
    return true;
}
8✔
308

309
bool MutableSubscriptionSet::erase(const Query& query)
310
{
24✔
311
    check_is_mutable();
24✔
312
    auto ptr = find(query);
24✔
313
    if (!ptr)
24✔
314
        return false;
×
315
    auto it = m_subs.begin() + (ptr - &m_subs.front());
24✔
316
    erase(it);
24✔
317
    return true;
24✔
318
}
24✔
319

320
// Removes every subscription whose object class name equals `object_class_name`, preserving the
// relative order of the remaining subscriptions. Returns true if at least one was removed.
// Throws WrongTransactionState when not in a write transaction, like the other mutators.
bool MutableSubscriptionSet::erase_by_class_name(StringData object_class_name)
{
    // Validate the transaction state before touching m_subs, consistent with erase()/clear()/insert_sub().
    check_is_mutable();

    // TODO: Use std::erase_if when switching to C++20.
    const auto first_removed =
        std::remove_if(m_subs.begin(), m_subs.end(), [&object_class_name](const Subscription& sub) {
            return sub.object_class_name == object_class_name;
        });
    const bool erased_any = first_removed != m_subs.end();
    m_subs.erase(first_removed, m_subs.end());
    return erased_any;
}
16✔
330

331
// Removes the subscription with the given id. Returns true if one was found and removed.
// Throws WrongTransactionState when not in a write transaction, whether or not a match exists,
// which matches the behaviour of erase(StringData) and erase(const Query&).
bool MutableSubscriptionSet::erase_by_id(ObjectId id)
{
    // Check up front so that a misuse is reported even when no subscription has this id.
    check_is_mutable();

    const auto match = std::find_if(m_subs.begin(), m_subs.end(), [&id](const Subscription& sub) -> bool {
        return sub.id == id;
    });
    if (match == m_subs.end()) {
        return false;
    }
    erase(match);
    return true;
}
8✔
342

343
void MutableSubscriptionSet::clear()
344
{
376✔
345
    check_is_mutable();
376✔
346
    m_subs.clear();
376✔
347
}
376✔
348

349
void MutableSubscriptionSet::insert_sub(const Subscription& sub)
350
{
796✔
351
    check_is_mutable();
796✔
352
    m_subs.push_back(sub);
796✔
353
}
796✔
354

355
std::pair<SubscriptionSet::iterator, bool>
356
MutableSubscriptionSet::insert_or_assign_impl(iterator it, util::Optional<std::string> name,
357
                                              std::string object_class_name, std::string query_str)
358
{
938✔
359
    check_is_mutable();
938✔
360
    if (it != end()) {
938✔
361
        auto& sub = m_subs[it - begin()];
40✔
362
        sub.object_class_name = std::move(object_class_name);
40✔
363
        sub.query_string = std::move(query_str);
40✔
364
        sub.updated_at = Timestamp{std::chrono::system_clock::now()};
40✔
365

20✔
366
        return {it, false};
40✔
367
    }
40✔
368
    it = m_subs.insert(m_subs.end(),
898✔
369
                       Subscription(std::move(name), std::move(object_class_name), std::move(query_str)));
898✔
370

448✔
371
    return {it, true};
898✔
372
}
898✔
373

374
std::pair<SubscriptionSet::iterator, bool> MutableSubscriptionSet::insert_or_assign(std::string_view name,
375
                                                                                    const Query& query)
376
{
140✔
377
    auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());
140✔
378
    auto query_str = query.get_description();
140✔
379
    auto it = std::find_if(begin(), end(), [&](const Subscription& sub) {
118✔
380
        return sub.name == name;
96✔
381
    });
96✔
382

70✔
383
    return insert_or_assign_impl(it, std::string{name}, std::move(table_name), std::move(query_str));
140✔
384
}
140✔
385

386
std::pair<SubscriptionSet::iterator, bool> MutableSubscriptionSet::insert_or_assign(const Query& query)
387
{
798✔
388
    auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());
798✔
389
    auto query_str = query.get_description();
798✔
390
    auto it = std::find_if(begin(), end(), [&](const Subscription& sub) {
506✔
391
        return (!sub.name && sub.object_class_name == table_name && sub.query_string == query_str);
216✔
392
    });
216✔
393

398✔
394
    return insert_or_assign_impl(it, util::none, std::move(table_name), std::move(query_str));
798✔
395
}
798✔
396

397
void MutableSubscriptionSet::import(const SubscriptionSet& src_subs)
398
{
128✔
399
    clear();
128✔
400
    for (const Subscription& sub : src_subs) {
132✔
401
        insert_sub(sub);
132✔
402
    }
132✔
403
}
128✔
404

405
// Moves this subscription set to `new_state` (in memory; persisted by commit()).
//
// Allowed targets are Error, Bootstrapping, AwaitingMark and Complete:
//  - Error requires `error_str` and is only allowed from Bootstrapping, Pending or Uncommitted.
//  - Complete additionally asks the store to supersede all prior subscription sets.
// Uncommitted, Superseded and Pending cannot be set explicitly.
//
// Throws WrongTransactionState when not in a write transaction and std::logic_error for any of
// the invalid requests described above.
void MutableSubscriptionSet::update_state(State new_state, util::Optional<std::string_view> error_str)
{
    check_is_mutable();
    const State previous = state();

    if (new_state != State::Error && error_str) {
        throw std::logic_error("Cannot supply an error message for a subscription set when state is not Error");
    }

    switch (new_state) {
        case State::Uncommitted:
            throw std::logic_error("cannot set subscription set state to uncommitted");
        case State::Superseded:
            throw std::logic_error("Cannot set a subscription to the superseded state");
        case State::Pending:
            throw std::logic_error("Cannot set subscription set to the pending state");

        case State::Error: {
            const bool may_fail = previous == State::Bootstrapping || previous == State::Pending ||
                                  previous == State::Uncommitted;
            if (!may_fail) {
                throw std::logic_error(
                    "subscription set must be in Bootstrapping or Pending to update state to error");
            }
            if (!error_str) {
                throw std::logic_error("Must supply an error message when setting a subscription to the error state");
            }
            m_state = new_state;
            m_error_str = std::string{*error_str};
            return;
        }

        case State::Bootstrapping:
        case State::AwaitingMark:
            m_state = new_state;
            return;

        case State::Complete: {
            // Resolve the store first so that a missing store leaves m_state untouched.
            auto mgr = get_flx_subscription_store(); // Throws
            m_state = new_state;
            mgr->supercede_prior_to(m_tr, version());
            return;
        }
    }
}
2,258✔
447

448
// Asks the owning store to create an editable copy of this subscription set.
// Throws std::logic_error if the store no longer exists.
MutableSubscriptionSet SubscriptionSet::make_mutable_copy() const
{
    const auto store = get_flx_subscription_store(); // Throws
    return store->make_mutable_copy(*this);
}
1,282✔
453

454
void SubscriptionSet::refresh()
455
{
24✔
456
    auto mgr = get_flx_subscription_store(); // Throws
24✔
457
    if (mgr->would_refresh(m_cur_version)) {
24✔
458
        *this = mgr->get_refreshed(m_obj_key, version());
20✔
459
    }
20✔
460
}
24✔
461

462
// Returns a future that resolves once this subscription set reaches `notify_when` (or a later
// state in the lifecycle order), resolves to Superseded if a newer set has already completed, or
// fails with ErrorCodes::SubscriptionFailed if the set is in the Error state.
//
// The future is ready immediately when the outcome is already known; otherwise a request is queued
// on the store to be fulfilled by MutableSubscriptionSet::process_notifications().
// Throws std::logic_error if the owning store no longer exists.
util::Future<SubscriptionSet::State> SubscriptionSet::get_state_change_notification(State notify_when) const
{
    auto mgr = get_flx_subscription_store(); // Throws

    std::unique_lock<std::mutex> lk(mgr->m_pending_notifications_mutex);
    // If we've already been superseded by another version getting completed, then we should skip registering
    // a notification because it may never fire.
    if (mgr->m_min_outstanding_version > version()) {
        return util::Future<State>::make_ready(State::Superseded);
    }

    // Begin by blocking process_notifications from starting to fill futures. No matter the outcome, we'll
    // unblock process_notifications() at the end of this function via the guard we construct below.
    mgr->m_outstanding_requests++;
    auto guard = util::make_scope_exit([&]() noexcept {
        if (!lk.owns_lock()) {
            lk.lock();
        }
        --mgr->m_outstanding_requests;
        mgr->m_pending_notifications_cv.notify_one();
    });
    lk.unlock();

    State cur_state = state();
    // Keep an owning copy of the message: error_str() returns a non-owning view, and the refreshed
    // set below is destroyed before the message is used.
    std::string err_str{std::string_view(error_str())};

    // If there have been writes to the database since this SubscriptionSet was created, we need to fetch
    // the updated version from the DB to know the true current state and maybe return a ready future.
    if (m_cur_version < mgr->m_db->get_version_of_latest_snapshot()) {
        auto refreshed_self = mgr->get_refreshed(m_obj_key, version());
        cur_state = refreshed_self.state();
        err_str = std::string{std::string_view(refreshed_self.error_str())};
    }
    // If we've already reached the desired state, or if the subscription is in an error state,
    // we can return a ready future immediately.
    if (cur_state == State::Error) {
        return util::Future<State>::make_ready(Status{ErrorCodes::SubscriptionFailed, std::string_view(err_str)});
    }
    else if (state_to_order(cur_state) >= state_to_order(notify_when)) {
        return util::Future<State>::make_ready(cur_state);
    }

    // Otherwise put in a new request to be filled in by process_notifications().
    lk.lock();

    // Make a promise/future pair and add it to the list of pending notifications.
    auto [promise, future] = util::make_promise_future<State>();
    mgr->m_pending_notifications.emplace_back(version(), std::move(promise), notify_when);
    // `future` is a structured binding, so it is not implicitly moved on return.
    return std::move(future);
}
696✔
512

513
void SubscriptionSet::get_state_change_notification(
514
    State notify_when, util::UniqueFunction<void(util::Optional<State>, util::Optional<Status>)> cb) const
515
{
×
516
    get_state_change_notification(notify_when).get_async([cb = std::move(cb)](StatusWith<State> result) {
×
517
        if (result.is_ok()) {
×
518
            cb(result.get_value(), {});
×
519
        }
×
520
        else {
×
521
            cb({}, result.get_status());
×
522
        }
×
523
    });
×
524
}
×
525

526
void MutableSubscriptionSet::process_notifications()
527
{
3,368✔
528
    auto mgr = get_flx_subscription_store(); // Throws
3,368✔
529
    auto new_state = state();
3,368✔
530
    auto my_version = version();
3,368✔
531

1,684✔
532
    std::list<SubscriptionStore::NotificationRequest> to_finish;
3,368✔
533
    std::unique_lock<std::mutex> lk(mgr->m_pending_notifications_mutex);
3,368✔
534
    mgr->m_pending_notifications_cv.wait(lk, [&] {
3,368✔
535
        return mgr->m_outstanding_requests == 0;
3,368✔
536
    });
3,368✔
537

1,684✔
538
    for (auto it = mgr->m_pending_notifications.begin(); it != mgr->m_pending_notifications.end();) {
5,096✔
539
        if ((it->version == my_version &&
1,728✔
540
             (new_state == State::Error || state_to_order(new_state) >= state_to_order(it->notify_when))) ||
1,486✔
541
            (new_state == State::Complete && it->version < my_version)) {
1,406✔
542
            to_finish.splice(to_finish.end(), mgr->m_pending_notifications, it++);
656✔
543
        }
656✔
544
        else {
1,072✔
545
            ++it;
1,072✔
546
        }
1,072✔
547
    }
1,728✔
548

1,684✔
549
    if (new_state == State::Complete) {
3,368✔
550
        mgr->m_min_outstanding_version = my_version;
1,416✔
551
    }
1,416✔
552

1,684✔
553
    lk.unlock();
3,368✔
554

1,684✔
555
    for (auto& req : to_finish) {
2,012✔
556
        if (new_state == State::Error && req.version == my_version) {
656✔
557
            req.promise.set_error({ErrorCodes::SubscriptionFailed, std::string_view(error_str())});
20✔
558
        }
20✔
559
        else if (req.version < my_version) {
636✔
560
            req.promise.emplace_value(State::Superseded);
12✔
561
        }
12✔
562
        else {
624✔
563
            req.promise.emplace_value(new_state);
624✔
564
        }
624✔
565
    }
656✔
566
}
3,368✔
567

568
// Persists this subscription set and ends the write transaction.
//
// If the set was Uncommitted when this object was created, its snapshot version and its full
// list of subscriptions are written out (and an Uncommitted state becomes Pending). In all cases
// the state is written, plus the error string when one is set. After committing, pending
// state-change notifications are processed and a refreshed read-only view is returned.
//
// Throws std::logic_error if the transaction is not a write transaction or the owning store no
// longer exists.
SubscriptionSet MutableSubscriptionSet::commit()
{
    const bool writing = m_tr->get_transact_stage() == DB::transact_Writing;
    if (!writing) {
        throw std::logic_error("SubscriptionSet is not in a commitable state");
    }
    auto mgr = get_flx_subscription_store(); // Throws

    if (m_old_state == State::Uncommitted) {
        if (m_state == State::Uncommitted) {
            m_state = State::Pending;
        }
        m_obj.set(mgr->m_sub_set_snapshot_version, static_cast<int64_t>(m_tr->get_version()));

        auto stored_subs = m_obj.get_linklist(mgr->m_sub_set_subscriptions);
        stored_subs.clear();
        for (const Subscription& sub : m_subs) {
            // Always append at the current end of the list.
            const size_t append_at = stored_subs.is_empty() ? 0 : stored_subs.size();
            auto row = stored_subs.create_and_insert_linked_object(append_at);
            row.set(mgr->m_sub_id, sub.id);
            row.set(mgr->m_sub_created_at, sub.created_at);
            row.set(mgr->m_sub_updated_at, sub.updated_at);
            if (sub.name) {
                row.set(mgr->m_sub_name, StringData(*sub.name));
            }
            row.set(mgr->m_sub_object_class_name, StringData(sub.object_class_name));
            row.set(mgr->m_sub_query_str, StringData(sub.query_string));
        }
    }

    m_obj.set(mgr->m_sub_set_state, state_to_storage(m_state));
    if (!m_error_str.empty()) {
        m_obj.set(mgr->m_sub_set_error_str, StringData(m_error_str));
    }

    const auto flx_version = version();
    m_tr->commit_and_continue_as_read();

    process_notifications();

    return mgr->get_refreshed(m_obj.get_key(), flx_version, m_tr->get_version_of_current_transaction());
}
3,368✔
608

609
std::string SubscriptionSet::to_ext_json() const
610
{
1,826✔
611
    if (m_subs.empty()) {
1,826✔
612
        return "{}";
698✔
613
    }
698✔
614

566✔
615
    util::FlatMap<std::string, std::vector<std::string>> table_to_query;
1,128✔
616
    for (const auto& sub : *this) {
1,276✔
617
        std::string table_name(sub.object_class_name);
1,276✔
618
        auto& queries_for_table = table_to_query.at(table_name);
1,276✔
619
        auto query_it = std::find(queries_for_table.begin(), queries_for_table.end(), sub.query_string);
1,276✔
620
        if (query_it != queries_for_table.end()) {
1,276✔
621
            continue;
8✔
622
        }
8✔
623
        queries_for_table.emplace_back(sub.query_string);
1,268✔
624
    }
1,268✔
625

566✔
626
    if (table_to_query.empty()) {
1,128✔
627
        return "{}";
×
628
    }
×
629

566✔
630
    // TODO this is pulling in a giant compile-time dependency. We should have a better way of escaping the
566✔
631
    // query strings into a json object.
566✔
632
    nlohmann::json output_json;
1,128✔
633
    for (auto& table : table_to_query) {
1,208✔
634
        // We want to make sure that the queries appear in some kind of canonical order so that if there are
606✔
635
        // two subscription sets with the same subscriptions in different orders, the server doesn't have to
606✔
636
        // waste a bunch of time re-running the queries for that table.
606✔
637
        std::stable_sort(table.second.begin(), table.second.end());
1,208✔
638

606✔
639
        bool is_first = true;
1,208✔
640
        std::ostringstream obuf;
1,208✔
641
        for (const auto& query_str : table.second) {
1,268✔
642
            if (!is_first) {
1,268✔
643
                obuf << " OR ";
60✔
644
            }
60✔
645
            is_first = false;
1,268✔
646
            obuf << "(" << query_str << ")";
1,268✔
647
        }
1,268✔
648
        output_json[table.first] = obuf.str();
1,208✔
649
    }
1,208✔
650

566✔
651
    return output_json.dump();
1,128✔
652
}
1,128✔
653

654
namespace {
655
class SubscriptionStoreInit : public SubscriptionStore {
656
public:
657
    explicit SubscriptionStoreInit(DBRef db)
658
        : SubscriptionStore(std::move(db))
659
    {
912✔
660
    }
912✔
661
};
662
} // namespace
663

664
// Factory: creates a shared SubscriptionStore operating on `db`.
SubscriptionStoreRef SubscriptionStore::create(DBRef db)
{
    auto store = std::make_shared<SubscriptionStoreInit>(std::move(db));
    return store;
}
912✔
668

669
// Opens (or creates) the flexible-sync metadata schema in `db` and resolves the table and column
// keys this store uses.
//
// If no schema version has been recorded yet, the schema is created and the version written in a
// write transaction. Otherwise the recorded version must equal c_flx_schema_version (else
// std::runtime_error is thrown) and the existing schema is loaded. Finally the subscription-set
// table is initialised if needed.
SubscriptionStore::SubscriptionStore(DBRef db)
    : m_db(std::move(db))
{
    // Describes both metadata tables and where to store each resolved table/column key.
    std::vector<SyncMetadataTable> internal_tables{
        {&m_sub_set_table,
         c_flx_subscription_sets_table,
         {&m_sub_set_version_num, c_flx_sub_sets_version_field, type_Int},
         {
             {&m_sub_set_state, c_flx_sub_sets_state_field, type_Int},
             {&m_sub_set_snapshot_version, c_flx_sub_sets_snapshot_version_field, type_Int},
             {&m_sub_set_error_str, c_flx_sub_sets_error_str_field, type_String, true},
             {&m_sub_set_subscriptions, c_flx_sub_sets_subscriptions_field, c_flx_subscriptions_table, true},
         }},
        {&m_sub_table,
         c_flx_subscriptions_table,
         SyncMetadataTable::IsEmbeddedTag{},
         {
             {&m_sub_id, c_flx_sub_id_field, type_ObjectId},
             {&m_sub_created_at, c_flx_sub_created_at_field, type_Timestamp},
             {&m_sub_updated_at, c_flx_sub_updated_at_field, type_Timestamp},
             {&m_sub_name, c_flx_sub_name_field, type_String, true},
             {&m_sub_object_class_name, c_flx_sub_object_class_field, type_String},
             {&m_sub_query_str, c_flx_sub_query_str_field, type_String},
         }},
    };

    auto tr = m_db->start_read();
    SyncMetadataSchemaVersions schema_versions(tr);

    const auto stored_version =
        schema_versions.get_version_for(tr, internal_schema_groups::c_flx_subscription_store);
    if (stored_version) {
        if (*stored_version != c_flx_schema_version) {
            throw std::runtime_error("Invalid schema version for flexible sync metadata");
        }
        load_sync_metadata_schema(tr, &internal_tables);
    }
    else {
        // First use of this file: record the version and create the tables.
        tr->promote_to_write();
        schema_versions.set_version_for(tr, internal_schema_groups::c_flx_subscription_store, c_flx_schema_version);
        create_sync_metadata_schema(tr, &internal_tables);
        tr->commit_and_continue_as_read();
    }

    // Make sure the subscription set table is properly initialized
    initialize_subscriptions_table(std::move(tr), false);
}
912✔
715

716
// Makes sure the subscription set table holds the zeroth subscription set.
//
// Nothing is written unless `clear_table` is true or the table is empty. When a write is needed,
// `tr` is promoted to a write transaction, the table is cleared if `clear_table` is set, a
// subscription set with primary key 0 is written in the Pending state, and the transaction is
// committed.
//
// tr:          transaction used to inspect the table; promoted and committed only when needed.
// clear_table: when true, remove every existing subscription set before creating the zeroth one.
void SubscriptionStore::initialize_subscriptions_table(TransactionRef&& tr, bool clear_table)
{
    auto sub_sets = tr->get_table(m_sub_set_table);
    if (!clear_table && !sub_sets->is_empty()) {
        // The table already has content and no reset was requested.
        return;
    }

    tr->promote_to_write();
    // If clear_table is true, clear out the sub_sets table
    if (clear_table) {
        sub_sets->clear();
    }
    // There should always be at least one subscription set so that the user can always wait
    // for synchronization on the result of get_latest().
    auto zero_sub = sub_sets->create_object_with_primary_key(Mixed{int64_t(0)});
    // Go through state_to_storage() like every other place that reads or writes the state column,
    // rather than persisting the raw enum value.
    zero_sub.set(m_sub_set_state, state_to_storage(SubscriptionSet::State::Pending));
    zero_sub.set(m_sub_set_snapshot_version, tr->get_version());
    tr->commit();
}
932✔
732

733
// Returns the subscription set with the largest primary key (version), read from a frozen
// transaction.
SubscriptionSet SubscriptionStore::get_latest() const
{
    auto frozen_tr = m_db->start_frozen();
    auto table = frozen_tr->get_table(m_sub_set_table);
    // The zeroth subscription set (used for schema instructions) must always exist.
    REALM_ASSERT(!table->is_empty());

    const auto pk_col = table->get_primary_key_column();
    const auto newest_version = table->max(pk_col)->get_int();

    return SubscriptionSet(weak_from_this(), *frozen_tr, table->get_object_with_primary_key(Mixed{newest_version}));
}
1,446✔
745

746
// Returns the active subscription set, looked up in a freshly started frozen transaction.
SubscriptionSet SubscriptionStore::get_active() const
{
    auto frozen_tr = m_db->start_frozen();
    Obj active_obj = get_active(*frozen_tr);
    return SubscriptionSet(weak_from_this(), *frozen_tr, active_obj);
}
1,360✔
751

752
// Looks up the object backing the active subscription set within `tr`.
//
// Candidates are the subscription sets whose state is Complete or AwaitingMark; they are sorted
// by primary key and only the first one is kept. If no candidate exists, the object with
// primary key 0 is returned instead.
Obj SubscriptionStore::get_active(const Transaction& tr) const
{
    auto table = tr.get_table(m_sub_set_table);
    // The zeroth subscription set (used for schema instructions) must always exist.
    REALM_ASSERT(!table->is_empty());

    // Sort on the primary key (ascending flag = false) and limit the result to one row.
    DescriptorOrdering newest_first;
    newest_first.append_sort(SortDescriptor{{{table->get_primary_key_column()}}, {false}});
    newest_first.append_limit(LimitDescriptor{1});

    auto query = table->where();
    query.equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Complete));
    query.Or();
    query.equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark));
    auto matches = query.find_all(newest_first);

    if (!matches.is_empty()) {
        return matches.get_object(0);
    }
    // No active subscription yet: fall back to the zeroth subscription.
    return table->get_object_with_primary_key(0);
}
654✔
773

774
// Collects three version numbers from the subscription set table in a single read transaction:
//   latest       - the largest primary key in the table
//   active       - primary key of the first Complete/AwaitingMark set under the ordering below,
//                  or SubscriptionSet::EmptyVersion when there is none
//   pending_mark - primary key of the first AwaitingMark set under the same ordering,
//                  or SubscriptionSet::EmptyVersion when there is none
SubscriptionStore::VersionInfo SubscriptionStore::get_version_info() const
{
    auto read_tr = m_db->start_read();
    auto table = read_tr->get_table(m_sub_set_table);
    // The zeroth subscription set (used for schema instructions) must always exist.
    REALM_ASSERT(!table->is_empty());

    const auto pk_col = table->get_primary_key_column();

    // Sort on the primary key (ascending flag = false) and limit each result to one row.
    DescriptorOrdering newest_first;
    newest_first.append_sort(SortDescriptor{{{pk_col}}, {false}});
    newest_first.append_limit(LimitDescriptor{1});

    // Version of the first row in a result, or EmptyVersion for an empty result.
    auto first_version_or_empty = [](auto& view) {
        return view.is_empty() ? SubscriptionSet::EmptyVersion : view.get_object(0).get_primary_key().get_int();
    };

    VersionInfo info;
    info.latest = table->max(pk_col)->get_int();

    auto active_query = table->where();
    active_query.equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Complete));
    active_query.Or();
    active_query.equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark));
    auto active_rows = active_query.find_all(newest_first);
    info.active = first_version_or_empty(active_rows);

    auto mark_query = table->where();
    mark_query.equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark));
    auto mark_rows = mark_query.find_all(newest_first);
    info.pending_mark = first_version_or_empty(mark_rows);

    return info;
}
1,134✔
801

802
// Finds the first subscription set, ordered by primary key (ascending flag = true), whose
// version is greater than `last_query_version` and whose state is Pending or Bootstrapping.
//
// Returns util::none when no such set exists; otherwise its query version together with the
// snapshot version stored on the object.
util::Optional<SubscriptionStore::PendingSubscription>
SubscriptionStore::get_next_pending_version(int64_t last_query_version) const
{
    auto read_tr = m_db->start_read();
    auto table = read_tr->get_table(m_sub_set_table);
    // The zeroth subscription set (used for schema instructions) must always exist.
    REALM_ASSERT(!table->is_empty());

    const auto pk_col = table->get_primary_key_column();

    DescriptorOrdering oldest_first;
    oldest_first.append_sort(SortDescriptor{{{pk_col}}, {true}});

    auto query = table->where();
    query.greater(pk_col, last_query_version);
    query.group();
    query.equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Pending));
    query.Or();
    query.equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Bootstrapping));
    query.end_group();
    auto matches = query.find_all(oldest_first);

    if (matches.is_empty()) {
        return util::none;
    }

    auto next_obj = matches.get_object(0);
    const auto next_version = next_obj.get_primary_key().get_int();
    const auto stored_snapshot = next_obj.get<int64_t>(m_sub_set_snapshot_version);
    return PendingSubscription{next_version, static_cast<DB::version_type>(stored_snapshot)};
}
1,686✔
830

831
std::vector<SubscriptionSet> SubscriptionStore::get_pending_subscriptions() const
832
{
84✔
833
    std::vector<SubscriptionSet> subscriptions_to_recover;
84✔
834
    auto active_sub = get_active();
84✔
835
    auto cur_query_version = active_sub.version();
84✔
836
    // get a copy of the pending subscription sets since the active version
42✔
837
    while (auto next_pending = get_next_pending_version(cur_query_version)) {
260✔
838
        cur_query_version = next_pending->query_version;
176✔
839
        subscriptions_to_recover.push_back(get_by_version(cur_query_version));
176✔
840
    }
176✔
841
    return subscriptions_to_recover;
84✔
842
}
84✔
843

844
// Fails every pending state-change notification with `status`.
//
// Blocks until m_outstanding_requests reaches zero, takes ownership of the pending notification
// list while holding the mutex, and then sets the error on each promise after the mutex has
// been released. The state of the subscription sets themselves is not modified here.
void SubscriptionStore::notify_all_state_change_notifications(Status status)
{
    std::unique_lock<std::mutex> lock(m_pending_notifications_mutex);
    while (m_outstanding_requests != 0) {
        m_pending_notifications_cv.wait(lock);
    }

    auto drained = std::move(m_pending_notifications);
    lock.unlock();

    // Only the notifications are completed/cancelled; no subscription state is touched.
    for (auto& pending : drained) {
        pending.promise.set_error(status);
    }
}
938✔
860

861
void SubscriptionStore::terminate()
862
{
20✔
863
    // Clear out and initialize the subscription store
10✔
864
    initialize_subscriptions_table(m_db->start_read(), true);
20✔
865

10✔
866
    std::unique_lock<std::mutex> lk(m_pending_notifications_mutex);
20✔
867
    m_pending_notifications_cv.wait(lk, [&] {
20✔
868
        return m_outstanding_requests == 0;
20✔
869
    });
20✔
870
    auto to_finish = std::move(m_pending_notifications);
20✔
871
    m_min_outstanding_version = 0;
20✔
872

10✔
873
    lk.unlock();
20✔
874

10✔
875
    for (auto& req : to_finish) {
18✔
876
        req.promise.emplace_value(SubscriptionSet::State::Superseded);
16✔
877
    }
16✔
878
}
20✔
879

880
// Opens a write transaction and returns a mutable view of the subscription set whose primary
// key equals `version_id`.
//
// Throws KeyNotFound when no subscription set with that version exists.
MutableSubscriptionSet SubscriptionStore::get_mutable_by_version(int64_t version_id)
{
    auto write_tr = m_db->start_write();
    auto table = write_tr->get_table(m_sub_set_table);
    auto set_obj = table->get_object_with_primary_key(Mixed{version_id});
    if (set_obj) {
        // The write transaction is handed over to the returned set.
        return MutableSubscriptionSet(weak_from_this(), std::move(write_tr), set_obj);
    }
    throw KeyNotFound(util::format("Subscription set with version %1 not found", version_id));
}
2,158✔
890

891
// Returns the subscription set with primary key `version_id`, read from a frozen transaction.
//
// If the object does not exist and `version_id` is below m_min_outstanding_version, a set
// tagged as superseded is returned instead. Otherwise KeyNotFound is thrown.
SubscriptionSet SubscriptionStore::get_by_version(int64_t version_id) const
{
    auto frozen_tr = m_db->start_frozen();
    auto table = frozen_tr->get_table(m_sub_set_table);
    auto found = table->get_object_with_primary_key(version_id);
    if (found) {
        return SubscriptionSet(weak_from_this(), *frozen_tr, found);
    }

    // m_min_outstanding_version is read while holding the notifications mutex.
    std::lock_guard lk(m_pending_notifications_mutex);
    if (version_id >= m_min_outstanding_version) {
        throw KeyNotFound(util::format("Subscription set with version %1 not found", version_id));
    }
    return SubscriptionSet(weak_from_this(), version_id, SubscriptionSet::SupersededTag{});
}
×
905

906
// Re-reads the subscription set stored under `key` from a frozen transaction.
//
// The transaction is started at `db_version` when one is supplied, and at a default-constructed
// VersionID otherwise. If the object can no longer be found, a set carrying `version` and
// tagged as superseded is returned.
SubscriptionSet SubscriptionStore::get_refreshed(ObjKey key, int64_t version,
                                                 std::optional<DB::VersionID> db_version) const
{
    const auto target_version = db_version.value_or(VersionID{});
    auto frozen_tr = m_db->start_frozen(target_version);
    auto table = frozen_tr->get_table(m_sub_set_table);

    auto current = table->try_get_object(key);
    if (!current) {
        return SubscriptionSet(weak_from_this(), version, SubscriptionSet::SupersededTag{});
    }
    return SubscriptionSet(weak_from_this(), *frozen_tr, current);
}
8✔
916

917
// Returns the set of object class names referenced by the subscriptions of the subscription set
// with the largest primary key, as seen by `tr`.
SubscriptionStore::TableSet SubscriptionStore::get_tables_for_latest(const Transaction& tr) const
{
    auto table = tr.get_table(m_sub_set_table);
    // The zeroth subscription set (used for schema instructions) must always exist.
    REALM_ASSERT(!table->is_empty());

    const auto newest_version = table->max(table->get_primary_key_column())->get_int();
    auto newest_obj = table->get_object_with_primary_key(Mixed{newest_version});

    TableSet class_names;
    auto sub_list = newest_obj.get_linklist(m_sub_set_subscriptions);
    for (size_t i = 0, count = sub_list.size(); i != count; ++i) {
        class_names.emplace(sub_list.get_object(i).get<StringData>(m_sub_object_class_name));
    }

    return class_names;
}
9,876✔
935

936
// Removes, within `tr`, every subscription set whose primary key is strictly less than
// `version_id`. The transaction is not committed here.
void SubscriptionStore::supercede_prior_to(TransactionRef tr, int64_t version_id) const
{
    auto table = tr->get_table(m_sub_set_table);
    Query older_sets(table);
    older_sets.less(table->get_primary_key_column(), version_id).remove();
}
1,416✔
943

944
// Creates a new subscription set object in a fresh write transaction and fills it with the
// subscriptions of `set`.
//
// The new object's primary key is one greater than the current maximum. The write transaction
// is owned by the returned MutableSubscriptionSet.
MutableSubscriptionSet SubscriptionStore::make_mutable_copy(const SubscriptionSet& set) const
{
    auto write_tr = m_db->start_write();

    auto table = write_tr->get_table(m_sub_set_table);
    const auto next_version = table->max(table->get_primary_key_column())->get_int() + 1;
    auto fresh_obj = table->create_object_with_primary_key(Mixed{next_version});

    MutableSubscriptionSet copy(weak_from_this(), std::move(write_tr), fresh_obj,
                                SubscriptionSet::MakingMutableCopy{true});
    for (const auto& subscription : set) {
        copy.insert_sub(subscription);
    }

    return copy;
}
1,282✔
960

961
// Returns true when `version` is older than the latest snapshot version reported by the DB.
bool SubscriptionStore::would_refresh(DB::version_type version) const noexcept
{
    const auto newest_snapshot = m_db->get_version_of_latest_snapshot();
    return newest_snapshot > version;
}
20✔
965

966
// Makes the active subscription set the newest one within the write transaction `wt`.
//
// Every subscription set with a larger primary key than the active one is removed, the active
// set's state is set to Complete, and its version is returned. The transaction is not committed
// here.
int64_t SubscriptionStore::set_active_as_latest(Transaction& wt)
{
    auto table = wt.get_table(m_sub_set_table);
    auto active_obj = get_active(wt);
    const auto active_version = active_obj.get_primary_key().get_int();

    // Drop any subscription sets newer than the active one.
    table->where().greater(table->get_primary_key_column(), active_version).remove();

    // Mark the active set as Complete even if it was previously AwaitingMark, since
    // rebootstrapping has been completed before this is called.
    active_obj.set(m_sub_set_state, state_to_storage(SubscriptionSet::State::Complete));
    return active_version;
}
40✔
977

978
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc