• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1829

13 Nov 2023 04:39AM UTC coverage: 91.666% (+0.001%) from 91.665%
1829

push

Evergreen

web-flow
Merge pull request #7119 from realm/tg/client-reset-flx-notifications

Deliver appropriate subscription state change notifications in DiscardLocal client resets

92104 of 168886 branches covered (0.0%)

512 of 522 new or added lines in 12 files covered. (98.08%)

95 existing lines in 18 files now uncovered.

231065 of 252072 relevant lines covered (91.67%)

6642642.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.34
/src/realm/sync/subscriptions.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2021 Realm, Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "realm/sync/subscriptions.hpp"
20

21
#include "external/json/json.hpp"
22

23
#include "realm/data_type.hpp"
24
#include "realm/keys.hpp"
25
#include "realm/list.hpp"
26
#include "realm/sort_descriptor.hpp"
27
#include "realm/sync/noinst/sync_metadata_schema.hpp"
28
#include "realm/table.hpp"
29
#include "realm/table_view.hpp"
30
#include "realm/transaction.hpp"
31
#include "realm/util/flat_map.hpp"
32

33
#include <algorithm>
34
#include <initializer_list>
35
#include <stdexcept>
36

37
namespace realm::sync {
38
namespace {
39
// Schema version history:
40
//   v2: Initial public beta.
41

42
constexpr static int c_flx_schema_version = 2;
43
constexpr static std::string_view c_flx_subscription_sets_table("flx_subscription_sets");
44
constexpr static std::string_view c_flx_subscriptions_table("flx_subscriptions");
45

46
constexpr static std::string_view c_flx_sub_sets_state_field("state");
47
constexpr static std::string_view c_flx_sub_sets_version_field("version");
48
constexpr static std::string_view c_flx_sub_sets_error_str_field("error");
49
constexpr static std::string_view c_flx_sub_sets_subscriptions_field("subscriptions");
50
constexpr static std::string_view c_flx_sub_sets_snapshot_version_field("snapshot_version");
51

52
constexpr static std::string_view c_flx_sub_id_field("id");
53
constexpr static std::string_view c_flx_sub_created_at_field("created_at");
54
constexpr static std::string_view c_flx_sub_updated_at_field("updated_at");
55
constexpr static std::string_view c_flx_sub_name_field("name");
56
constexpr static std::string_view c_flx_sub_object_class_field("object_class");
57
constexpr static std::string_view c_flx_sub_query_str_field("query");
58

59
using OptionalString = util::Optional<std::string>;
60

61
enum class SubscriptionStateForStorage : int64_t {
62
    // The subscription set has been persisted locally but has not been acknowledged by the server yet.
63
    Pending = 1,
64
    // The server is currently sending the initial state that represents this subscription set to the client.
65
    Bootstrapping = 2,
66
    // This subscription set is the active subscription set that is currently being synchronized with the server.
67
    Complete = 3,
68
    // An error occurred while processing this subscription set on the server. Check error_str() for details.
69
    Error = 4,
70
    // The last bootstrap message containing the initial state for this subscription set has been received. The
71
    // client is awaiting a mark message to mark this subscription as fully caught up to history.
72
    AwaitingMark = 6,
73
};
74

75
SubscriptionSet::State state_from_storage(int64_t value)
76
{
9,830✔
77
    switch (static_cast<SubscriptionStateForStorage>(value)) {
9,830✔
78
        case SubscriptionStateForStorage::Pending:
5,528✔
79
            return SubscriptionSet::State::Pending;
5,528✔
80
        case SubscriptionStateForStorage::Bootstrapping:
124✔
81
            return SubscriptionSet::State::Bootstrapping;
124✔
82
        case SubscriptionStateForStorage::AwaitingMark:
1,544✔
83
            return SubscriptionSet::State::AwaitingMark;
1,544✔
84
        case SubscriptionStateForStorage::Complete:
2,598✔
85
            return SubscriptionSet::State::Complete;
2,598✔
86
        case SubscriptionStateForStorage::Error:
36✔
87
            return SubscriptionSet::State::Error;
36✔
88
        default:
✔
89
            throw std::runtime_error(util::format("Invalid state for SubscriptionSet stored on disk: %1", value));
×
90
    }
9,830✔
91
}
9,830✔
92

93
int64_t state_to_storage(SubscriptionSet::State state)
94
{
28,300✔
95
    switch (state) {
28,300✔
96
        case SubscriptionSet::State::Pending:
10,276✔
97
            return static_cast<int64_t>(SubscriptionStateForStorage::Pending);
10,276✔
98
        case SubscriptionSet::State::Bootstrapping:
9,162✔
99
            return static_cast<int64_t>(SubscriptionStateForStorage::Bootstrapping);
9,162✔
100
        case SubscriptionSet::State::AwaitingMark:
4,642✔
101
            return static_cast<int64_t>(SubscriptionStateForStorage::AwaitingMark);
4,642✔
102
        case SubscriptionSet::State::Complete:
4,200✔
103
            return static_cast<int64_t>(SubscriptionStateForStorage::Complete);
4,200✔
104
        case SubscriptionSet::State::Error:
20✔
105
            return static_cast<int64_t>(SubscriptionStateForStorage::Error);
20✔
106
        default:
✔
107
            REALM_UNREACHABLE();
108
    }
28,300✔
109
}
28,300✔
110

111
size_t state_to_order(SubscriptionSet::State needle)
112
{
4,328✔
113
    using State = SubscriptionSet::State;
4,328✔
114
    switch (needle) {
4,328✔
115
        case State::Uncommitted:
24✔
116
            return 0;
24✔
117
        case State::Pending:
760✔
118
            return 1;
760✔
119
        case State::Bootstrapping:
56✔
120
            return 2;
56✔
121
        case State::AwaitingMark:
624✔
122
            return 3;
624✔
123
        case State::Complete:
2,860✔
124
            return 4;
2,860✔
125
        case State::Error:
✔
126
            return 5;
×
127
        case State::Superseded:
4✔
128
            return 6;
4✔
129
    }
×
130
    REALM_UNREACHABLE();
131
}
×
132

133
template <typename T, typename Predicate>
134
void splice_if(std::list<T>& src, std::list<T>& dst, Predicate pred)
135
{
3,562✔
136
    for (auto it = src.begin(); it != src.end();) {
5,508✔
137
        if (pred(*it)) {
1,946✔
138
            dst.splice(dst.end(), src, it++);
724✔
139
        }
724✔
140
        else {
1,222✔
141
            ++it;
1,222✔
142
        }
1,222✔
143
    }
1,946✔
144
}
3,562✔
145

146
} // namespace
147

148
Subscription::Subscription(const SubscriptionStore* parent, Obj obj)
149
    : id(obj.get<ObjectId>(parent->m_sub_id))
150
    , created_at(obj.get<Timestamp>(parent->m_sub_created_at))
151
    , updated_at(obj.get<Timestamp>(parent->m_sub_updated_at))
152
    , name(obj.is_null(parent->m_sub_name) ? OptionalString(util::none)
153
                                           : OptionalString{obj.get<String>(parent->m_sub_name)})
154
    , object_class_name(obj.get<String>(parent->m_sub_object_class_name))
155
    , query_string(obj.get<String>(parent->m_sub_query_str))
156
{
7,844✔
157
}
7,844✔
158

159
Subscription::Subscription(util::Optional<std::string> name, std::string object_class_name, std::string query_str)
160
    : id(ObjectId::gen())
161
    , created_at(std::chrono::system_clock::now())
162
    , updated_at(created_at)
163
    , name(std::move(name))
164
    , object_class_name(std::move(object_class_name))
165
    , query_string(std::move(query_str))
166
{
998✔
167
}
998✔
168

169

170
SubscriptionSet::SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, const Transaction& tr, const Obj& obj,
171
                                 MakingMutableCopy making_mutable_copy)
172
    : m_mgr(mgr)
173
    , m_cur_version(tr.get_version())
174
    , m_version(obj.get_primary_key().get_int())
175
    , m_obj_key(obj.get_key())
176
{
11,168✔
177
    REALM_ASSERT(obj.is_valid());
11,168✔
178
    if (!making_mutable_copy) {
11,168✔
179
        load_from_database(obj);
9,830✔
180
    }
9,830✔
181
}
11,168✔
182

183
SubscriptionSet::SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, int64_t version, SupersededTag)
184
    : m_mgr(mgr)
185
    , m_version(version)
186
    , m_state(State::Superseded)
187
{
20✔
188
}
20✔
189

190
void SubscriptionSet::load_from_database(const Obj& obj)
191
{
9,830✔
192
    auto mgr = get_flx_subscription_store(); // Throws
9,830✔
193

4,908✔
194
    m_state = state_from_storage(obj.get<int64_t>(mgr->m_sub_set_state));
9,830✔
195
    m_error_str = obj.get<String>(mgr->m_sub_set_error_str);
9,830✔
196
    m_snapshot_version = static_cast<DB::version_type>(obj.get<int64_t>(mgr->m_sub_set_snapshot_version));
9,830✔
197
    auto sub_list = obj.get_linklist(mgr->m_sub_set_subscriptions);
9,830✔
198
    m_subs.clear();
9,830✔
199
    for (size_t idx = 0; idx < sub_list.size(); ++idx) {
17,674✔
200
        m_subs.push_back(Subscription(mgr.get(), sub_list.get_object(idx)));
7,844✔
201
    }
7,844✔
202
}
9,830✔
203

204
std::shared_ptr<const SubscriptionStore> SubscriptionSet::get_flx_subscription_store() const
205
{
20,560✔
206
    if (auto mgr = m_mgr.lock()) {
20,560✔
207
        return mgr;
20,556✔
208
    }
20,556✔
209
    throw std::logic_error("Active SubscriptionSet without a SubscriptionStore");
4✔
210
}
4✔
211

212
int64_t SubscriptionSet::version() const
213
{
13,248✔
214
    return m_version;
13,248✔
215
}
13,248✔
216

217
DB::version_type SubscriptionSet::snapshot_version() const
218
{
1,320✔
219
    return m_snapshot_version;
1,320✔
220
}
1,320✔
221

222
SubscriptionSet::State SubscriptionSet::state() const
223
{
10,520✔
224
    return m_state;
10,520✔
225
}
10,520✔
226

227
StringData SubscriptionSet::error_str() const
228
{
908✔
229
    if (m_error_str.empty()) {
908✔
230
        return StringData{};
880✔
231
    }
880✔
232
    return m_error_str;
28✔
233
}
28✔
234

235
size_t SubscriptionSet::size() const
236
{
300✔
237
    return m_subs.size();
300✔
238
}
300✔
239

240
const Subscription& SubscriptionSet::at(size_t index) const
241
{
32✔
242
    return m_subs.at(index);
32✔
243
}
32✔
244

245
SubscriptionSet::const_iterator SubscriptionSet::begin() const
246
{
3,874✔
247
    return m_subs.begin();
3,874✔
248
}
3,874✔
249

250
SubscriptionSet::const_iterator SubscriptionSet::end() const
251
{
4,916✔
252
    return m_subs.end();
4,916✔
253
}
4,916✔
254

255
const Subscription* SubscriptionSet::find(StringData name) const
256
{
64✔
257
    for (auto&& sub : *this) {
92✔
258
        if (sub.name == name)
92✔
259
            return &sub;
48✔
260
    }
92✔
261
    return nullptr;
40✔
262
}
64✔
263

264
const Subscription* SubscriptionSet::find(const Query& query) const
265
{
48✔
266
    const auto query_desc = query.get_description();
48✔
267
    const auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());
48✔
268
    for (auto&& sub : *this) {
60✔
269
        if (sub.object_class_name == table_name && sub.query_string == query_desc)
60✔
270
            return &sub;
48✔
271
    }
60✔
272
    return nullptr;
24✔
273
}
48✔
274

275
MutableSubscriptionSet::MutableSubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, TransactionRef tr, Obj obj,
276
                                               MakingMutableCopy making_mutable_copy)
277
    : SubscriptionSet(mgr, *tr, obj, making_mutable_copy)
278
    , m_tr(std::move(tr))
279
    , m_obj(std::move(obj))
280
    , m_old_state(state())
281
{
3,586✔
282
}
3,586✔
283

284
void MutableSubscriptionSet::check_is_mutable() const
285
{
4,646✔
286
    if (m_tr->get_transact_stage() != DB::transact_Writing) {
4,646✔
287
        throw WrongTransactionState("Not a write transaction");
12✔
288
    }
12✔
289
}
4,646✔
290

291
// This uses the 'swap and pop' idiom to run in constant time.
292
// The iterator returned is:
293
//  1. end(), if the last subscription is removed
294
//  2. same iterator it is passed (but pointing to the last subscription in set), otherwise
295
MutableSubscriptionSet::iterator MutableSubscriptionSet::erase(const_iterator it)
296
{
56✔
297
    check_is_mutable();
56✔
298
    REALM_ASSERT(it != end());
56✔
299
    if (it == std::prev(m_subs.end())) {
56✔
300
        m_subs.pop_back();
24✔
301
        return end();
24✔
302
    }
24✔
303
    auto back = std::prev(m_subs.end());
32✔
304
    // const_iterator to iterator in constant time (See https://stackoverflow.com/a/10669041)
16✔
305
    auto iterator = m_subs.erase(it, it);
32✔
306
    std::swap(*iterator, *back);
32✔
307
    m_subs.pop_back();
32✔
308
    return iterator;
32✔
309
}
32✔
310

311
bool MutableSubscriptionSet::erase(StringData name)
312
{
12✔
313
    check_is_mutable();
12✔
314
    auto ptr = find(name);
12✔
315
    if (!ptr)
12✔
316
        return false;
4✔
317
    auto it = m_subs.begin() + (ptr - &m_subs.front());
8✔
318
    erase(it);
8✔
319
    return true;
8✔
320
}
8✔
321

322
bool MutableSubscriptionSet::erase(const Query& query)
323
{
24✔
324
    check_is_mutable();
24✔
325
    auto ptr = find(query);
24✔
326
    if (!ptr)
24✔
327
        return false;
×
328
    auto it = m_subs.begin() + (ptr - &m_subs.front());
24✔
329
    erase(it);
24✔
330
    return true;
24✔
331
}
24✔
332

333
bool MutableSubscriptionSet::erase_by_class_name(StringData object_class_name)
334
{
16✔
335
    // TODO: Use std::erase_if when switching to C++20.
8✔
336
    auto it = std::remove_if(m_subs.begin(), m_subs.end(), [&object_class_name](const Subscription& sub) {
36✔
337
        return sub.object_class_name == object_class_name;
36✔
338
    });
36✔
339
    auto erased = end() - it;
16✔
340
    m_subs.erase(it, end());
16✔
341
    return erased > 0;
16✔
342
}
16✔
343

344
bool MutableSubscriptionSet::erase_by_id(ObjectId id)
345
{
12✔
346
    auto it = std::find_if(m_subs.begin(), m_subs.end(), [&id](const Subscription& sub) -> bool {
20✔
347
        return sub.id == id;
20✔
348
    });
20✔
349
    if (it == end()) {
12✔
350
        return false;
4✔
351
    }
4✔
352
    erase(it);
8✔
353
    return true;
8✔
354
}
8✔
355

356
void MutableSubscriptionSet::clear()
357
{
376✔
358
    check_is_mutable();
376✔
359
    m_subs.clear();
376✔
360
}
376✔
361

362
void MutableSubscriptionSet::insert_sub(const Subscription& sub)
363
{
828✔
364
    check_is_mutable();
828✔
365
    m_subs.push_back(sub);
828✔
366
}
828✔
367

368
std::pair<SubscriptionSet::iterator, bool>
369
MutableSubscriptionSet::insert_or_assign_impl(iterator it, util::Optional<std::string> name,
370
                                              std::string object_class_name, std::string query_str)
371
{
994✔
372
    check_is_mutable();
994✔
373
    if (it != end()) {
994✔
374
        auto& sub = m_subs[it - begin()];
40✔
375
        sub.object_class_name = std::move(object_class_name);
40✔
376
        sub.query_string = std::move(query_str);
40✔
377
        sub.updated_at = Timestamp{std::chrono::system_clock::now()};
40✔
378

20✔
379
        return {it, false};
40✔
380
    }
40✔
381
    it = m_subs.insert(m_subs.end(),
954✔
382
                       Subscription(std::move(name), std::move(object_class_name), std::move(query_str)));
954✔
383

476✔
384
    return {it, true};
954✔
385
}
954✔
386

387
std::pair<SubscriptionSet::iterator, bool> MutableSubscriptionSet::insert_or_assign(std::string_view name,
388
                                                                                    const Query& query)
389
{
160✔
390
    auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());
160✔
391
    auto query_str = query.get_description();
160✔
392
    auto it = std::find_if(begin(), end(), [&](const Subscription& sub) {
140✔
393
        return sub.name == name;
120✔
394
    });
120✔
395

80✔
396
    return insert_or_assign_impl(it, std::string{name}, std::move(table_name), std::move(query_str));
160✔
397
}
160✔
398

399
std::pair<SubscriptionSet::iterator, bool> MutableSubscriptionSet::insert_or_assign(const Query& query)
400
{
834✔
401
    auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());
834✔
402
    auto query_str = query.get_description();
834✔
403
    auto it = std::find_if(begin(), end(), [&](const Subscription& sub) {
528✔
404
        return (!sub.name && sub.object_class_name == table_name && sub.query_string == query_str);
224✔
405
    });
224✔
406

416✔
407
    return insert_or_assign_impl(it, util::none, std::move(table_name), std::move(query_str));
834✔
408
}
834✔
409

410
void MutableSubscriptionSet::import(const SubscriptionSet& src_subs)
411
{
128✔
412
    clear();
128✔
413
    for (const Subscription& sub : src_subs) {
132✔
414
        insert_sub(sub);
132✔
415
    }
132✔
416
}
128✔
417

418
void MutableSubscriptionSet::update_state(State new_state, util::Optional<std::string_view> error_str)
419
{
2,356✔
420
    check_is_mutable();
2,356✔
421
    auto old_state = state();
2,356✔
422
    if (error_str && new_state != State::Error) {
2,356✔
423
        throw std::logic_error("Cannot supply an error message for a subscription set when state is not Error");
×
424
    }
×
425
    switch (new_state) {
2,356✔
426
        case State::Uncommitted:
✔
427
            throw std::logic_error("cannot set subscription set state to uncommitted");
×
428

429
        case State::Error:
20✔
430
            if (old_state != State::Bootstrapping && old_state != State::Pending && old_state != State::Uncommitted) {
20!
431
                throw std::logic_error(
×
432
                    "subscription set must be in Bootstrapping or Pending to update state to error");
×
433
            }
×
434
            if (!error_str) {
20✔
435
                throw std::logic_error("Must supply an error message when setting a subscription to the error state");
×
436
            }
×
437

10✔
438
            m_state = new_state;
20✔
439
            m_error_str = std::string{*error_str};
20✔
440
            break;
20✔
441
        case State::Bootstrapping:
52✔
442
            [[fallthrough]];
52✔
443
        case State::AwaitingMark:
856✔
444
            m_state = new_state;
856✔
445
            break;
856✔
446
        case State::Complete: {
1,480✔
447
            auto mgr = get_flx_subscription_store(); // Throws
1,480✔
448
            m_state = new_state;
1,480✔
449
            mgr->supercede_prior_to(m_tr, version());
1,480✔
450
            break;
1,480✔
451
        }
52✔
452
        case State::Superseded:
26✔
453
            throw std::logic_error("Cannot set a subscription to the superseded state");
×
454
        case State::Pending:
26✔
UNCOV
455
            throw std::logic_error("Cannot set subscription set to the pending state");
×
456
    }
2,356✔
457
}
2,356✔
458

459
MutableSubscriptionSet SubscriptionSet::make_mutable_copy() const
460
{
1,338✔
461
    auto mgr = get_flx_subscription_store(); // Throws
1,338✔
462
    return mgr->make_mutable_copy(*this);
1,338✔
463
}
1,338✔
464

465
void SubscriptionSet::refresh()
466
{
40✔
467
    auto mgr = get_flx_subscription_store(); // Throws
40✔
468
    if (mgr->would_refresh(m_cur_version)) {
40✔
469
        *this = mgr->get_refreshed(m_obj_key, version());
36✔
470
    }
36✔
471
}
40✔
472

473
util::Future<SubscriptionSet::State> SubscriptionSet::get_state_change_notification(State notify_when) const
474
{
844✔
475
    auto mgr = get_flx_subscription_store(); // Throws
844✔
476

422✔
477
    util::CheckedLockGuard lk(mgr->m_pending_notifications_mutex);
844✔
478
    // If we've already been superceded by another version getting completed, then we should skip registering
422✔
479
    // a notification because it may never fire.
422✔
480
    if (mgr->m_min_outstanding_version > version()) {
844✔
481
        return util::Future<State>::make_ready(State::Superseded);
4✔
482
    }
4✔
483

420✔
484
    State cur_state = state();
840✔
485
    StringData err_str = error_str();
840✔
486

420✔
487
    // If there have been writes to the database since this SubscriptionSet was created, we need to fetch
420✔
488
    // the updated version from the DB to know the true current state and maybe return a ready future.
420✔
489
    if (m_cur_version < mgr->m_db->get_version_of_latest_snapshot()) {
840✔
490
        auto refreshed_self = mgr->get_refreshed(m_obj_key, version());
32✔
491
        cur_state = refreshed_self.state();
32✔
492
        err_str = refreshed_self.error_str();
32✔
493
    }
32✔
494
    // If we've already reached the desired state, or if the subscription is in an error state,
420✔
495
    // we can return a ready future immediately.
420✔
496
    if (cur_state == State::Error) {
840✔
497
        return util::Future<State>::make_ready(Status{ErrorCodes::SubscriptionFailed, err_str});
4✔
498
    }
4✔
499
    else if (state_to_order(cur_state) >= state_to_order(notify_when)) {
836✔
500
        return util::Future<State>::make_ready(cur_state);
72✔
501
    }
72✔
502

382✔
503
    // Otherwise, make a promise/future pair and add it to the list of pending notifications.
382✔
504
    auto [promise, future] = util::make_promise_future<State>();
764✔
505
    mgr->m_pending_notifications.emplace_back(version(), std::move(promise), notify_when);
764✔
506
    return std::move(future);
764✔
507
}
764✔
508

509
void SubscriptionSet::get_state_change_notification(
510
    State notify_when, util::UniqueFunction<void(util::Optional<State>, util::Optional<Status>)> cb) const
511
{
×
512
    get_state_change_notification(notify_when).get_async([cb = std::move(cb)](StatusWith<State> result) {
×
513
        if (result.is_ok()) {
×
514
            cb(result.get_value(), {});
×
515
        }
×
516
        else {
×
517
            cb({}, result.get_status());
×
518
        }
×
519
    });
×
520
}
×
521

522
void MutableSubscriptionSet::process_notifications()
523
{
3,514✔
524
    auto mgr = get_flx_subscription_store(); // Throws
3,514✔
525
    auto new_state = state();
3,514✔
526

1,752✔
527
    std::list<SubscriptionStore::NotificationRequest> to_finish;
3,514✔
528
    {
3,514✔
529
        util::CheckedLockGuard lk(mgr->m_pending_notifications_mutex);
3,514✔
530
        splice_if(mgr->m_pending_notifications, to_finish, [&](auto& req) {
2,718✔
531
            return (req.version == m_version &&
1,930✔
532
                    (new_state == State::Error || state_to_order(new_state) >= state_to_order(req.notify_when))) ||
1,636✔
533
                   (new_state == State::Complete && req.version < m_version);
1,582✔
534
        });
1,930✔
535

1,752✔
536
        if (new_state == State::Complete) {
3,514✔
537
            mgr->m_min_outstanding_version = m_version;
1,480✔
538
        }
1,480✔
539
    }
3,514✔
540

1,752✔
541
    for (auto& req : to_finish) {
2,106✔
542
        if (new_state == State::Error && req.version == m_version) {
708✔
543
            req.promise.set_error({ErrorCodes::SubscriptionFailed, std::string_view(error_str())});
20✔
544
        }
20✔
545
        else if (req.version < m_version) {
688✔
546
            req.promise.emplace_value(State::Superseded);
12✔
547
        }
12✔
548
        else {
676✔
549
            req.promise.emplace_value(new_state);
676✔
550
        }
676✔
551
    }
708✔
552
}
3,514✔
553

554
SubscriptionSet MutableSubscriptionSet::commit()
555
{
3,514✔
556
    if (m_tr->get_transact_stage() != DB::transact_Writing) {
3,514✔
557
        throw std::logic_error("SubscriptionSet is not in a commitable state");
×
558
    }
×
559
    auto mgr = get_flx_subscription_store(); // Throws
3,514✔
560

1,752✔
561
    if (m_old_state == State::Uncommitted) {
3,514✔
562
        if (m_state == State::Uncommitted) {
1,270✔
563
            m_state = State::Pending;
1,162✔
564
        }
1,162✔
565
        m_obj.set(mgr->m_sub_set_snapshot_version, static_cast<int64_t>(m_tr->get_version()));
1,270✔
566

634✔
567
        auto obj_sub_list = m_obj.get_linklist(mgr->m_sub_set_subscriptions);
1,270✔
568
        obj_sub_list.clear();
1,270✔
569
        for (const auto& sub : m_subs) {
1,438✔
570
            auto new_sub =
1,438✔
571
                obj_sub_list.create_and_insert_linked_object(obj_sub_list.is_empty() ? 0 : obj_sub_list.size());
1,318✔
572
            new_sub.set(mgr->m_sub_id, sub.id);
1,438✔
573
            new_sub.set(mgr->m_sub_created_at, sub.created_at);
1,438✔
574
            new_sub.set(mgr->m_sub_updated_at, sub.updated_at);
1,438✔
575
            if (sub.name) {
1,438✔
576
                new_sub.set(mgr->m_sub_name, StringData(*sub.name));
216✔
577
            }
216✔
578
            new_sub.set(mgr->m_sub_object_class_name, StringData(sub.object_class_name));
1,438✔
579
            new_sub.set(mgr->m_sub_query_str, StringData(sub.query_string));
1,438✔
580
        }
1,438✔
581
    }
1,270✔
582
    m_obj.set(mgr->m_sub_set_state, state_to_storage(m_state));
3,514✔
583
    if (!m_error_str.empty()) {
3,514✔
584
        m_obj.set(mgr->m_sub_set_error_str, StringData(m_error_str));
20✔
585
    }
20✔
586

1,752✔
587
    const auto flx_version = version();
3,514✔
588
    m_tr->commit_and_continue_as_read();
3,514✔
589

1,752✔
590
    process_notifications();
3,514✔
591

1,752✔
592
    return mgr->get_refreshed(m_obj.get_key(), flx_version, m_tr->get_version_of_current_transaction());
3,514✔
593
}
3,514✔
594

595
std::string SubscriptionSet::to_ext_json() const
596
{
1,898✔
597
    if (m_subs.empty()) {
1,898✔
598
        return "{}";
728✔
599
    }
728✔
600

588✔
601
    util::FlatMap<std::string, std::vector<std::string>> table_to_query;
1,170✔
602
    for (const auto& sub : *this) {
1,330✔
603
        std::string table_name(sub.object_class_name);
1,330✔
604
        auto& queries_for_table = table_to_query.at(table_name);
1,330✔
605
        auto query_it = std::find(queries_for_table.begin(), queries_for_table.end(), sub.query_string);
1,330✔
606
        if (query_it != queries_for_table.end()) {
1,330✔
607
            continue;
8✔
608
        }
8✔
609
        queries_for_table.emplace_back(sub.query_string);
1,322✔
610
    }
1,322✔
611

588✔
612
    if (table_to_query.empty()) {
1,170✔
613
        return "{}";
×
614
    }
×
615

588✔
616
    // TODO this is pulling in a giant compile-time dependency. We should have a better way of escaping the
588✔
617
    // query strings into a json object.
588✔
618
    nlohmann::json output_json;
1,170✔
619
    for (auto& table : table_to_query) {
1,262✔
620
        // We want to make sure that the queries appear in some kind of canonical order so that if there are
634✔
621
        // two subscription sets with the same subscriptions in different orders, the server doesn't have to
634✔
622
        // waste a bunch of time re-running the queries for that table.
634✔
623
        std::stable_sort(table.second.begin(), table.second.end());
1,262✔
624

634✔
625
        bool is_first = true;
1,262✔
626
        std::ostringstream obuf;
1,262✔
627
        for (const auto& query_str : table.second) {
1,322✔
628
            if (!is_first) {
1,322✔
629
                obuf << " OR ";
60✔
630
            }
60✔
631
            is_first = false;
1,322✔
632
            obuf << "(" << query_str << ")";
1,322✔
633
        }
1,322✔
634
        output_json[table.first] = obuf.str();
1,262✔
635
    }
1,262✔
636

588✔
637
    return output_json.dump();
1,170✔
638
}
1,170✔
639

640
namespace {
641
class SubscriptionStoreInit : public SubscriptionStore {
642
public:
643
    explicit SubscriptionStoreInit(DBRef db)
644
        : SubscriptionStore(std::move(db))
645
    {
952✔
646
    }
952✔
647
};
648
} // namespace
649

650
SubscriptionStoreRef SubscriptionStore::create(DBRef db)
651
{
952✔
652
    return std::make_shared<SubscriptionStoreInit>(std::move(db));
952✔
653
}
952✔
654

655
SubscriptionStore::SubscriptionStore(DBRef db)
656
    : m_db(std::move(db))
657
{
952✔
658
    std::vector<SyncMetadataTable> internal_tables{
952✔
659
        {&m_sub_set_table,
952✔
660
         c_flx_subscription_sets_table,
952✔
661
         {&m_sub_set_version_num, c_flx_sub_sets_version_field, type_Int},
952✔
662
         {
952✔
663
             {&m_sub_set_state, c_flx_sub_sets_state_field, type_Int},
952✔
664
             {&m_sub_set_snapshot_version, c_flx_sub_sets_snapshot_version_field, type_Int},
952✔
665
             {&m_sub_set_error_str, c_flx_sub_sets_error_str_field, type_String, true},
952✔
666
             {&m_sub_set_subscriptions, c_flx_sub_sets_subscriptions_field, c_flx_subscriptions_table, true},
952✔
667
         }},
952✔
668
        {&m_sub_table,
952✔
669
         c_flx_subscriptions_table,
952✔
670
         SyncMetadataTable::IsEmbeddedTag{},
952✔
671
         {
952✔
672
             {&m_sub_id, c_flx_sub_id_field, type_ObjectId},
952✔
673
             {&m_sub_created_at, c_flx_sub_created_at_field, type_Timestamp},
952✔
674
             {&m_sub_updated_at, c_flx_sub_updated_at_field, type_Timestamp},
952✔
675
             {&m_sub_name, c_flx_sub_name_field, type_String, true},
952✔
676
             {&m_sub_object_class_name, c_flx_sub_object_class_field, type_String},
952✔
677
             {&m_sub_query_str, c_flx_sub_query_str_field, type_String},
952✔
678
         }},
952✔
679
    };
952✔
680

476✔
681
    auto tr = m_db->start_read();
952✔
682
    SyncMetadataSchemaVersions schema_versions(tr);
952✔
683

476✔
684
    if (auto schema_version = schema_versions.get_version_for(tr, internal_schema_groups::c_flx_subscription_store);
952✔
685
        !schema_version) {
952✔
686
        tr->promote_to_write();
804✔
687
        schema_versions.set_version_for(tr, internal_schema_groups::c_flx_subscription_store, c_flx_schema_version);
804✔
688
        create_sync_metadata_schema(tr, &internal_tables);
804✔
689
        tr->commit_and_continue_as_read();
804✔
690
    }
804✔
691
    else {
148✔
692
        if (*schema_version != c_flx_schema_version) {
148✔
693
            throw std::runtime_error("Invalid schema version for flexible sync metadata");
×
694
        }
×
695
        load_sync_metadata_schema(tr, &internal_tables);
148✔
696
    }
148✔
697

476✔
698
    // Make sure the subscription set table is properly initialized
476✔
699
    initialize_subscriptions_table(std::move(tr), false);
952✔
700
}
952✔
701

702
void SubscriptionStore::initialize_subscriptions_table(TransactionRef&& tr, bool clear_table)
703
{
972✔
704
    if (auto sub_sets = tr->get_table(m_sub_set_table); clear_table || sub_sets->is_empty()) {
972✔
705
        tr->promote_to_write();
824✔
706
        // If erase_table is true, clear out the sub_sets table
410✔
707
        if (clear_table) {
824✔
708
            sub_sets->clear();
20✔
709
        }
20✔
710
        // There should always be at least one subscription set so that the user can always wait
410✔
711
        // for synchronizationon on the result of get_latest().
410✔
712
        auto zero_sub = sub_sets->create_object_with_primary_key(Mixed{int64_t(0)});
824✔
713
        zero_sub.set(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::Pending));
824✔
714
        zero_sub.set(m_sub_set_snapshot_version, tr->get_version());
824✔
715
        tr->commit();
824✔
716
    }
824✔
717
}
972✔
718

719
SubscriptionSet SubscriptionStore::get_latest() const
720
{
1,478✔
721
    auto tr = m_db->start_frozen();
1,478✔
722
    auto sub_sets = tr->get_table(m_sub_set_table);
1,478✔
723
    // There should always be at least one SubscriptionSet - the zeroth subscription set for schema instructions.
738✔
724
    REALM_ASSERT(!sub_sets->is_empty());
1,478✔
725

738✔
726
    auto latest_id = sub_sets->max(sub_sets->get_primary_key_column())->get_int();
1,478✔
727
    auto latest_obj = sub_sets->get_object_with_primary_key(Mixed{latest_id});
1,478✔
728

738✔
729
    return SubscriptionSet(weak_from_this(), *tr, latest_obj);
1,478✔
730
}
1,478✔
731

732
SubscriptionSet SubscriptionStore::get_active() const
733
{
1,458✔
734
    auto tr = m_db->start_frozen();
1,458✔
735
    return SubscriptionSet(weak_from_this(), *tr, get_active(*tr));
1,458✔
736
}
1,458✔
737

738
Obj SubscriptionStore::get_active(const Transaction& tr) const
739
{
1,506✔
740
    auto sub_sets = tr.get_table(m_sub_set_table);
1,506✔
741
    // There should always be at least one SubscriptionSet - the zeroth subscription set for schema instructions.
756✔
742
    REALM_ASSERT(!sub_sets->is_empty());
1,506✔
743

756✔
744
    DescriptorOrdering descriptor_ordering;
1,506✔
745
    descriptor_ordering.append_sort(SortDescriptor{{{sub_sets->get_primary_key_column()}}, {false}});
1,506✔
746
    descriptor_ordering.append_limit(LimitDescriptor{1});
1,506✔
747
    auto res = sub_sets->where()
1,506✔
748
                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Complete))
1,506✔
749
                   .Or()
1,506✔
750
                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark))
1,506✔
751
                   .find_all(descriptor_ordering);
1,506✔
752

756✔
753
    // If there is no active subscription yet, return the zeroth subscription.
756✔
754
    if (res.is_empty()) {
1,506✔
755
        return sub_sets->get_object_with_primary_key(0);
808✔
756
    }
808✔
757
    return res.get_object(0);
698✔
758
}
698✔
759

760
SubscriptionStore::VersionInfo SubscriptionStore::get_version_info() const
761
{
1,166✔
762
    auto tr = m_db->start_read();
1,166✔
763
    auto sub_sets = tr->get_table(m_sub_set_table);
1,166✔
764
    // There should always be at least one SubscriptionSet - the zeroth subscription set for schema instructions.
584✔
765
    REALM_ASSERT(!sub_sets->is_empty());
1,166✔
766

584✔
767
    VersionInfo ret;
1,166✔
768
    ret.latest = sub_sets->max(sub_sets->get_primary_key_column())->get_int();
1,166✔
769
    DescriptorOrdering descriptor_ordering;
1,166✔
770
    descriptor_ordering.append_sort(SortDescriptor{{{sub_sets->get_primary_key_column()}}, {false}});
1,166✔
771
    descriptor_ordering.append_limit(LimitDescriptor{1});
1,166✔
772

584✔
773
    auto res = sub_sets->where()
1,166✔
774
                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Complete))
1,166✔
775
                   .Or()
1,166✔
776
                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark))
1,166✔
777
                   .find_all(descriptor_ordering);
1,166✔
778
    ret.active = res.is_empty() ? SubscriptionSet::EmptyVersion : res.get_object(0).get_primary_key().get_int();
950✔
779

584✔
780
    res = sub_sets->where()
1,166✔
781
              .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark))
1,166✔
782
              .find_all(descriptor_ordering);
1,166✔
783
    ret.pending_mark = res.is_empty() ? SubscriptionSet::EmptyVersion : res.get_object(0).get_primary_key().get_int();
1,158✔
784

584✔
785
    return ret;
1,166✔
786
}
1,166✔
787

788
util::Optional<SubscriptionStore::PendingSubscription>
789
SubscriptionStore::get_next_pending_version(int64_t last_query_version) const
790
{
9,114✔
791
    auto tr = m_db->start_read();
9,114✔
792
    auto sub_sets = tr->get_table(m_sub_set_table);
9,114✔
793
    // There should always be at least one SubscriptionSet - the zeroth subscription set for schema instructions.
4,704✔
794
    REALM_ASSERT(!sub_sets->is_empty());
9,114✔
795

4,704✔
796
    DescriptorOrdering descriptor_ordering;
9,114✔
797
    descriptor_ordering.append_sort(SortDescriptor{{{sub_sets->get_primary_key_column()}}, {true}});
9,114✔
798
    auto res = sub_sets->where()
9,114✔
799
                   .greater(sub_sets->get_primary_key_column(), last_query_version)
9,114✔
800
                   .group()
9,114✔
801
                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Pending))
9,114✔
802
                   .Or()
9,114✔
803
                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Bootstrapping))
9,114✔
804
                   .end_group()
9,114✔
805
                   .find_all(descriptor_ordering);
9,114✔
806

4,704✔
807
    if (res.is_empty()) {
9,114✔
808
        return util::none;
7,372✔
809
    }
7,372✔
810

870✔
811
    auto obj = res.get_object(0);
1,742✔
812
    auto query_version = obj.get_primary_key().get_int();
1,742✔
813
    auto snapshot_version = obj.get<int64_t>(m_sub_set_snapshot_version);
1,742✔
814
    return PendingSubscription{query_version, static_cast<DB::version_type>(snapshot_version)};
1,742✔
815
}
1,742✔
816

817
std::vector<SubscriptionSet> SubscriptionStore::get_pending_subscriptions() const
818
{
88✔
819
    std::vector<SubscriptionSet> subscriptions_to_recover;
88✔
820
    auto active_sub = get_active();
88✔
821
    auto cur_query_version = active_sub.version();
88✔
822
    // get a copy of the pending subscription sets since the active version
44✔
823
    while (auto next_pending = get_next_pending_version(cur_query_version)) {
264✔
824
        cur_query_version = next_pending->query_version;
176✔
825
        subscriptions_to_recover.push_back(get_by_version(cur_query_version));
176✔
826
    }
176✔
827
    return subscriptions_to_recover;
88✔
828
}
88✔
829

830
void SubscriptionStore::notify_all_state_change_notifications(Status status)
831
{
970✔
832
    util::CheckedUniqueLock lk(m_pending_notifications_mutex);
970✔
833
    auto to_finish = std::move(m_pending_notifications);
970✔
834
    lk.unlock();
970✔
835

486✔
836
    // Just complete/cancel the pending notifications - this function does not alter the
486✔
837
    // state of any pending subscriptions
486✔
838
    for (auto& req : to_finish) {
498✔
839
        req.promise.set_error(status);
24✔
840
    }
24✔
841
}
970✔
842

843
void SubscriptionStore::terminate()
844
{
20✔
845
    // Clear out and initialize the subscription store
10✔
846
    initialize_subscriptions_table(m_db->start_read(), true);
20✔
847

10✔
848
    util::CheckedUniqueLock lk(m_pending_notifications_mutex);
20✔
849
    auto to_finish = std::move(m_pending_notifications);
20✔
850
    m_min_outstanding_version = 0;
20✔
851
    lk.unlock();
20✔
852

10✔
853
    for (auto& req : to_finish) {
18✔
854
        req.promise.emplace_value(SubscriptionSet::State::Superseded);
16✔
855
    }
16✔
856
}
20✔
857

858
MutableSubscriptionSet SubscriptionStore::get_mutable_by_version(int64_t version_id)
859
{
2,252✔
860
    auto tr = m_db->start_write();
2,252✔
861
    auto sub_sets = tr->get_table(m_sub_set_table);
2,252✔
862
    auto obj = sub_sets->get_object_with_primary_key(Mixed{version_id});
2,252✔
863
    if (!obj) {
2,252✔
864
        throw KeyNotFound(util::format("Subscription set with version %1 not found", version_id));
4✔
865
    }
4✔
866
    return MutableSubscriptionSet(weak_from_this(), std::move(tr), obj);
2,248✔
867
}
2,248✔
868

869
SubscriptionSet SubscriptionStore::get_by_version(int64_t version_id) const
870
{
1,084✔
871
    auto tr = m_db->start_frozen();
1,084✔
872
    auto sub_sets = tr->get_table(m_sub_set_table);
1,084✔
873
    if (auto obj = sub_sets->get_object_with_primary_key(version_id)) {
1,084✔
874
        return SubscriptionSet(weak_from_this(), *tr, obj);
1,084✔
875
    }
1,084✔
876

NEW
877
    util::CheckedLockGuard lk(m_pending_notifications_mutex);
×
878
    if (version_id < m_min_outstanding_version) {
×
879
        return SubscriptionSet(weak_from_this(), version_id, SubscriptionSet::SupersededTag{});
×
880
    }
×
881
    throw KeyNotFound(util::format("Subscription set with version %1 not found", version_id));
×
882
}
×
883

884
SubscriptionSet SubscriptionStore::get_refreshed(ObjKey key, int64_t version,
885
                                                 std::optional<DB::VersionID> db_version) const
886
{
3,582✔
887
    auto tr = m_db->start_frozen(db_version.value_or(VersionID{}));
3,582✔
888
    auto sub_sets = tr->get_table(m_sub_set_table);
3,582✔
889
    if (auto obj = sub_sets->try_get_object(key)) {
3,582✔
890
        return SubscriptionSet(weak_from_this(), *tr, obj);
3,562✔
891
    }
3,562✔
892
    return SubscriptionSet(weak_from_this(), version, SubscriptionSet::SupersededTag{});
20✔
893
}
20✔
894

895
SubscriptionStore::TableSet SubscriptionStore::get_tables_for_latest(const Transaction& tr) const
896
{
10,084✔
897
    auto sub_sets = tr.get_table(m_sub_set_table);
10,084✔
898
    // There should always be at least one SubscriptionSet - the zeroth subscription set for schema instructions.
5,032✔
899
    REALM_ASSERT(!sub_sets->is_empty());
10,084✔
900

5,032✔
901
    auto latest_id = sub_sets->max(sub_sets->get_primary_key_column())->get_int();
10,084✔
902
    auto latest_obj = sub_sets->get_object_with_primary_key(Mixed{latest_id});
10,084✔
903

5,032✔
904
    TableSet ret;
10,084✔
905
    auto subs = latest_obj.get_linklist(m_sub_set_subscriptions);
10,084✔
906
    for (size_t idx = 0; idx < subs.size(); ++idx) {
18,072✔
907
        auto sub_obj = subs.get_object(idx);
7,988✔
908
        ret.emplace(sub_obj.get<StringData>(m_sub_object_class_name));
7,988✔
909
    }
7,988✔
910

5,032✔
911
    return ret;
10,084✔
912
}
10,084✔
913

914
void SubscriptionStore::supercede_prior_to(TransactionRef tr, int64_t version_id) const
915
{
1,480✔
916
    auto sub_sets = tr->get_table(m_sub_set_table);
1,480✔
917
    Query remove_query(sub_sets);
1,480✔
918
    remove_query.less(sub_sets->get_primary_key_column(), version_id);
1,480✔
919
    remove_query.remove();
1,480✔
920
}
1,480✔
921

922
MutableSubscriptionSet SubscriptionStore::make_mutable_copy(const SubscriptionSet& set) const
923
{
1,338✔
924
    auto new_tr = m_db->start_write();
1,338✔
925

668✔
926
    auto sub_sets = new_tr->get_table(m_sub_set_table);
1,338✔
927
    auto new_pk = sub_sets->max(sub_sets->get_primary_key_column())->get_int() + 1;
1,338✔
928

668✔
929
    MutableSubscriptionSet new_set_obj(weak_from_this(), std::move(new_tr),
1,338✔
930
                                       sub_sets->create_object_with_primary_key(Mixed{new_pk}),
1,338✔
931
                                       SubscriptionSet::MakingMutableCopy{true});
1,338✔
932
    for (const auto& sub : set) {
992✔
933
        new_set_obj.insert_sub(sub);
648✔
934
    }
648✔
935

668✔
936
    return new_set_obj;
1,338✔
937
}
1,338✔
938

939
bool SubscriptionStore::would_refresh(DB::version_type version) const noexcept
940
{
36✔
941
    return version < m_db->get_version_of_latest_snapshot();
36✔
942
}
36✔
943

944
int64_t SubscriptionStore::set_active_as_latest(Transaction& wt)
945
{
48✔
946
    auto sub_sets = wt.get_table(m_sub_set_table);
48✔
947
    auto active = get_active(wt);
48✔
948
    // Delete all newer subscription sets, if any
24✔
949
    sub_sets->where().greater(sub_sets->get_primary_key_column(), active.get_primary_key().get_int()).remove();
48✔
950
    // Mark the active set as complete even if it was previously WaitingForMark
24✔
951
    // as we've completed rebootstrapping before calling this.
24✔
952
    active.set(m_sub_set_state, state_to_storage(State::Complete));
48✔
953
    auto version = active.get_primary_key().get_int();
48✔
954

24✔
955
    std::list<NotificationRequest> to_finish;
48✔
956
    {
48✔
957
        util::CheckedLockGuard lock(m_pending_notifications_mutex);
48✔
958
        splice_if(m_pending_notifications, to_finish, [&](auto& req) {
32✔
959
            if (req.version == version && state_to_order(req.notify_when) <= state_to_order(State::Complete))
16✔
960
                return true;
4✔
961
            return req.version != version;
12✔
962
        });
12✔
963
    }
48✔
964

24✔
965
    for (auto& req : to_finish) {
32✔
966
        req.promise.emplace_value(req.version == version ? State::Complete : State::Superseded);
14✔
967
    }
16✔
968

24✔
969
    return version;
48✔
970
}
48✔
971

972
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc