• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_112

27 Oct 2023 10:49AM UTC coverage: 91.586% (+0.02%) from 91.571%
thomas.goyne_112

push

Evergreen

web-flow
Merge pull request #7085 from realm/release/13.23.2

Release/13.23.2

91754 of 168238 branches covered (0.0%)

230143 of 251285 relevant lines covered (91.59%)

7082763.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.53
/src/realm/sync/subscriptions.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2021 Realm, Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "realm/sync/subscriptions.hpp"
20

21
#include "external/json/json.hpp"
22

23
#include "realm/data_type.hpp"
24
#include "realm/transaction.hpp"
25
#include "realm/keys.hpp"
26
#include "realm/list.hpp"
27
#include "realm/sort_descriptor.hpp"
28
#include "realm/sync/noinst/sync_metadata_schema.hpp"
29
#include "realm/table.hpp"
30
#include "realm/table_view.hpp"
31
#include "realm/util/flat_map.hpp"
32
#include <algorithm>
33
#include <initializer_list>
34
#include <stdexcept>
35

36
#include <algorithm>
37

38
namespace realm::sync {
39
namespace {
40
// History of the flexible sync metadata schema version:
//   v2: Initial public beta.
static constexpr int c_flx_schema_version{2};

// Names of the two metadata tables.
static constexpr std::string_view c_flx_subscription_sets_table{"flx_subscription_sets"};
static constexpr std::string_view c_flx_subscriptions_table{"flx_subscriptions"};

// Column names of the subscription-set table.
static constexpr std::string_view c_flx_sub_sets_state_field{"state"};
static constexpr std::string_view c_flx_sub_sets_version_field{"version"};
static constexpr std::string_view c_flx_sub_sets_error_str_field{"error"};
static constexpr std::string_view c_flx_sub_sets_subscriptions_field{"subscriptions"};
static constexpr std::string_view c_flx_sub_sets_snapshot_version_field{"snapshot_version"};

// Column names of the subscription table.
static constexpr std::string_view c_flx_sub_id_field{"id"};
static constexpr std::string_view c_flx_sub_created_at_field{"created_at"};
static constexpr std::string_view c_flx_sub_updated_at_field{"updated_at"};
static constexpr std::string_view c_flx_sub_name_field{"name"};
static constexpr std::string_view c_flx_sub_object_class_field{"object_class"};
static constexpr std::string_view c_flx_sub_query_str_field{"query"};
59

60
using OptionalString = util::Optional<std::string>;
61

62
// Integer values used when a subscription set's state is persisted. state_from_storage() and
// state_to_storage() translate between these values and SubscriptionSet::State.
enum class SubscriptionStateForStorage : int64_t {
    // Persisted locally, but the server has not acknowledged the subscription set yet.
    Pending = 1,
    // The server is sending the initial state that represents this subscription set.
    Bootstrapping = 2,
    // The active subscription set, currently being synchronized with the server.
    Complete = 3,
    // The server failed to process this subscription set. Check error_str() for details.
    Error = 4,
    // The final bootstrap message with the initial state has been received; the client is now
    // waiting for a mark message that marks the subscription as fully caught up to history.
    AwaitingMark = 6,
};
75

76
// Translates a persisted state value into the corresponding SubscriptionSet::State.
// Throws std::runtime_error if `value` is not one of the SubscriptionStateForStorage values.
SubscriptionSet::State state_from_storage(int64_t value)
{
    using State = SubscriptionSet::State;
    using Stored = SubscriptionStateForStorage;

    const auto stored = static_cast<Stored>(value);
    if (stored == Stored::Pending)
        return State::Pending;
    if (stored == Stored::Bootstrapping)
        return State::Bootstrapping;
    if (stored == Stored::AwaitingMark)
        return State::AwaitingMark;
    if (stored == Stored::Complete)
        return State::Complete;
    if (stored == Stored::Error)
        return State::Error;

    throw std::runtime_error(util::format("Invalid state for SubscriptionSet stored on disk: %1", value));
}
9,404✔
93

94
// Translates a SubscriptionSet::State into the integer value that is persisted for it.
// Only Pending, Bootstrapping, AwaitingMark, Complete and Error have a stored form; any other
// state hits REALM_UNREACHABLE().
int64_t state_to_storage(SubscriptionSet::State state)
{
    using State = SubscriptionSet::State;
    using Stored = SubscriptionStateForStorage;

    const auto as_int = [](Stored stored) {
        return static_cast<int64_t>(stored);
    };

    switch (state) {
        case State::Pending:
            return as_int(Stored::Pending);
        case State::Bootstrapping:
            return as_int(Stored::Bootstrapping);
        case State::AwaitingMark:
            return as_int(Stored::AwaitingMark);
        case State::Complete:
            return as_int(Stored::Complete);
        case State::Error:
            return as_int(Stored::Error);
        default:
            break;
    }
    REALM_UNREACHABLE();
}
27,322✔
111

112
// Maps a state to its rank in the progression
// Uncommitted < Pending < Bootstrapping < AwaitingMark < Complete < Error < Superseded,
// so that callers can compare two states with an ordinary integer comparison.
size_t state_to_order(SubscriptionSet::State needle)
{
    using S = SubscriptionSet::State;
    // No default label on purpose: an unhandled enumerator falls out of the switch and reaches
    // REALM_UNREACHABLE() below.
    switch (needle) {
        case S::Uncommitted:
            return size_t(0);
        case S::Pending:
            return size_t(1);
        case S::Bootstrapping:
            return size_t(2);
        case S::AwaitingMark:
            return size_t(3);
        case S::Complete:
            return size_t(4);
        case S::Error:
            return size_t(5);
        case S::Superseded:
            return size_t(6);
    }
    REALM_UNREACHABLE();
}
×
133

134
} // namespace
135

136
Subscription::Subscription(const SubscriptionStore* parent, Obj obj)
137
    : id(obj.get<ObjectId>(parent->m_sub_id))
138
    , created_at(obj.get<Timestamp>(parent->m_sub_created_at))
139
    , updated_at(obj.get<Timestamp>(parent->m_sub_updated_at))
140
    , name(obj.is_null(parent->m_sub_name) ? OptionalString(util::none)
141
                                           : OptionalString{obj.get<String>(parent->m_sub_name)})
142
    , object_class_name(obj.get<String>(parent->m_sub_object_class_name))
143
    , query_string(obj.get<String>(parent->m_sub_query_str))
144
{
7,492✔
145
}
7,492✔
146

147
// Creates a brand new subscription: a freshly generated id, creation and update timestamps both
// set to the current system time, and the given name / object class / query string.
Subscription::Subscription(util::Optional<std::string> name, std::string object_class_name, std::string query_str)
    : id(ObjectId::gen())
    , created_at(std::chrono::system_clock::now())
    , updated_at(created_at) // copies the member initialized on the line above
    , name(std::move(name))
    , object_class_name(std::move(object_class_name))
    , query_string(std::move(query_str))
{
}
934✔
156

157

158
// Creates a SubscriptionSet for the stored object `obj` as seen by transaction `tr`.
//
// The set's version is taken from the object's primary key and the transaction's version is
// remembered in m_cur_version. Unless `making_mutable_copy` is set, the state, error string,
// snapshot version and subscriptions are loaded from `obj` (which requires `mgr` to still be alive).
SubscriptionSet::SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, const Transaction& tr, Obj obj,
                                 MakingMutableCopy making_mutable_copy)
    : m_mgr(std::move(mgr)) // by-value sink parameter: move rather than copy the weak_ptr
    , m_cur_version(tr.get_version())
    , m_version(obj.get_primary_key().get_int())
{
    REALM_ASSERT(obj.is_valid());
    if (!making_mutable_copy) {
        load_from_database(std::move(obj));
    }
}
10,714✔
169

170
// Creates a placeholder SubscriptionSet for `version` whose state is Superseded. Nothing is read
// from the database.
SubscriptionSet::SubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, int64_t version, SupersededTag)
    : m_mgr(std::move(mgr)) // by-value sink parameter: move rather than copy the weak_ptr
    , m_version(version)
    , m_state(State::Superseded)
{
}
4✔
176

177
// Replaces this set's state, error string, snapshot version and list of subscriptions with the
// values stored in `obj`. Throws if the owning SubscriptionStore is gone.
void SubscriptionSet::load_from_database(Obj obj)
{
    const auto store = get_flx_subscription_store(); // Throws

    m_state = state_from_storage(obj.get<int64_t>(store->m_sub_set_state));
    m_error_str = obj.get<String>(store->m_sub_set_error_str);
    m_snapshot_version = static_cast<DB::version_type>(obj.get<int64_t>(store->m_sub_set_snapshot_version));

    auto stored_subs = obj.get_linklist(store->m_sub_set_subscriptions);
    m_subs.clear();
    const size_t count = stored_subs.size();
    for (size_t i = 0; i != count; ++i) {
        m_subs.push_back(Subscription(store.get(), stored_subs.get_object(i)));
    }
}
9,404✔
190

191
std::shared_ptr<const SubscriptionStore> SubscriptionSet::get_flx_subscription_store() const
192
{
19,710✔
193
    if (auto mgr = m_mgr.lock()) {
19,710✔
194
        return mgr;
19,706✔
195
    }
19,706✔
196
    throw std::logic_error("Active SubscriptionSet without a SubscriptionStore");
4✔
197
}
4✔
198

199
int64_t SubscriptionSet::version() const
200
{
16,060✔
201
    return m_version;
16,060✔
202
}
16,060✔
203

204
// Returns the snapshot version recorded for this subscription set.
DB::version_type SubscriptionSet::snapshot_version() const
{
    return this->m_snapshot_version;
}
1,328✔
208

209
// Returns the state held by this in-memory object; it is not re-read from the database.
SubscriptionSet::State SubscriptionSet::state() const
{
    return this->m_state;
}
10,134✔
213

214
// Returns a view of the error message, or a default-constructed StringData when there is none.
// The view refers to this object's own storage.
StringData SubscriptionSet::error_str() const
{
    if (!m_error_str.empty()) {
        return m_error_str;
    }
    return StringData{};
}
28✔
221

222
size_t SubscriptionSet::size() const
223
{
296✔
224
    return m_subs.size();
296✔
225
}
296✔
226

227
// Returns the subscription at position `index`. Bounds handling is delegated to m_subs.at().
const Subscription& SubscriptionSet::at(size_t index) const
{
    const Subscription& sub = m_subs.at(index);
    return sub;
}
28✔
231

232
// Returns an iterator to the first subscription in the set.
SubscriptionSet::const_iterator SubscriptionSet::begin() const
{
    return this->m_subs.begin();
}
3,718✔
236

237
// Returns the past-the-end iterator of the set's subscriptions.
SubscriptionSet::const_iterator SubscriptionSet::end() const
{
    return this->m_subs.end();
}
4,696✔
241

242
// Returns a pointer to the first subscription whose name equals `name`, or nullptr if no
// subscription matches.
const Subscription* SubscriptionSet::find(StringData name) const
{
    const auto match = std::find_if(begin(), end(), [&](const Subscription& sub) {
        return sub.name == name;
    });
    if (match == end()) {
        return nullptr;
    }
    return &*match;
}
64✔
250

251
// Returns a pointer to the first subscription whose object class and query string both match
// `query` (its table's class name and its description), or nullptr if there is none.
const Subscription* SubscriptionSet::find(const Query& query) const
{
    const auto wanted_query = query.get_description();
    const auto wanted_class = Group::table_name_to_class_name(query.get_table()->get_name());

    const auto match = std::find_if(begin(), end(), [&](const Subscription& sub) {
        return sub.object_class_name == wanted_class && sub.query_string == wanted_query;
    });
    if (match == end()) {
        return nullptr;
    }
    return &*match;
}
48✔
261

262
// Creates a mutable subscription set backed by the stored object `obj` inside transaction `tr`.
//
// The base SubscriptionSet is initialized first (it needs `*tr` and a copy of `obj`), after which
// the transaction and object are moved into this object. m_old_state records the state the set
// had when it was opened.
MutableSubscriptionSet::MutableSubscriptionSet(std::weak_ptr<const SubscriptionStore> mgr, TransactionRef tr, Obj obj,
                                               MakingMutableCopy making_mutable_copy)
    // `mgr` is not used again, so hand it to the base instead of copying the weak_ptr. `obj` must be
    // copied here because it is moved into m_obj below.
    : SubscriptionSet(std::move(mgr), *tr, obj, making_mutable_copy)
    , m_tr(std::move(tr))
    , m_obj(std::move(obj))
    , m_old_state(state())
{
}
3,448✔
270

271
void MutableSubscriptionSet::check_is_mutable() const
272
{
4,504✔
273
    if (m_tr->get_transact_stage() != DB::transact_Writing) {
4,504✔
274
        throw WrongTransactionState("Not a write transaction");
12✔
275
    }
12✔
276
}
4,504✔
277

278
// This uses the 'swap and pop' idiom to run in constant time.
279
// The iterator returned is:
280
//  1. end(), if the last subscription is removed
281
//  2. same iterator it is passed (but pointing to the last subscription in set), otherwise
282
MutableSubscriptionSet::iterator MutableSubscriptionSet::erase(const_iterator it)
283
{
56✔
284
    check_is_mutable();
56✔
285
    REALM_ASSERT(it != end());
56✔
286
    if (it == std::prev(m_subs.end())) {
56✔
287
        m_subs.pop_back();
24✔
288
        return end();
24✔
289
    }
24✔
290
    auto back = std::prev(m_subs.end());
32✔
291
    // const_iterator to iterator in constant time (See https://stackoverflow.com/a/10669041)
16✔
292
    auto iterator = m_subs.erase(it, it);
32✔
293
    std::swap(*iterator, *back);
32✔
294
    m_subs.pop_back();
32✔
295
    return iterator;
32✔
296
}
32✔
297

298
// Removes the first subscription named `name`. Returns true if one was removed and false if no
// subscription has that name.
bool MutableSubscriptionSet::erase(StringData name)
{
    check_is_mutable();
    const Subscription* found = find(name);
    if (found == nullptr) {
        return false;
    }
    // Convert the element pointer back into an iterator through its offset from the first element.
    const auto offset = found - &m_subs.front();
    erase(m_subs.begin() + offset);
    return true;
}
8✔
308

309
bool MutableSubscriptionSet::erase(const Query& query)
310
{
24✔
311
    check_is_mutable();
24✔
312
    auto ptr = find(query);
24✔
313
    if (!ptr)
24✔
314
        return false;
×
315
    auto it = m_subs.begin() + (ptr - &m_subs.front());
24✔
316
    erase(it);
24✔
317
    return true;
24✔
318
}
24✔
319

320
// Removes every subscription whose object class equals `object_class_name`, keeping the relative
// order of the remaining subscriptions. Returns true if at least one subscription was removed.
//
// Throws WrongTransactionState when the set is not in a write transaction, like every other
// mutating member (erase(), clear(), insert_sub(), ...).
bool MutableSubscriptionSet::erase_by_class_name(StringData object_class_name)
{
    check_is_mutable();
    // TODO: Use std::erase_if when switching to C++20.
    auto it = std::remove_if(m_subs.begin(), m_subs.end(), [&object_class_name](const Subscription& sub) {
        return sub.object_class_name == object_class_name;
    });
    auto erased = end() - it;
    m_subs.erase(it, end());
    return erased > 0;
}
16✔
330

331
// Removes the first subscription whose id equals `id`. Returns true if one was removed and false
// if no subscription has that id.
bool MutableSubscriptionSet::erase_by_id(ObjectId id)
{
    const auto has_id = [&id](const Subscription& sub) -> bool {
        return sub.id == id;
    };
    auto match = std::find_if(m_subs.begin(), m_subs.end(), has_id);
    if (match != end()) {
        erase(match);
        return true;
    }
    return false;
}
8✔
342

343
void MutableSubscriptionSet::clear()
344
{
376✔
345
    check_is_mutable();
376✔
346
    m_subs.clear();
376✔
347
}
376✔
348

349
void MutableSubscriptionSet::insert_sub(const Subscription& sub)
350
{
832✔
351
    check_is_mutable();
832✔
352
    m_subs.push_back(sub);
832✔
353
}
832✔
354

355
std::pair<SubscriptionSet::iterator, bool>
356
MutableSubscriptionSet::insert_or_assign_impl(iterator it, util::Optional<std::string> name,
357
                                              std::string object_class_name, std::string query_str)
358
{
926✔
359
    check_is_mutable();
926✔
360
    if (it != end()) {
926✔
361
        auto& sub = m_subs[it - begin()];
36✔
362
        sub.object_class_name = std::move(object_class_name);
36✔
363
        sub.query_string = std::move(query_str);
36✔
364
        sub.updated_at = Timestamp{std::chrono::system_clock::now()};
36✔
365

18✔
366
        return {it, false};
36✔
367
    }
36✔
368
    it = m_subs.insert(m_subs.end(),
890✔
369
                       Subscription(std::move(name), std::move(object_class_name), std::move(query_str)));
890✔
370

444✔
371
    return {it, true};
890✔
372
}
890✔
373

374
std::pair<SubscriptionSet::iterator, bool> MutableSubscriptionSet::insert_or_assign(std::string_view name,
375
                                                                                    const Query& query)
376
{
140✔
377
    auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());
140✔
378
    auto query_str = query.get_description();
140✔
379
    auto it = std::find_if(begin(), end(), [&](const Subscription& sub) {
118✔
380
        return sub.name == name;
96✔
381
    });
96✔
382

70✔
383
    return insert_or_assign_impl(it, std::string{name}, std::move(table_name), std::move(query_str));
140✔
384
}
140✔
385

386
std::pair<SubscriptionSet::iterator, bool> MutableSubscriptionSet::insert_or_assign(const Query& query)
387
{
786✔
388
    auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());
786✔
389
    auto query_str = query.get_description();
786✔
390
    auto it = std::find_if(begin(), end(), [&](const Subscription& sub) {
498✔
391
        return (!sub.name && sub.object_class_name == table_name && sub.query_string == query_str);
212✔
392
    });
212✔
393

392✔
394
    return insert_or_assign_impl(it, util::none, std::move(table_name), std::move(query_str));
786✔
395
}
786✔
396

397
void MutableSubscriptionSet::import(const SubscriptionSet& src_subs)
398
{
128✔
399
    clear();
128✔
400
    for (const Subscription& sub : src_subs) {
132✔
401
        insert_sub(sub);
132✔
402
    }
132✔
403
}
128✔
404

405
// Moves this subscription set to `new_state`.
//
// - Error: allowed only from Bootstrapping, Pending or Uncommitted, and requires `error_str`,
//   which is stored as the set's error message.
// - Bootstrapping / AwaitingMark: the state is simply recorded.
// - Complete: the state is recorded and the store is asked to supersede all earlier versions.
// - Uncommitted / Superseded / Pending: cannot be set explicitly.
//
// Throws std::logic_error for any disallowed transition or when `error_str` is supplied for a
// state other than Error, and WrongTransactionState outside of a write transaction.
void MutableSubscriptionSet::update_state(State new_state, util::Optional<std::string_view> error_str)
{
    check_is_mutable();
    const auto previous = state();

    if (error_str && new_state != State::Error) {
        throw std::logic_error("Cannot supply an error message for a subscription set when state is not Error");
    }

    switch (new_state) {
        case State::Uncommitted:
            throw std::logic_error("cannot set subscription set state to uncommitted");

        case State::Superseded:
            throw std::logic_error("Cannot set a subscription to the superseded state");

        case State::Pending:
            throw std::logic_error("Cannot set subscription set to the pending state");

        case State::Error: {
            const bool may_fail = previous == State::Bootstrapping || previous == State::Pending ||
                                  previous == State::Uncommitted;
            if (!may_fail) {
                throw std::logic_error(
                    "subscription set must be in Bootstrapping or Pending to update state to error");
            }
            if (!error_str) {
                throw std::logic_error("Must supply an error message when setting a subscription to the error state");
            }

            m_state = new_state;
            m_error_str = std::string{*error_str};
            break;
        }

        case State::Bootstrapping:
        case State::AwaitingMark:
            m_state = new_state;
            break;

        case State::Complete: {
            // Fetch the store before touching m_state so a dead store leaves the set unchanged.
            auto store = get_flx_subscription_store(); // Throws
            m_state = new_state;
            store->supercede_prior_to(m_tr, version());
            break;
        }
    }
}
2,278✔
447

448
// Asks the owning store for a mutable copy of this subscription set.
// Throws if the store no longer exists.
MutableSubscriptionSet SubscriptionSet::make_mutable_copy() const
{
    const auto store = get_flx_subscription_store(); // Throws
    return store->make_mutable_copy(*this);
}
1,310✔
453

454
void SubscriptionSet::refresh()
455
{
24✔
456
    auto mgr = get_flx_subscription_store(); // Throws
24✔
457
    if (mgr->would_refresh(m_cur_version)) {
24✔
458
        *this = mgr->get_by_version(version());
20✔
459
    }
20✔
460
}
24✔
461

462
// Returns a future that resolves once this subscription set has reached `notify_when` (or any
// later state in state_to_order() terms).
//
// The future is ready immediately when:
//  - a newer version has already completed (resolves to State::Superseded),
//  - the set is in the Error state (resolves to a SubscriptionFailed status carrying the error), or
//  - the set has already reached the requested state (resolves to the current state).
// Otherwise a request is queued on the store and fulfilled later by process_notifications().
//
// Throws if the owning SubscriptionStore no longer exists.
util::Future<SubscriptionSet::State> SubscriptionSet::get_state_change_notification(State notify_when) const
{
    auto mgr = get_flx_subscription_store(); // Throws

    std::unique_lock<std::mutex> lk(mgr->m_pending_notifications_mutex);
    // If we've already been superseded by another version getting completed, then we should skip registering
    // a notification because it may never fire.
    if (mgr->m_min_outstanding_version > version()) {
        return util::Future<State>::make_ready(State::Superseded);
    }

    // Begin by blocking process_notifications from starting to fill futures. No matter the outcome, we'll
    // unblock process_notifications() at the end of this function via the guard we construct below.
    mgr->m_outstanding_requests++;
    auto guard = util::make_scope_exit([&]() noexcept {
        if (!lk.owns_lock()) {
            lk.lock();
        }
        --mgr->m_outstanding_requests;
        mgr->m_pending_notifications_cv.notify_one();
    });
    lk.unlock();

    State cur_state = state();
    // Keep an owned copy of the message. The refreshed set below is a temporary local, so a
    // non-owning view into its error string would dangle once that local goes out of scope.
    std::string err_str(error_str());

    // If there have been writes to the database since this SubscriptionSet was created, we need to fetch
    // the updated version from the DB to know the true current state and maybe return a ready future.
    if (m_cur_version < mgr->m_db->get_version_of_latest_snapshot()) {
        auto refreshed_self = mgr->get_by_version(version());
        cur_state = refreshed_self.state();
        err_str = refreshed_self.error_str();
    }
    // If we've already reached the desired state, or if the subscription is in an error state,
    // we can return a ready future immediately.
    if (cur_state == State::Error) {
        return util::Future<State>::make_ready(Status{ErrorCodes::SubscriptionFailed, err_str});
    }
    else if (state_to_order(cur_state) >= state_to_order(notify_when)) {
        return util::Future<State>::make_ready(cur_state);
    }

    // Otherwise put in a new request to be filled in by process_notifications().
    lk.lock();

    // Make a promise/future pair and add it to the list of pending notifications.
    auto [promise, future] = util::make_promise_future<State>();
    mgr->m_pending_notifications.emplace_back(version(), std::move(promise), notify_when);
    // `future` is a structured binding, so it is not implicitly moved on return.
    return std::move(future);
}
680✔
512

513
void SubscriptionSet::get_state_change_notification(
514
    State notify_when, util::UniqueFunction<void(util::Optional<State>, util::Optional<Status>)> cb) const
515
{
×
516
    get_state_change_notification(notify_when).get_async([cb = std::move(cb)](StatusWith<State> result) {
×
517
        if (result.is_ok()) {
×
518
            cb(result.get_value(), {});
×
519
        }
×
520
        else {
×
521
            cb({}, result.get_status());
×
522
        }
×
523
    });
×
524
}
×
525

526
void MutableSubscriptionSet::process_notifications()
527
{
3,376✔
528
    auto mgr = get_flx_subscription_store(); // Throws
3,376✔
529
    auto new_state = state();
3,376✔
530
    auto my_version = version();
3,376✔
531

1,688✔
532
    std::list<SubscriptionStore::NotificationRequest> to_finish;
3,376✔
533
    std::unique_lock<std::mutex> lk(mgr->m_pending_notifications_mutex);
3,376✔
534
    mgr->m_pending_notifications_cv.wait(lk, [&] {
3,376✔
535
        return mgr->m_outstanding_requests == 0;
3,376✔
536
    });
3,376✔
537

1,688✔
538
    for (auto it = mgr->m_pending_notifications.begin(); it != mgr->m_pending_notifications.end();) {
5,088✔
539
        if ((it->version == my_version &&
1,712✔
540
             (new_state == State::Error || state_to_order(new_state) >= state_to_order(it->notify_when))) ||
1,474✔
541
            (new_state == State::Complete && it->version < my_version)) {
1,392✔
542
            to_finish.splice(to_finish.end(), mgr->m_pending_notifications, it++);
652✔
543
        }
652✔
544
        else {
1,060✔
545
            ++it;
1,060✔
546
        }
1,060✔
547
    }
1,712✔
548

1,688✔
549
    if (new_state == State::Complete) {
3,376✔
550
        mgr->m_min_outstanding_version = my_version;
1,444✔
551
    }
1,444✔
552

1,688✔
553
    lk.unlock();
3,376✔
554

1,688✔
555
    for (auto& req : to_finish) {
2,014✔
556
        if (new_state == State::Error && req.version == my_version) {
652✔
557
            req.promise.set_error({ErrorCodes::SubscriptionFailed, std::string_view(error_str())});
20✔
558
        }
20✔
559
        else if (req.version < my_version) {
632✔
560
            req.promise.emplace_value(State::Superseded);
12✔
561
        }
12✔
562
        else {
620✔
563
            req.promise.emplace_value(new_state);
620✔
564
        }
620✔
565
    }
652✔
566
}
3,376✔
567

568
// Writes this subscription set to its stored object, commits the write transaction and returns
// the committed set as re-read from the store.
//
// A set that was Uncommitted when opened becomes Pending (unless its state was changed in the
// meantime), gets its snapshot version recorded and has its subscription list rewritten. The
// state is always written; the error string only when it is non-empty. After the commit,
// pending state-change notifications are processed.
//
// Throws std::logic_error when the transaction is not a write transaction.
SubscriptionSet MutableSubscriptionSet::commit()
{
    if (m_tr->get_transact_stage() != DB::transact_Writing) {
        throw std::logic_error("SubscriptionSet is not in a commitable state");
    }
    auto store = get_flx_subscription_store(); // Throws

    const bool newly_created = (m_old_state == State::Uncommitted);
    if (newly_created) {
        if (m_state == State::Uncommitted) {
            m_state = State::Pending;
        }
        m_obj.set(store->m_sub_set_snapshot_version, static_cast<int64_t>(m_tr->get_version()));

        auto stored_subs = m_obj.get_linklist(store->m_sub_set_subscriptions);
        stored_subs.clear();
        for (const auto& sub : m_subs) {
            // Always append at the current end of the list.
            const auto insert_at = stored_subs.is_empty() ? 0 : stored_subs.size();
            auto stored = stored_subs.create_and_insert_linked_object(insert_at);
            stored.set(store->m_sub_id, sub.id);
            stored.set(store->m_sub_created_at, sub.created_at);
            stored.set(store->m_sub_updated_at, sub.updated_at);
            if (sub.name) {
                stored.set(store->m_sub_name, StringData(*sub.name));
            }
            stored.set(store->m_sub_object_class_name, StringData(sub.object_class_name));
            stored.set(store->m_sub_query_str, StringData(sub.query_string));
        }
    }

    m_obj.set(store->m_sub_set_state, state_to_storage(m_state));
    if (!m_error_str.empty()) {
        m_obj.set(store->m_sub_set_error_str, StringData(m_error_str));
    }

    const auto committed_version = version();
    m_tr->commit_and_continue_as_read();

    process_notifications();

    return store->get_by_version_impl(committed_version, m_tr->get_version_of_current_transaction());
}
3,376✔
608

609
std::string SubscriptionSet::to_ext_json() const
610
{
1,800✔
611
    if (m_subs.empty()) {
1,800✔
612
        return "{}";
686✔
613
    }
686✔
614

560✔
615
    util::FlatMap<std::string, std::vector<std::string>> table_to_query;
1,114✔
616
    for (const auto& sub : *this) {
1,262✔
617
        std::string table_name(sub.object_class_name);
1,262✔
618
        auto& queries_for_table = table_to_query.at(table_name);
1,262✔
619
        auto query_it = std::find(queries_for_table.begin(), queries_for_table.end(), sub.query_string);
1,262✔
620
        if (query_it != queries_for_table.end()) {
1,262✔
621
            continue;
8✔
622
        }
8✔
623
        queries_for_table.emplace_back(sub.query_string);
1,254✔
624
    }
1,254✔
625

560✔
626
    if (table_to_query.empty()) {
1,114✔
627
        return "{}";
×
628
    }
×
629

560✔
630
    // TODO this is pulling in a giant compile-time dependency. We should have a better way of escaping the
560✔
631
    // query strings into a json object.
560✔
632
    nlohmann::json output_json;
1,114✔
633
    for (auto& table : table_to_query) {
1,194✔
634
        // We want to make sure that the queries appear in some kind of canonical order so that if there are
600✔
635
        // two subscription sets with the same subscriptions in different orders, the server doesn't have to
600✔
636
        // waste a bunch of time re-running the queries for that table.
600✔
637
        std::stable_sort(table.second.begin(), table.second.end());
1,194✔
638

600✔
639
        bool is_first = true;
1,194✔
640
        std::ostringstream obuf;
1,194✔
641
        for (const auto& query_str : table.second) {
1,254✔
642
            if (!is_first) {
1,254✔
643
                obuf << " OR ";
60✔
644
            }
60✔
645
            is_first = false;
1,254✔
646
            obuf << "(" << query_str << ")";
1,254✔
647
        }
1,254✔
648
        output_json[table.first] = obuf.str();
1,194✔
649
    }
1,194✔
650

560✔
651
    return output_json.dump();
1,114✔
652
}
1,114✔
653

654
namespace {
655
class SubscriptionStoreInit : public SubscriptionStore {
656
public:
657
    explicit SubscriptionStoreInit(DBRef db)
658
        : SubscriptionStore(std::move(db))
659
    {
902✔
660
    }
902✔
661
};
662
} // namespace
663

664
// Factory: creates a shared SubscriptionStore that operates on `db`.
SubscriptionStoreRef SubscriptionStore::create(DBRef db)
{
    auto store = std::make_shared<SubscriptionStoreInit>(std::move(db));
    return store;
}
902✔
668

669
// Opens the subscription store on `db`.
//
// Describes the two metadata tables (subscription sets and the embedded subscriptions) and then
// either creates them - when no schema version has been recorded for the store yet - or loads
// their keys after checking that the recorded version matches c_flx_schema_version. Finally makes
// sure the subscription-set table has its initial entry.
//
// Throws std::runtime_error when the recorded schema version is not the supported one.
SubscriptionStore::SubscriptionStore(DBRef db)
    : m_db(std::move(db))
{
    std::vector<SyncMetadataTable> internal_tables{
        {&m_sub_set_table,
         c_flx_subscription_sets_table,
         {&m_sub_set_version_num, c_flx_sub_sets_version_field, type_Int},
         {
             {&m_sub_set_state, c_flx_sub_sets_state_field, type_Int},
             {&m_sub_set_snapshot_version, c_flx_sub_sets_snapshot_version_field, type_Int},
             {&m_sub_set_error_str, c_flx_sub_sets_error_str_field, type_String, true},
             {&m_sub_set_subscriptions, c_flx_sub_sets_subscriptions_field, c_flx_subscriptions_table, true},
         }},
        {&m_sub_table,
         c_flx_subscriptions_table,
         SyncMetadataTable::IsEmbeddedTag{},
         {
             {&m_sub_id, c_flx_sub_id_field, type_ObjectId},
             {&m_sub_created_at, c_flx_sub_created_at_field, type_Timestamp},
             {&m_sub_updated_at, c_flx_sub_updated_at_field, type_Timestamp},
             {&m_sub_name, c_flx_sub_name_field, type_String, true},
             {&m_sub_object_class_name, c_flx_sub_object_class_field, type_String},
             {&m_sub_query_str, c_flx_sub_query_str_field, type_String},
         }},
    };

    auto tr = m_db->start_read();
    SyncMetadataSchemaVersions schema_versions(tr);

    const auto stored_version = schema_versions.get_version_for(tr, internal_schema_groups::c_flx_subscription_store);
    if (stored_version) {
        // The schema already exists: verify its version and look the tables up.
        if (*stored_version != c_flx_schema_version) {
            throw std::runtime_error("Invalid schema version for flexible sync metadata");
        }
        load_sync_metadata_schema(tr, &internal_tables);
    }
    else {
        // First use: record the schema version and create the tables.
        tr->promote_to_write();
        schema_versions.set_version_for(tr, internal_schema_groups::c_flx_subscription_store, c_flx_schema_version);
        create_sync_metadata_schema(tr, &internal_tables);
        tr->commit_and_continue_as_read();
    }

    // Make sure the subscription set table is properly initialized
    initialize_subscriptions_table(std::move(tr), false);
}
776✔
715

776✔
716
// Makes sure the subscription set table contains the zero'th subscription set.
//
// Nothing happens when `clear_table` is false and the table already has at least one entry.
// Otherwise `tr` is promoted to a write transaction, the table is emptied if `clear_table`
// is set, the zero'th subscription set is (re)created in the Pending state with the
// transaction's version as its snapshot version, and the transaction is committed.
void SubscriptionStore::initialize_subscriptions_table(TransactionRef&& tr, bool clear_table)
{
    auto sets_table = tr->get_table(m_sub_set_table);
    if (!clear_table && !sets_table->is_empty()) {
        // Already initialized and no reset was requested.
        return;
    }

    tr->promote_to_write();
    // If clear_table is true, drop every existing subscription set first.
    if (clear_table) {
        sets_table->clear();
    }

    // There should always be at least one subscription set so that the user can always wait
    // for synchronization on the result of get_latest().
    auto initial_set = sets_table->create_object_with_primary_key(Mixed{int64_t(0)});
    initial_set.set(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::Pending));
    initial_set.set(m_sub_set_snapshot_version, tr->get_version());
    tr->commit();
}
732

733
// Returns the subscription set with the largest version (primary key) in the subscription
// set table, read through a frozen transaction.
SubscriptionSet SubscriptionStore::get_latest() const
{
    auto frozen_tr = m_db->start_frozen();
    auto sets_table = frozen_tr->get_table(m_sub_set_table);
    // The zero'th subscription set (used for schema instructions) must always be present.
    REALM_ASSERT(!sets_table->is_empty());

    auto pk_col = sets_table->get_primary_key_column();
    auto newest_version = sets_table->max(pk_col)->get_int();
    auto newest_obj = sets_table->get_object_with_primary_key(Mixed{newest_version});

    return SubscriptionSet(weak_from_this(), *frozen_tr, newest_obj);
}
1,388✔
745

1,388✔
746
// Returns the newest subscription set whose state is Complete or AwaitingMark, read through
// a frozen transaction. When no such set exists, the zero'th subscription set is returned.
SubscriptionSet SubscriptionStore::get_active() const
{
    auto frozen_tr = m_db->start_frozen();
    auto sets_table = frozen_tr->get_table(m_sub_set_table);
    // The zero'th subscription set (used for schema instructions) must always be present.
    REALM_ASSERT(!sets_table->is_empty());

    // Only the single entry with the highest version is of interest.
    DescriptorOrdering newest_first;
    newest_first.append_sort(SortDescriptor{{{sets_table->get_primary_key_column()}}, {false}});
    newest_first.append_limit(LimitDescriptor{1});

    auto active_query = sets_table->where();
    active_query.equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Complete));
    active_query.Or();
    active_query.equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark));
    auto matches = active_query.find_all(newest_first);

    if (!matches.is_empty()) {
        return SubscriptionSet(weak_from_this(), *frozen_tr, matches.get_object(0));
    }
    // If there is no active subscription yet, return the zero'th subscription.
    return SubscriptionSet(weak_from_this(), *frozen_tr, sets_table->get_object_with_primary_key(int64_t(0)));
}
514✔
768

1,026✔
769
// Collects, from a single read transaction, the versions of:
//   - latest:       the subscription set with the largest primary key,
//   - active:       the newest set in state Complete or AwaitingMark,
//   - pending_mark: the newest set in state AwaitingMark.
// `active` and `pending_mark` are SubscriptionSet::EmptyVersion when no set matches.
SubscriptionStore::VersionInfo SubscriptionStore::get_version_info() const
{
    auto read_tr = m_db->start_read();
    auto sets_table = read_tr->get_table(m_sub_set_table);
    // The zero'th subscription set (used for schema instructions) must always be present.
    REALM_ASSERT(!sets_table->is_empty());

    VersionInfo info;
    info.latest = sets_table->max(sets_table->get_primary_key_column())->get_int();

    // Both lookups below only need the single entry with the highest version.
    DescriptorOrdering newest_first;
    newest_first.append_sort(SortDescriptor{{{sets_table->get_primary_key_column()}}, {false}});
    newest_first.append_limit(LimitDescriptor{1});

    auto active_view = sets_table->where()
                           .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Complete))
                           .Or()
                           .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark))
                           .find_all(newest_first);
    if (active_view.is_empty()) {
        info.active = SubscriptionSet::EmptyVersion;
    }
    else {
        info.active = active_view.get_object(0).get_primary_key().get_int();
    }

    auto awaiting_mark_view = sets_table->where()
                                  .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark))
                                  .find_all(newest_first);
    if (awaiting_mark_view.is_empty()) {
        info.pending_mark = SubscriptionSet::EmptyVersion;
    }
    else {
        info.pending_mark = awaiting_mark_view.get_object(0).get_primary_key().get_int();
    }

    return info;
}
9,046✔
796

9,046✔
797
// Finds the oldest subscription set that
//   - has a version greater than `last_query_version`,
//   - is in state Pending or Bootstrapping, and
//   - has a snapshot version greater than or equal to `after_client_version`.
// Returns its query version and snapshot version, or util::none when no set qualifies.
util::Optional<SubscriptionStore::PendingSubscription>
SubscriptionStore::get_next_pending_version(int64_t last_query_version, DB::version_type after_client_version) const
{
    auto read_tr = m_db->start_read();
    auto sets_table = read_tr->get_table(m_sub_set_table);
    // The zero'th subscription set (used for schema instructions) must always be present.
    REALM_ASSERT(!sets_table->is_empty());

    const auto pk_col = sets_table->get_primary_key_column();

    // Ascending by version so that the first match is the next one to process.
    DescriptorOrdering oldest_first;
    oldest_first.append_sort(SortDescriptor{{{pk_col}}, {true}});

    auto pending_query = sets_table->where();
    pending_query.greater(pk_col, last_query_version);
    pending_query.group();
    pending_query.equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Pending));
    pending_query.Or();
    pending_query.equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Bootstrapping));
    pending_query.end_group();
    pending_query.greater_equal(m_sub_set_snapshot_version, static_cast<int64_t>(after_client_version));
    auto candidates = pending_query.find_all(oldest_first);

    if (!candidates.is_empty()) {
        auto next_obj = candidates.get_object(0);
        auto next_query_version = next_obj.get_primary_key().get_int();
        auto next_snapshot_version = next_obj.get<int64_t>(m_sub_set_snapshot_version);
        return PendingSubscription{next_query_version, static_cast<DB::version_type>(next_snapshot_version)};
    }
    return util::none;
}
176✔
826

176✔
827
std::vector<SubscriptionSet> SubscriptionStore::get_pending_subscriptions() const
176✔
828
{
176✔
829
    std::vector<SubscriptionSet> subscriptions_to_recover;
84✔
830
    auto active_sub = get_active();
84✔
831
    auto cur_query_version = active_sub.version();
832
    DB::version_type db_version = 0;
833
    if (active_sub.state() == SubscriptionSet::State::Complete) {
4✔
834
        db_version = active_sub.snapshot_version();
4✔
835
    }
4✔
836
    REALM_ASSERT_EX(db_version != DB::version_type(-1), active_sub.state());
4✔
837
    // get a copy of the pending subscription sets since the active version
4✔
838
    while (auto next_pending = get_next_pending_version(cur_query_version, db_version)) {
2✔
839
        cur_query_version = next_pending->query_version;
4✔
840
        db_version = next_pending->snapshot_version;
4✔
841
        subscriptions_to_recover.push_back(get_by_version(cur_query_version));
2✔
842
    }
2✔
843
    return subscriptions_to_recover;
2✔
844
}
12✔
845

12✔
846
// Fails every pending state-change notification with `status`.
//
// Blocks until m_outstanding_requests drops to zero, then takes ownership of the pending
// notification list while holding the mutex and resolves the promises after releasing it.
void SubscriptionStore::notify_all_state_change_notifications(Status status)
{
    auto drained = [&] {
        std::unique_lock<std::mutex> guard(m_pending_notifications_mutex);
        m_pending_notifications_cv.wait(guard, [&] {
            return m_outstanding_requests == 0;
        });
        return std::move(m_pending_notifications);
    }();

    // Just complete/cancel the pending notifications - this function does not alter the
    // state of any pending subscriptions
    for (auto& request : drained) {
        request.promise.set_error(status);
    }
}
20✔
862

10✔
863
void SubscriptionStore::terminate()
18✔
864
{
16✔
865
    // Clear out and initialize the subscription store
16✔
866
    initialize_subscriptions_table(m_db->start_read(), true);
20✔
867

868
    std::unique_lock<std::mutex> lk(m_pending_notifications_mutex);
869
    m_pending_notifications_cv.wait(lk, [&] {
2,142✔
870
        return m_outstanding_requests == 0;
2,142✔
871
    });
2,142✔
872
    auto to_finish = std::move(m_pending_notifications);
2,142✔
873
    m_min_outstanding_version = 0;
2,142✔
874

4✔
875
    lk.unlock();
4✔
876

2,138✔
877
    for (auto& req : to_finish) {
2,138✔
878
        req.promise.emplace_value(SubscriptionSet::State::Superseded);
879
    }
880
}
1,072✔
881

1,072✔
882
// Opens a write transaction and returns a mutable handle to the subscription set whose
// version (primary key) is `version_id`.
// Throws KeyNotFound when no such subscription set exists.
MutableSubscriptionSet SubscriptionStore::get_mutable_by_version(int64_t version_id)
{
    auto write_tr = m_db->start_write();
    auto sets_table = write_tr->get_table(m_sub_set_table);
    auto set_obj = sets_table->get_object_with_primary_key(Mixed{version_id});
    if (set_obj) {
        return MutableSubscriptionSet(weak_from_this(), std::move(write_tr), set_obj);
    }
    throw KeyNotFound(util::format("Subscription set with version %1 not found", version_id));
}
4,444✔
892

4,444✔
893
// Looks up the subscription set with the given version without pinning a specific database
// version (no DB::VersionID is passed on to get_by_version_impl()).
SubscriptionSet SubscriptionStore::get_by_version(int64_t version_id) const
{
    const util::Optional<DB::VersionID> unpinned_db_version = util::none;
    return get_by_version_impl(version_id, unpinned_db_version);
}
4✔
897

4✔
898
// Returns the subscription set with version `version_id`, read through a transaction frozen
// at `db_version` (or at a default-constructed VersionID when `db_version` is not set).
//
// If the set is not stored in the table:
//   - a Superseded placeholder is returned when `version_id` is below
//     m_min_outstanding_version;
//   - otherwise KeyNotFound is thrown.
SubscriptionSet SubscriptionStore::get_by_version_impl(int64_t version_id,
                                                       util::Optional<DB::VersionID> db_version) const
{
    auto frozen_tr = m_db->start_frozen(db_version.value_or(VersionID{}));
    auto sets_table = frozen_tr->get_table(m_sub_set_table);
    auto set_obj = sets_table->get_object_with_primary_key(Mixed{version_id});
    if (set_obj) {
        return SubscriptionSet(weak_from_this(), *frozen_tr, set_obj);
    }

    // m_min_outstanding_version is read while holding the notifications mutex.
    std::lock_guard<std::mutex> guard(m_pending_notifications_mutex);
    if (version_id >= m_min_outstanding_version) {
        throw KeyNotFound(util::format("Subscription set with version %1 not found", version_id));
    }
    return SubscriptionSet(weak_from_this(), version_id, SubscriptionSet::SupersededTag{});
}
7,786✔
915

7,786✔
916
// Returns the object class names referenced by the subscriptions of the latest subscription
// set (the one with the largest primary key), as seen by transaction `tr`.
SubscriptionStore::TableSet SubscriptionStore::get_tables_for_latest(const Transaction& tr) const
{
    auto sets_table = tr.get_table(m_sub_set_table);
    // The zero'th subscription set (used for schema instructions) must always be present.
    REALM_ASSERT(!sets_table->is_empty());

    auto newest_version = sets_table->max(sets_table->get_primary_key_column())->get_int();
    auto newest_obj = sets_table->get_object_with_primary_key(Mixed{newest_version});
    auto subscriptions = newest_obj.get_linklist(m_sub_set_subscriptions);

    TableSet class_names;
    const size_t sub_count = subscriptions.size();
    for (size_t i = 0; i != sub_count; ++i) {
        class_names.emplace(subscriptions.get_object(i).get<StringData>(m_sub_object_class_name));
    }
    return class_names;
}
1,310✔
934

1,310✔
935
// Removes, within transaction `tr`, every subscription set whose version (primary key) is
// strictly less than `version_id`.
void SubscriptionStore::supercede_prior_to(TransactionRef tr, int64_t version_id) const
{
    auto sets_table = tr->get_table(m_sub_set_table);
    const auto pk_col = sets_table->get_primary_key_column();

    Query older_sets(sets_table);
    older_sets.less(pk_col, version_id).remove();
}
652✔
942

654✔
943
// Creates a new subscription set, inside a fresh write transaction, whose version is one
// greater than the current largest version, and copies every subscription of `set` into it.
// The returned MutableSubscriptionSet takes ownership of the write transaction.
MutableSubscriptionSet SubscriptionStore::make_mutable_copy(const SubscriptionSet& set) const
{
    auto write_tr = m_db->start_write();
    auto sets_table = write_tr->get_table(m_sub_set_table);

    // The next version directly follows the largest version currently in the table.
    auto next_version = sets_table->max(sets_table->get_primary_key_column())->get_int() + 1;
    auto next_obj = sets_table->create_object_with_primary_key(Mixed{next_version});

    MutableSubscriptionSet copy(weak_from_this(), std::move(write_tr), next_obj,
                                SubscriptionSet::MakingMutableCopy{true});
    for (const auto& subscription : set) {
        copy.insert_sub(subscription);
    }

    return copy;
}
959

960
// Returns true when `version` is older than the database's latest snapshot version.
bool SubscriptionStore::would_refresh(DB::version_type version) const noexcept
{
    const auto latest_snapshot = m_db->get_version_of_latest_snapshot();
    return latest_snapshot > version;
}
964

965
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc