• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / jorgen.edelbo_402

21 Aug 2024 11:10AM UTC coverage: 91.054% (-0.03%) from 91.085%
jorgen.edelbo_402

Pull #7803

Evergreen

jedelbo
Small fix to Table::typed_write

When writing the realm to a new file from a write transaction,
the Table may be COW so that the top ref is changed. So don't
use the ref that is present in the group when the operation starts.
Pull Request #7803: Feature/string compression

103494 of 181580 branches covered (57.0%)

1929 of 1999 new or added lines in 46 files covered. (96.5%)

695 existing lines in 51 files now uncovered.

220142 of 241772 relevant lines covered (91.05%)

7344461.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.75
/src/realm/sync/subscriptions.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2021 Realm, Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "realm/sync/subscriptions.hpp"
20

21
#include "external/json/json.hpp"
22

23
#include "realm/data_type.hpp"
24
#include "realm/keys.hpp"
25
#include "realm/list.hpp"
26
#include "realm/sort_descriptor.hpp"
27
#include "realm/sync/noinst/sync_metadata_schema.hpp"
28
#include "realm/table.hpp"
29
#include "realm/table_view.hpp"
30
#include "realm/transaction.hpp"
31
#include "realm/util/flat_map.hpp"
32

33
#include <algorithm>
34
#include <initializer_list>
35
#include <stdexcept>
36

37
namespace realm::sync {
38
namespace {
39
// Schema version history:
40
//   v2: Initial public beta.
41

42
constexpr static int c_flx_schema_version = 2;
43
constexpr static std::string_view c_flx_subscription_sets_table("flx_subscription_sets");
44
constexpr static std::string_view c_flx_subscriptions_table("flx_subscriptions");
45

46
constexpr static std::string_view c_flx_sub_sets_state_field("state");
47
constexpr static std::string_view c_flx_sub_sets_version_field("version");
48
constexpr static std::string_view c_flx_sub_sets_error_str_field("error");
49
constexpr static std::string_view c_flx_sub_sets_subscriptions_field("subscriptions");
50
constexpr static std::string_view c_flx_sub_sets_snapshot_version_field("snapshot_version");
51

52
constexpr static std::string_view c_flx_sub_id_field("id");
53
constexpr static std::string_view c_flx_sub_created_at_field("created_at");
54
constexpr static std::string_view c_flx_sub_updated_at_field("updated_at");
55
constexpr static std::string_view c_flx_sub_name_field("name");
56
constexpr static std::string_view c_flx_sub_object_class_field("object_class");
57
constexpr static std::string_view c_flx_sub_query_str_field("query");
58

59
using OptionalString = util::Optional<std::string>;
60

61
// Integer codes used when a subscription set's state is persisted. These values are what
// state_from_storage()/state_to_storage() translate to and from SubscriptionSet::State.
enum class SubscriptionStateForStorage : int64_t {
    Pending = 1,       // Persisted locally, not yet acknowledged by the server.
    Bootstrapping = 2, // The server is sending the initial state for this subscription set.
    Complete = 3,      // The active subscription set currently being synchronized with the server.
    Error = 4,         // The server failed to process this subscription set; see error_str().
    AwaitingMark = 6,  // Last bootstrap message received; waiting for the mark message that
                       // signals the set is fully caught up to history.
};
74

75
// Translates a persisted state code into the in-memory SubscriptionSet::State.
// Throws RuntimeError(InvalidArgument) when the stored value matches no known code.
SubscriptionSet::State state_from_storage(int64_t value)
{
    using Stored = SubscriptionStateForStorage;
    using State = SubscriptionSet::State;

    switch (static_cast<Stored>(value)) {
        case Stored::Pending:
            return State::Pending;
        case Stored::Bootstrapping:
            return State::Bootstrapping;
        case Stored::AwaitingMark:
            return State::AwaitingMark;
        case Stored::Complete:
            return State::Complete;
        case Stored::Error:
            return State::Error;
        default:
            break;
    }
    throw RuntimeError(ErrorCodes::InvalidArgument,
                       util::format("Invalid state for SubscriptionSet stored on disk: %1", value));
}
30,270✔
93

94
// Translates an in-memory state into the integer code that is persisted.
// Only the five states that have a storage code are accepted; any other value
// hits REALM_UNREACHABLE().
constexpr int64_t state_to_storage(SubscriptionSet::State state)
{
    using State = SubscriptionSet::State;
    using Stored = SubscriptionStateForStorage;

    Stored stored{};
    switch (state) {
        case State::Pending:
            stored = Stored::Pending;
            break;
        case State::Bootstrapping:
            stored = Stored::Bootstrapping;
            break;
        case State::AwaitingMark:
            stored = Stored::AwaitingMark;
            break;
        case State::Complete:
            stored = Stored::Complete;
            break;
        case State::Error:
            stored = Stored::Error;
            break;
        default:
            REALM_UNREACHABLE();
    }
    return static_cast<int64_t>(stored);
}
100,616✔
111

112
// Maps a state to its rank in the progression used when comparing states
// (a notification fires once the current rank is >= the requested rank).
constexpr size_t state_to_order(SubscriptionSet::State needle)
{
    // Deliberately no `default:` label so the switch lists every enumerator explicitly.
    switch (needle) {
        case SubscriptionSet::State::Uncommitted:
            return 0;
        case SubscriptionSet::State::Pending:
            return 1;
        case SubscriptionSet::State::Bootstrapping:
            return 2;
        case SubscriptionSet::State::AwaitingMark:
            return 3;
        case SubscriptionSet::State::Complete:
            return 4;
        case SubscriptionSet::State::Error:
            return 5;
        case SubscriptionSet::State::Superseded:
            return 6;
    }
    REALM_UNREACHABLE();
}
×
133

134
// Moves every element of `src` for which `pred` returns true to the back of `dst`,
// preserving the relative order of both the moved and the remaining elements.
// Elements are relinked with std::list::splice, not copied.
template <typename T, typename Predicate>
void splice_if(std::list<T>& src, std::list<T>& dst, Predicate pred)
{
    auto cursor = src.begin();
    while (cursor != src.end()) {
        // Step past the element first: once spliced, `current` belongs to `dst`.
        const auto current = cursor;
        ++cursor;
        if (pred(*current)) {
            dst.splice(dst.end(), src, current);
        }
    }
}
168✔
146

147
} // namespace
148

149
// Builds a Subscription from a stored object, reading each field through the
// column keys held by `parent`. A null name column yields an empty optional name.
Subscription::Subscription(const SubscriptionStore* parent, Obj obj)
    : id(obj.get<ObjectId>(parent->m_sub_id))
    , created_at(obj.get<Timestamp>(parent->m_sub_created_at))
    , updated_at(obj.get<Timestamp>(parent->m_sub_updated_at))
    , name([&]() -> OptionalString {
        if (obj.is_null(parent->m_sub_name)) {
            return OptionalString(util::none);
        }
        return OptionalString{obj.get<String>(parent->m_sub_name)};
    }())
    , object_class_name(obj.get<String>(parent->m_sub_object_class_name))
    , query_string(obj.get<String>(parent->m_sub_query_str))
{
}
6,436✔
159

160
// Creates a brand-new subscription: a freshly generated id, creation time set to "now",
// and the update time initialised from the creation time.
Subscription::Subscription(util::Optional<std::string> name, std::string object_class_name, std::string query_str)
    : id(ObjectId::gen())
    , created_at(std::chrono::system_clock::now())
    , updated_at(created_at) // same instant as created_at
    , name(std::move(name))
    , object_class_name(std::move(object_class_name))
    , query_string(std::move(query_str))
{
}
1,618✔
169

170

171
// Constructs a subscription set bound to the stored object `obj` as seen by `tr`.
//
// The version and object key are always taken from `obj`. Unless the caller signals that a
// mutable copy is being made, the state, error string, snapshot version and subscriptions
// are loaded from the object as well (which throws if the owning store is gone).
SubscriptionSet::SubscriptionSet(std::weak_ptr<SubscriptionStore> mgr, const Transaction& tr, const Obj& obj,
                                 MakingMutableCopy making_mutable_copy)
    : m_mgr(std::move(mgr)) // by-value sink parameter: move instead of copying the weak_ptr
    , m_cur_version(tr.get_version())
    , m_version(obj.get_primary_key().get_int())
    , m_obj_key(obj.get_key())
{
    REALM_ASSERT(obj.is_valid());
    if (!making_mutable_copy) {
        load_from_database(obj);
    }
}
9,316✔
183

184
// Constructs a placeholder for a subscription set with the given version whose state is
// Superseded. Nothing is read from the database.
SubscriptionSet::SubscriptionSet(std::weak_ptr<SubscriptionStore> mgr, int64_t version, SupersededTag)
    : m_mgr(std::move(mgr)) // by-value sink parameter: move instead of copying the weak_ptr
    , m_version(version)
    , m_state(State::Superseded)
{
}
20✔
190

191
void SubscriptionSet::load_from_database(const Obj& obj)
192
{
7,666✔
193
    auto mgr = get_flx_subscription_store(); // Throws
7,666✔
194

195
    m_state = state_from_storage(obj.get<int64_t>(mgr->m_sub_set_state));
7,666✔
196
    m_error_str = obj.get<String>(mgr->m_sub_set_error_str);
7,666✔
197
    m_snapshot_version = static_cast<DB::version_type>(obj.get<int64_t>(mgr->m_sub_set_snapshot_version));
7,666✔
198
    auto sub_list = obj.get_linklist(mgr->m_sub_set_subscriptions);
7,666✔
199
    m_subs.clear();
7,666✔
200
    for (size_t idx = 0; idx < sub_list.size(); ++idx) {
14,102✔
201
        m_subs.push_back(Subscription(mgr.get(), sub_list.get_object(idx)));
6,436✔
202
    }
6,436✔
203
}
7,666✔
204

205
std::shared_ptr<SubscriptionStore> SubscriptionSet::get_flx_subscription_store() const
206
{
12,262✔
207
    if (auto mgr = m_mgr.lock()) {
12,262✔
208
        return mgr;
12,258✔
209
    }
12,258✔
210
    throw RuntimeError(ErrorCodes::BrokenInvariant, "Active SubscriptionSet without a SubscriptionStore");
4✔
211
}
12,262✔
212

213
int64_t SubscriptionSet::version() const
214
{
13,036✔
215
    return m_version;
13,036✔
216
}
13,036✔
217

218
// Snapshot version recorded for this subscription set when it was last loaded.
DB::version_type SubscriptionSet::snapshot_version() const
{
    return this->m_snapshot_version;
}
1,378✔
222

223
// State of this subscription set as held in memory (not re-read from the database).
SubscriptionSet::State SubscriptionSet::state() const
{
    return this->m_state;
}
1,732✔
227

228
// Error message associated with this subscription set.
// Returns a default-constructed StringData when no error message is stored.
StringData SubscriptionSet::error_str() const
{
    return m_error_str.empty() ? StringData{} : StringData(m_error_str);
}
1,372✔
235

236
size_t SubscriptionSet::size() const
237
{
308✔
238
    return m_subs.size();
308✔
239
}
308✔
240

241
// Bounds-checked access to the subscription at `index` (delegates to the container's at()).
const Subscription& SubscriptionSet::at(size_t index) const
{
    const Subscription& sub = m_subs.at(index);
    return sub;
}
36✔
245

246
// Iterator to the first subscription in the set.
SubscriptionSet::const_iterator SubscriptionSet::begin() const
{
    return this->m_subs.begin();
}
5,422✔
250

251
// Past-the-end iterator of the subscriptions in the set.
SubscriptionSet::const_iterator SubscriptionSet::end() const
{
    return this->m_subs.end();
}
7,120✔
255

256
// Looks up the first subscription whose name equals `name`.
// Returns a pointer into this set, or nullptr when no subscription matches.
const Subscription* SubscriptionSet::find(StringData name) const
{
    const auto match = std::find_if(begin(), end(), [&name](const Subscription& sub) {
        return sub.name == name;
    });
    if (match == end()) {
        return nullptr;
    }
    return &*match;
}
64✔
264

265
// Looks up the first subscription that targets the same object class as `query` and has
// the same query description. Returns a pointer into this set, or nullptr if none matches.
const Subscription* SubscriptionSet::find(const Query& query) const
{
    const auto query_desc = query.get_description();
    const auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());

    const auto match = std::find_if(begin(), end(), [&](const Subscription& sub) {
        return sub.object_class_name == table_name && sub.query_string == query_desc;
    });
    if (match == end()) {
        return nullptr;
    }
    return &*match;
}
48✔
275

276
// Constructs a mutable subscription set over `obj` inside transaction `tr`.
// The base class is told a mutable copy is being made, so it does not load the
// subscriptions from the database. The transaction and object are then taken over
// by this instance.
MutableSubscriptionSet::MutableSubscriptionSet(std::weak_ptr<SubscriptionStore> mgr, TransactionRef tr, Obj obj)
    // The base is initialised first, so `*tr` and `obj` are still intact when it reads them.
    : SubscriptionSet(std::move(mgr), *tr, obj, MakingMutableCopy{true})
    , m_tr(std::move(tr))
    , m_obj(std::move(obj))
{
}
1,650✔
282

283
void MutableSubscriptionSet::check_is_mutable() const
284
{
2,922✔
285
    if (m_tr->get_transact_stage() != DB::transact_Writing) {
2,922✔
286
        throw WrongTransactionState("Not a write transaction");
12✔
287
    }
12✔
288
}
2,922✔
289

290
// This uses the 'swap and pop' idiom to run in constant time.
291
// The iterator returned is:
292
//  1. end(), if the last subscription is removed
293
//  2. same iterator it is passed (but pointing to the last subscription in set), otherwise
294
MutableSubscriptionSet::iterator MutableSubscriptionSet::erase(const_iterator it)
295
{
56✔
296
    check_is_mutable();
56✔
297
    REALM_ASSERT(it != end());
56✔
298
    if (it == std::prev(m_subs.end())) {
56✔
299
        m_subs.pop_back();
24✔
300
        return end();
24✔
301
    }
24✔
302
    auto back = std::prev(m_subs.end());
32✔
303
    // const_iterator to iterator in constant time (See https://stackoverflow.com/a/10669041)
304
    auto iterator = m_subs.erase(it, it);
32✔
305
    std::swap(*iterator, *back);
32✔
306
    m_subs.pop_back();
32✔
307
    return iterator;
32✔
308
}
56✔
309

310
// Removes the subscription named `name`, if any.
// Returns true when a subscription was removed, false when none had that name.
bool MutableSubscriptionSet::erase(StringData name)
{
    check_is_mutable();
    if (const Subscription* found = find(name)) {
        // Turn the element pointer back into an iterator via its offset from the front.
        auto pos = m_subs.begin() + (found - &m_subs.front());
        erase(pos);
        return true;
    }
    return false;
}
12✔
320

321
bool MutableSubscriptionSet::erase(const Query& query)
322
{
24✔
323
    check_is_mutable();
24✔
324
    auto ptr = find(query);
24✔
325
    if (!ptr)
24✔
326
        return false;
×
327
    auto it = m_subs.begin() + (ptr - &m_subs.front());
24✔
328
    erase(it);
24✔
329
    return true;
24✔
330
}
24✔
331

332
// Removes every subscription whose object class equals `object_class_name`.
// Returns true when at least one subscription was removed.
// Throws WrongTransactionState (via check_is_mutable) when the set is no longer writable,
// matching the behaviour of the other mutators.
bool MutableSubscriptionSet::erase_by_class_name(StringData object_class_name)
{
    check_is_mutable();

    // TODO: Use std::erase_if when switching to C++20.
    const auto first_removed =
        std::remove_if(m_subs.begin(), m_subs.end(), [&object_class_name](const Subscription& sub) {
            return sub.object_class_name == object_class_name;
        });
    const bool erased_any = (first_removed != m_subs.end());
    m_subs.erase(first_removed, m_subs.end());
    return erased_any;
}
28✔
342

343
// Removes the subscription with the given id, if any.
// Returns true when a subscription was removed, false when no subscription has that id.
// Throws WrongTransactionState (via check_is_mutable) when the set is no longer writable;
// the check happens up front, as in the name- and query-based erase overloads, so the
// outcome does not depend on whether the id happens to be present.
bool MutableSubscriptionSet::erase_by_id(ObjectId id)
{
    check_is_mutable();

    const auto match = std::find_if(m_subs.begin(), m_subs.end(), [&id](const Subscription& sub) -> bool {
        return sub.id == id;
    });
    if (match == m_subs.end()) {
        return false;
    }
    erase(match);
    return true;
}
12✔
354

355
void MutableSubscriptionSet::clear()
356
{
396✔
357
    check_is_mutable();
396✔
358
    m_subs.clear();
396✔
359
}
396✔
360

361
void MutableSubscriptionSet::insert_sub(const Subscription& sub)
362
{
636✔
363
    check_is_mutable();
636✔
364
    m_subs.push_back(sub);
636✔
365
}
636✔
366

367
std::pair<SubscriptionSet::iterator, bool>
368
MutableSubscriptionSet::insert_or_assign_impl(iterator it, util::Optional<std::string> name,
369
                                              std::string object_class_name, std::string query_str)
370
{
1,618✔
371
    check_is_mutable();
1,618✔
372
    if (it != end()) {
1,618✔
373
        auto& sub = m_subs[it - begin()];
44✔
374
        sub.object_class_name = std::move(object_class_name);
44✔
375
        sub.query_string = std::move(query_str);
44✔
376
        sub.updated_at = Timestamp{std::chrono::system_clock::now()};
44✔
377

378
        return {it, false};
44✔
379
    }
44✔
380
    it = m_subs.insert(m_subs.end(),
1,574✔
381
                       Subscription(std::move(name), std::move(object_class_name), std::move(query_str)));
1,574✔
382

383
    return {it, true};
1,574✔
384
}
1,618✔
385

386
std::pair<SubscriptionSet::iterator, bool> MutableSubscriptionSet::insert_or_assign(std::string_view name,
387
                                                                                    const Query& query)
388
{
188✔
389
    auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());
188✔
390
    auto query_str = query.get_description();
188✔
391
    auto it = std::find_if(begin(), end(), [&](const Subscription& sub) {
188✔
392
        return sub.name == name;
148✔
393
    });
148✔
394

395
    return insert_or_assign_impl(it, std::string{name}, std::move(table_name), std::move(query_str));
188✔
396
}
188✔
397

398
std::pair<SubscriptionSet::iterator, bool> MutableSubscriptionSet::insert_or_assign(const Query& query)
399
{
1,430✔
400
    auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());
1,430✔
401
    auto query_str = query.get_description();
1,430✔
402
    auto it = std::find_if(begin(), end(), [&](const Subscription& sub) {
1,430✔
403
        return (!sub.name && sub.object_class_name == table_name && sub.query_string == query_str);
552✔
404
    });
552✔
405

406
    return insert_or_assign_impl(it, util::none, std::move(table_name), std::move(query_str));
1,430✔
407
}
1,430✔
408

409
// Replaces this set's subscriptions with those of `src_subs`.
// Throws WrongTransactionState when the set is no longer writable.
void MutableSubscriptionSet::import(SubscriptionSet&& src_subs)
{
    this->check_is_mutable();
    // Delegate the actual transfer to the base-class implementation.
    this->SubscriptionSet::import(std::move(src_subs));
}
180✔
414

415
// Takes over the subscriptions of `src_subs` by move-assignment; anything previously
// held by this set is replaced.
void SubscriptionSet::import(SubscriptionSet&& src_subs)
{
    this->m_subs = std::move(src_subs.m_subs);
}
180✔
419

420
// Sets the in-memory state of a set that has not been committed yet.
// Asserts that the current state is still Uncommitted.
void MutableSubscriptionSet::set_state(State new_state)
{
    REALM_ASSERT(m_state == State::Uncommitted);
    this->m_state = new_state;
}
56✔
425

426
// Asks the owning store for a mutable copy of this subscription set.
// Throws when the store no longer exists.
MutableSubscriptionSet SubscriptionSet::make_mutable_copy() const
{
    return get_flx_subscription_store() /* Throws */->make_mutable_copy(*this);
}
1,650✔
431

432
void SubscriptionSet::refresh()
433
{
48✔
434
    auto mgr = get_flx_subscription_store(); // Throws
48✔
435
    if (mgr->would_refresh(m_cur_version)) {
48✔
436
        *this = mgr->get_refreshed(m_obj_key, version());
36✔
437
    }
36✔
438
}
48✔
439

440
// Returns a future that resolves once this subscription set has reached `notify_when`
// (or any state ordered after it), or fails with SubscriptionFailed when the set is in
// the Error state. If the condition already holds, a ready future is returned; otherwise
// a pending notification is registered with the store.
util::Future<SubscriptionSet::State> SubscriptionSet::get_state_change_notification(State notify_when) const
{
    auto store = get_flx_subscription_store(); // Throws

    util::CheckedLockGuard lk(store->m_pending_notifications_mutex);
    State current = state();
    std::string error_message = error_str();

    // This object may be stale: when the database has newer snapshots, consult a refreshed
    // copy to learn the true current state (and possibly resolve immediately).
    if (m_cur_version < store->m_db->get_version_of_latest_snapshot()) {
        auto latest = store->get_refreshed(m_obj_key, version());
        current = latest.state();
        error_message = latest.error_str();
    }

    // An error state always resolves immediately, as a failure.
    if (current == State::Error) {
        return util::Future<State>::make_ready(Status{ErrorCodes::SubscriptionFailed, error_message});
    }
    // So does a state that is already at or past the requested one.
    if (state_to_order(current) >= state_to_order(notify_when)) {
        return util::Future<State>::make_ready(current);
    }

    // Not there yet: queue a promise to be fulfilled by the store later on.
    auto [promise, future] = util::make_promise_future<State>();
    store->m_pending_notifications.emplace_back(version(), std::move(promise), notify_when);
    return std::move(future);
}
1,316✔
469

470
void SubscriptionSet::get_state_change_notification(
471
    State notify_when, util::UniqueFunction<void(util::Optional<State>, util::Optional<Status>)> cb) const
472
{
×
473
    get_state_change_notification(notify_when).get_async([cb = std::move(cb)](StatusWith<State> result) {
×
474
        if (result.is_ok()) {
×
475
            cb(result.get_value(), {});
×
476
        }
×
477
        else {
×
478
            cb({}, result.get_status());
×
479
        }
×
480
    });
×
481
}
×
482

483
void SubscriptionStore::report_progress()
484
{
32✔
485
    TransactionRef tr;
32✔
486
    report_progress(tr);
32✔
487
}
32✔
488

489
// Resolves every pending state-change notification whose subscription set has reached the
// requested state, hit an error, or no longer exists (reported as Superseded).
//
// `tr` is used to read the subscription sets; when it is empty and there is something to
// check, a read transaction is started and stored into it. The promises are completed only
// after the notification mutex has been released.
void SubscriptionStore::report_progress(TransactionRef& tr)
{
    util::CheckedUniqueLock lk(m_pending_notifications_mutex);
    if (m_pending_notifications.empty())
        return;

    if (!tr)
        tr = m_db->start_read();
    auto sub_sets = tr->get_table(m_sub_set_table);

    // A notification that is ready to be completed once the lock is dropped.
    struct NotificationCompletion {
        util::Promise<State> promise;
        std::string_view error_str;
        State state;
    };
    std::vector<NotificationCompletion> to_finish;

    m_pending_notifications.remove_if([&](NotificationRequest& req) {
        Obj obj = sub_sets->get_object_with_primary_key(req.version);
        if (!obj) {
            // The subscription set is gone from the table.
            to_finish.push_back({std::move(req.promise), {}, State::Superseded});
            return true;
        }

        const auto current = state_from_storage(obj.get<int64_t>(m_sub_set_state));
        if (state_to_order(current) < state_to_order(req.notify_when))
            return false; // not reached yet, keep the request pending

        const std::string_view message = (current == State::Error)
                                             ? std::string_view(obj.get<StringData>(m_sub_set_error_str))
                                             : std::string_view();
        to_finish.push_back({std::move(req.promise), message, current});
        return true;
    });
    lk.unlock();

    for (auto& done : to_finish) {
        if (done.state == State::Error) {
            done.promise.set_error({ErrorCodes::SubscriptionFailed, done.error_str});
        }
        else {
            done.promise.emplace_value(done.state);
        }
    }
}
14,222✔
534

535
// Returns the highest version among the subscription sets whose stored state is Complete,
// AwaitingMark or Bootstrapping, or 0 when there is no such set.
int64_t SubscriptionStore::get_downloading_query_version(Transaction& tr) const
{
    auto sub_sets = tr.get_table(m_sub_set_table);

    const Mixed states[] = {
        state_to_storage(SubscriptionSet::State::Complete),
        state_to_storage(SubscriptionSet::State::AwaitingMark),
        state_to_storage(SubscriptionSet::State::Bootstrapping),
    };

    // max() reports the key of the object holding the maximum through `newest`.
    ObjKey newest;
    sub_sets->where().in(m_sub_set_state, std::begin(states), std::end(states)).max(m_sub_set_version_num, &newest);
    if (!newest) {
        return 0;
    }
    return sub_sets->get_object(newest).get<int64_t>(m_sub_set_version_num);
}
14,824✔
547

548
// Persists this mutable subscription set and returns the refreshed, immutable result.
//
// An Uncommitted set becomes Pending. The stored object receives the current transaction
// version as snapshot version, a rebuilt list of subscriptions and the state code. The
// transaction is then committed and continued as a read transaction, and pending
// notifications are re-evaluated.
// Throws LogicError(WrongTransactionState) if the transaction is not in the writing stage.
SubscriptionSet MutableSubscriptionSet::commit()
{
    if (m_tr->get_transact_stage() != DB::transact_Writing) {
        throw LogicError(ErrorCodes::WrongTransactionState, "SubscriptionSet has already been committed");
    }
    auto store = get_flx_subscription_store(); // Throws

    if (m_state == State::Uncommitted) {
        m_state = State::Pending;
    }
    m_obj.set(store->m_sub_set_snapshot_version, static_cast<int64_t>(m_tr->get_version()));

    // Rewrite the stored subscription list from scratch.
    auto stored_subs = m_obj.get_linklist(store->m_sub_set_subscriptions);
    stored_subs.clear();
    for (const Subscription& sub : m_subs) {
        auto row = stored_subs.create_and_insert_linked_object(stored_subs.size());
        row.set(store->m_sub_id, sub.id);
        row.set(store->m_sub_created_at, sub.created_at);
        row.set(store->m_sub_updated_at, sub.updated_at);
        if (sub.name) {
            // The name column is only written for named subscriptions.
            row.set(store->m_sub_name, StringData(*sub.name));
        }
        row.set(store->m_sub_object_class_name, StringData(sub.object_class_name));
        row.set(store->m_sub_query_str, StringData(sub.query_string));
    }
    m_obj.set(store->m_sub_set_state, state_to_storage(m_state));

    const auto committed_version = version();
    m_tr->commit_and_continue_as_read();

    store->report_progress(m_tr);

    return store->get_refreshed(m_obj.get_key(), committed_version, m_tr->get_version_of_current_transaction());
}
1,582✔
582

583
std::string SubscriptionSet::to_ext_json() const
584
{
3,174✔
585
    if (m_subs.empty()) {
3,174✔
586
        return "{}";
1,272✔
587
    }
1,272✔
588

589
    util::FlatMap<std::string, std::vector<std::string>> table_to_query;
1,902✔
590
    for (const auto& sub : *this) {
2,514✔
591
        std::string table_name(sub.object_class_name);
2,514✔
592
        auto& queries_for_table = table_to_query.at(table_name);
2,514✔
593
        auto query_it = std::find(queries_for_table.begin(), queries_for_table.end(), sub.query_string);
2,514✔
594
        if (query_it != queries_for_table.end()) {
2,514✔
595
            continue;
8✔
596
        }
8✔
597
        queries_for_table.emplace_back(sub.query_string);
2,506✔
598
    }
2,506✔
599

600
    if (table_to_query.empty()) {
1,902✔
UNCOV
601
        return "{}";
×
UNCOV
602
    }
×
603

604
    // TODO this is pulling in a giant compile-time dependency. We should have a better way of escaping the
605
    // query strings into a json object.
606
    nlohmann::json output_json;
1,902✔
607
    for (auto& table : table_to_query) {
2,418✔
608
        // We want to make sure that the queries appear in some kind of canonical order so that if there are
609
        // two subscription sets with the same subscriptions in different orders, the server doesn't have to
610
        // waste a bunch of time re-running the queries for that table.
611
        std::stable_sort(table.second.begin(), table.second.end());
2,418✔
612

613
        bool is_first = true;
2,418✔
614
        std::ostringstream obuf;
2,418✔
615
        for (const auto& query_str : table.second) {
2,506✔
616
            if (!is_first) {
2,506✔
617
                obuf << " OR ";
88✔
618
            }
88✔
619
            is_first = false;
2,506✔
620
            obuf << "(" << query_str << ")";
2,506✔
621
        }
2,506✔
622
        output_json[table.first] = obuf.str();
2,418✔
623
    }
2,418✔
624

625
    return output_json.dump();
1,902✔
626
}
1,902✔
627

628
// Factory: creates a shared SubscriptionStore on top of `db`.
SubscriptionStoreRef SubscriptionStore::create(DBRef db)
{
    auto store = std::make_shared<SubscriptionStore>(Private(), std::move(db));
    return store;
}
1,586✔
632

633
// Opens the flexible-sync metadata in `db`.
//
// If a schema version is already recorded for the subscription store, it must equal
// c_flx_schema_version (otherwise RuntimeError(UnsupportedFileFormatVersion) is thrown) and
// the table/column keys are loaded. If none is recorded, the metadata schema is created and
// the version written in a write transaction. Finally the subscription-set table is
// initialised if it is empty.
SubscriptionStore::SubscriptionStore(Private, DBRef db)
    : m_db(std::move(db))
{
    // Description of the two metadata tables; each entry names the member that receives
    // the corresponding table or column key.
    std::vector<SyncMetadataTable> internal_tables{
        {&m_sub_set_table,
         c_flx_subscription_sets_table,
         {&m_sub_set_version_num, c_flx_sub_sets_version_field, type_Int},
         {
             {&m_sub_set_state, c_flx_sub_sets_state_field, type_Int},
             {&m_sub_set_snapshot_version, c_flx_sub_sets_snapshot_version_field, type_Int},
             {&m_sub_set_error_str, c_flx_sub_sets_error_str_field, type_String, true},
             {&m_sub_set_subscriptions, c_flx_sub_sets_subscriptions_field, c_flx_subscriptions_table, true},
         }},
        {&m_sub_table,
         c_flx_subscriptions_table,
         SyncMetadataTable::IsEmbeddedTag{},
         {
             {&m_sub_id, c_flx_sub_id_field, type_ObjectId},
             {&m_sub_created_at, c_flx_sub_created_at_field, type_Timestamp},
             {&m_sub_updated_at, c_flx_sub_updated_at_field, type_Timestamp},
             {&m_sub_name, c_flx_sub_name_field, type_String, true},
             {&m_sub_object_class_name, c_flx_sub_object_class_field, type_String},
             {&m_sub_query_str, c_flx_sub_query_str_field, type_String},
         }},
    };

    auto tr = m_db->start_read();
    // Start with a reader so it doesn't try to write until we are ready
    SyncMetadataSchemaVersionsReader schema_versions_reader(tr);

    const auto stored_version =
        schema_versions_reader.get_version_for(tr, internal_schema_groups::c_flx_subscription_store);
    if (!stored_version) {
        tr->promote_to_write();
        // Ensure the schema versions table is initialized (may add its own commit)
        SyncMetadataSchemaVersions schema_versions(tr);
        // Create the metadata schema and set the version (in the same commit)
        schema_versions.set_version_for(tr, internal_schema_groups::c_flx_subscription_store, c_flx_schema_version);
        create_sync_metadata_schema(tr, &internal_tables);
        tr->commit_and_continue_as_read();
    }
    else {
        if (*stored_version != c_flx_schema_version) {
            throw RuntimeError(ErrorCodes::UnsupportedFileFormatVersion,
                               "Invalid schema version for flexible sync metadata");
        }
        load_sync_metadata_schema(tr, &internal_tables);
    }
    REALM_ASSERT(m_sub_set_table);

    // Make sure the subscription set table is properly initialized
    initialize_subscriptions_table(std::move(tr));
}
1,586✔
685

686
// Ensures the subscription-set table is not empty: when it is, the transaction is promoted
// to a write transaction, clear() creates the initial subscription set, and the
// transaction is committed. A non-empty table is left untouched.
void SubscriptionStore::initialize_subscriptions_table(TransactionRef&& tr)
{
    auto sub_sets = tr->get_table(m_sub_set_table);
    if (!sub_sets->is_empty()) {
        return;
    }
    tr->promote_to_write();
    clear(*tr);
    tr->commit();
}
1,586✔
694

695
// Removes every subscription set and recreates the zeroth one in the Pending state,
// stamped with the version of the supplied write transaction.
void SubscriptionStore::clear(Transaction& wt)
{
    auto table = wt.get_table(m_sub_set_table);
    table->clear();

    // At least one subscription set must always exist so that the user can always
    // wait for synchronization on the result of get_latest().
    auto initial_set = table->create_object_with_primary_key(Mixed{int64_t(0)});

    const auto pending_state = static_cast<int64_t>(SubscriptionSet::State::Pending);
    initial_set.set(m_sub_set_state, pending_state);

    const auto snapshot_version = wt.get_version();
    initial_set.set(m_sub_set_snapshot_version, snapshot_version);
}
1,320✔
705

706
// Returns the subscription set with the highest primary key, read from a frozen
// transaction.
SubscriptionSet SubscriptionStore::get_latest()
{
    auto frozen_tr = m_db->start_frozen();
    auto table = frozen_tr->get_table(m_sub_set_table);
    // The zeroth subscription set (used for schema instructions) must always be present.
    REALM_ASSERT(!table->is_empty());

    const auto pk_col = table->get_primary_key_column();
    const auto newest_id = table->max(pk_col)->get_int();
    auto newest_obj = table->get_object_with_primary_key(Mixed{newest_id});

    return SubscriptionSet(weak_from_this(), *frozen_tr, newest_obj);
}
2,354✔
718

719
// Returns the active subscription set as looked up by get_active(const Transaction&)
// on a frozen transaction.
SubscriptionSet SubscriptionStore::get_active()
{
    auto frozen_tr = m_db->start_frozen();
    auto active_obj = get_active(*frozen_tr);
    return SubscriptionSet(weak_from_this(), *frozen_tr, active_obj);
}
2,216✔
724

725
// Finds the object for the highest-versioned subscription set whose state is either
// Complete or AwaitingMark, falling back to the zeroth subscription set when no
// such set exists.
Obj SubscriptionStore::get_active(const Transaction& tr)
{
    auto table = tr.get_table(m_sub_set_table);
    // The zeroth subscription set (used for schema instructions) must always be present.
    REALM_ASSERT(!table->is_empty());

    const auto complete = state_to_storage(SubscriptionSet::State::Complete);
    const auto awaiting_mark = state_to_storage(SubscriptionSet::State::AwaitingMark);

    // The aggregate reports the key of the matching object through `active_key`;
    // it stays null when nothing matched.
    ObjKey active_key;
    table->where()
        .equal(m_sub_set_state, complete)
        .Or()
        .equal(m_sub_set_state, awaiting_mark)
        .max(m_sub_set_version_num, &active_key);

    if (active_key) {
        return table->get_object(active_key);
    }
    // No active subscription yet - hand back the zeroth subscription.
    return table->get_object_with_primary_key(0);
}
5,398✔
741

742
// Collects the latest, active and pending-mark subscription set versions from a
// single read transaction.
SubscriptionStore::VersionInfo SubscriptionStore::get_version_info() const
{
    auto read_tr = m_db->start_read();
    auto table = read_tr->get_table(m_sub_set_table);
    // The zeroth subscription set (used for schema instructions) must always be present.
    REALM_ASSERT(!table->is_empty());

    // Aggregate results which are not integers are reported as EmptyVersion.
    auto version_or_empty = [](Mixed value) {
        return value.is_type(type_Int) ? value.get_int() : SubscriptionSet::EmptyVersion;
    };

    const auto complete = state_to_storage(SubscriptionSet::State::Complete);
    const auto awaiting_mark = state_to_storage(SubscriptionSet::State::AwaitingMark);

    VersionInfo info;
    info.latest = table->max(table->get_primary_key_column())->get_int();

    // Active: newest set that is either Complete or AwaitingMark.
    auto active_max = table->where()
                          .equal(m_sub_set_state, complete)
                          .Or()
                          .equal(m_sub_set_state, awaiting_mark)
                          .max(m_sub_set_version_num);
    info.active = version_or_empty(*active_max);

    // Pending mark: newest set that is still AwaitingMark.
    auto pending_mark_max = table->where().equal(m_sub_set_state, awaiting_mark).max(m_sub_set_version_num);
    info.pending_mark = version_or_empty(*pending_mark_max);

    return info;
}
88✔
765

766
// Looks up the lowest-versioned subscription set newer than `last_query_version`
// which is in the Pending or Bootstrapping state. Returns util::none when there is
// no such set; otherwise returns its query version and stored snapshot version.
util::Optional<SubscriptionStore::PendingSubscription>
SubscriptionStore::get_next_pending_version(int64_t last_query_version) const
{
    auto read_tr = m_db->start_read();
    auto table = read_tr->get_table(m_sub_set_table);
    // The zeroth subscription set (used for schema instructions) must always be present.
    REALM_ASSERT(!table->is_empty());

    ObjKey next_key;
    table->where()
        .greater(table->get_primary_key_column(), last_query_version)
        .group()
        .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Pending))
        .Or()
        .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Bootstrapping))
        .end_group()
        .min(m_sub_set_version_num, &next_key);

    if (next_key) {
        auto pending_obj = table->get_object(next_key);
        auto pending_version = pending_obj.get_primary_key().get_int();
        auto stored_snapshot = pending_obj.get<int64_t>(m_sub_set_snapshot_version);
        return PendingSubscription{pending_version, static_cast<DB::version_type>(stored_snapshot)};
    }

    return util::none;
}
18,150✔
793

794
std::vector<SubscriptionSet> SubscriptionStore::get_pending_subscriptions()
795
{
24✔
796
    std::vector<SubscriptionSet> subscriptions_to_recover;
24✔
797
    auto active_sub = get_active();
24✔
798
    auto cur_query_version = active_sub.version();
24✔
799
    // get a copy of the pending subscription sets since the active version
800
    while (auto next_pending = get_next_pending_version(cur_query_version)) {
72✔
801
        cur_query_version = next_pending->query_version;
48✔
802
        subscriptions_to_recover.push_back(get_by_version(cur_query_version));
48✔
803
    }
48✔
804
    return subscriptions_to_recover;
24✔
805
}
24✔
806

807
// Fails every outstanding state change notification with `status`. Only the
// notification promises are touched here - the stored state of the subscription
// sets is not modified.
void SubscriptionStore::notify_all_state_change_notifications(Status status)
{
    // Take ownership of the pending requests while holding the mutex, then resolve
    // them after it has been released.
    std::list<NotificationRequest> drained;
    {
        util::CheckedLockGuard lock(m_pending_notifications_mutex);
        drained = std::move(m_pending_notifications);
    }

    for (auto& request : drained) {
        request.promise.set_error(status);
    }
}
1,590✔
819

820
// Wipes the subscription store back to its initial contents within `wt` and
// resolves every outstanding notification request as Superseded.
void SubscriptionStore::reset(Transaction& wt)
{
    // Drop all subscription sets and recreate the zeroth one.
    clear(wt);

    // Take ownership of the pending requests while holding the mutex, then resolve
    // them after it has been released.
    std::list<NotificationRequest> drained;
    {
        util::CheckedLockGuard lock(m_pending_notifications_mutex);
        drained = std::move(m_pending_notifications);
    }

    for (auto& request : drained) {
        request.promise.emplace_value(SubscriptionSet::State::Superseded);
    }
}
20✔
833

834
void SubscriptionStore::begin_bootstrap(const Transaction& tr, int64_t query_version)
835
{
2,472✔
836
    auto sub_sets = tr.get_table(m_sub_set_table);
2,472✔
837
    REALM_ASSERT(!sub_sets->is_empty());
2,472✔
838
    Obj obj = sub_sets->get_object_with_primary_key(query_version);
2,472✔
839
    if (!obj) {
2,472✔
840
        throw RuntimeError(ErrorCodes::SyncProtocolInvariantFailed,
8✔
841
                           util::format("Received bootstrap for nonexistent query version %1", query_version));
8✔
842
    }
8✔
843

844
    switch (auto old_state = state_from_storage(obj.get<int64_t>(m_sub_set_state))) {
2,464✔
845
        case State::Complete:
68✔
846
        case State::AwaitingMark:
68✔
847
            // Once bootstrapping has completed it remains complete even if the
848
            // server decides to send us more bootstrap messages
849
            return;
68✔
850

851
        case State::Pending:
2,392✔
852
            obj.set(m_sub_set_state, state_to_storage(State::Bootstrapping));
2,392✔
853
            break;
2,392✔
854

855
        case State::Error: {
4✔
856
            auto error = obj.get<String>(m_sub_set_error_str);
4✔
857
            throw RuntimeError(ErrorCodes::SyncProtocolInvariantFailed,
4✔
858
                               util::format("Received bootstrap for query version %1 after receiving the error '%2'",
4✔
859
                                            query_version, error));
4✔
860
        }
68✔
861

UNCOV
862
        default:
✔
863
            // Any other state is an internal bug of some sort
UNCOV
864
            REALM_ASSERT_EX(false, old_state);
×
UNCOV
865
            static_cast<void>(old_state);
×
866
    }
2,464✔
867
}
2,464✔
868

869
void SubscriptionStore::do_complete_bootstrap(const Transaction& tr, int64_t query_version, State new_state)
870
{
2,444✔
871
    auto sub_sets = tr.get_table(m_sub_set_table);
2,444✔
872
    REALM_ASSERT(!sub_sets->is_empty());
2,444✔
873
    Obj obj = sub_sets->get_object_with_primary_key(query_version);
2,444✔
874
    // The sub set object being deleted while we're in the middle of applying
875
    // a bootstrap would be an internal bug
876
    REALM_ASSERT(obj);
2,444✔
877

878
    switch (auto old_state = state_from_storage(obj.get<int64_t>(m_sub_set_state))) {
2,444✔
879
        case State::Complete:
68✔
880
        case State::AwaitingMark:
68✔
881
            // We were applying a bootstrap for a subscription which had already
882
            // completed applying a bootstrap, which means something like a
883
            // permission change occurred server-side which triggered a rebootstrap.
884
            return;
68✔
885

886
        case State::Bootstrapping:
2,376✔
887
            obj.set(m_sub_set_state, state_to_storage(new_state));
2,376✔
888
            break;
2,376✔
889

UNCOV
890
        default:
✔
891
            // Any other state is an internal bug of some sort
UNCOV
892
            REALM_ASSERT_EX(false, old_state);
×
UNCOV
893
            static_cast<void>(old_state);
×
894
    }
2,444✔
895

896
    // Supersede all older subscription sets
897
    if (new_state == State::AwaitingMark) {
2,376✔
898
        sub_sets->where().less(m_sub_set_version_num, query_version).remove();
2,352✔
899
    }
2,352✔
900
}
2,376✔
901

902
void SubscriptionStore::complete_bootstrap(const Transaction& tr, int64_t query_version)
903
{
2,420✔
904
    do_complete_bootstrap(tr, query_version, State::AwaitingMark);
2,420✔
905
}
2,420✔
906

907
void SubscriptionStore::cancel_bootstrap(const Transaction& tr, int64_t query_version)
908
{
24✔
909
    do_complete_bootstrap(tr, query_version, State::Pending);
24✔
910
}
24✔
911

912
void SubscriptionStore::set_error(int64_t query_version, std::string_view error_str)
913
{
36✔
914
    auto tr = m_db->start_write();
36✔
915
    auto sub_sets = tr->get_table(m_sub_set_table);
36✔
916
    auto obj = sub_sets->get_object_with_primary_key(query_version);
36✔
917
    if (!obj) {
36✔
918
        // This can happen either due to a bug in the sync client or due to the
919
        // server sending us an error message for an invalid query version. We
920
        // assume it is the latter here.
921
        throw RuntimeError(ErrorCodes::SyncProtocolInvariantFailed,
4✔
922
                           util::format("Invalid state update for nonexistent query version %1", query_version));
4✔
923
    }
4✔
924

925
    auto old_state = state_from_storage(obj.get<int64_t>(m_sub_set_state));
32✔
926
    if (old_state == State::Complete) {
32✔
927
        throw RuntimeError(ErrorCodes::SyncProtocolInvariantFailed,
4✔
928
                           util::format("Received error '%1' for already-completed query version %2. This "
4✔
929
                                        "may be due to a queryable field being removed in the server-side "
4✔
930
                                        "configuration making the previous subscription set no longer valid.",
4✔
931
                                        error_str, query_version));
4✔
932
    }
4✔
933

934
    obj.set(m_sub_set_state, state_to_storage(State::Error));
28✔
935
    obj.set(m_sub_set_error_str, error_str);
28✔
936
    tr->commit();
28✔
937
}
28✔
938

939
void SubscriptionStore::download_complete()
940
{
3,014✔
941
    auto tr = m_db->start_read();
3,014✔
942
    auto obj = get_active(*tr);
3,014✔
943
    if (state_from_storage(obj.get<int64_t>(m_sub_set_state)) != State::AwaitingMark)
3,014✔
944
        return;
776✔
945

946
    // Although subscription sets can be created from any thread or process,
947
    // they're only *modified* on the sync client thread, so we don't have to
948
    // recheck that things have changed after the promote to write
949
    tr->promote_to_write();
2,238✔
950
    obj.set(m_sub_set_state, state_to_storage(State::Complete));
2,238✔
951
    tr->commit();
2,238✔
952
}
2,238✔
953

954
// Fetches the subscription set with the given version from a frozen transaction.
// A version lower than the oldest stored one yields a superseded placeholder set;
// any other missing version raises KeyNotFound.
SubscriptionSet SubscriptionStore::get_by_version(int64_t version_id)
{
    auto frozen_tr = m_db->start_frozen();
    auto table = frozen_tr->get_table(m_sub_set_table);

    auto found = table->get_object_with_primary_key(version_id);
    if (found) {
        return SubscriptionSet(weak_from_this(), *frozen_tr, found);
    }

    REALM_ASSERT(!table->is_empty());
    const auto oldest_version = table->min(m_sub_set_version_num)->get_int();
    if (!(version_id < oldest_version)) {
        throw KeyNotFound(util::format("Subscription set with version %1 not found", version_id));
    }

    return SubscriptionSet(weak_from_this(), version_id, SubscriptionSet::SupersededTag{});
}
4✔
967

968
// Re-reads the subscription set stored under `key` from a frozen transaction at
// `db_version` (or at a default-constructed VersionID when none is supplied). If the
// object can no longer be found, a superseded placeholder carrying `version` is
// returned instead.
SubscriptionSet SubscriptionStore::get_refreshed(ObjKey key, int64_t version, std::optional<DB::VersionID> db_version)
{
    auto frozen_tr = m_db->start_frozen(db_version.value_or(VersionID{}));
    auto table = frozen_tr->get_table(m_sub_set_table);

    auto refreshed = table->try_get_object(key);
    if (!refreshed) {
        return SubscriptionSet(weak_from_this(), version, SubscriptionSet::SupersededTag{});
    }
    return SubscriptionSet(weak_from_this(), *frozen_tr, refreshed);
}
1,658✔
977

978
// Builds the set of object class names referenced by the subscriptions of the
// newest subscription set visible in `tr`.
SubscriptionStore::TableSet SubscriptionStore::get_tables_for_latest(const Transaction& tr) const
{
    auto table = tr.get_table(m_sub_set_table);
    // The zeroth subscription set (used for schema instructions) must always be present.
    REALM_ASSERT(!table->is_empty());

    const auto pk_col = table->get_primary_key_column();
    const auto newest_id = table->max(pk_col)->get_int();
    auto newest_obj = table->get_object_with_primary_key(Mixed{newest_id});

    TableSet class_names;
    auto subscriptions = newest_obj.get_linklist(m_sub_set_subscriptions);
    for (size_t i = 0, count = subscriptions.size(); i < count; ++i) {
        auto subscription = subscriptions.get_object(i);
        class_names.emplace(subscription.get<StringData>(m_sub_object_class_name));
    }

    return class_names;
}
15,492✔
996

997
// Starts a write transaction, creates a new subscription set whose primary key is
// one greater than the current maximum, and copies every subscription from `set`
// into it. The returned MutableSubscriptionSet takes over the write transaction.
MutableSubscriptionSet SubscriptionStore::make_mutable_copy(const SubscriptionSet& set)
{
    auto new_tr = m_db->start_write();

    auto sub_sets = new_tr->get_table(m_sub_set_table);
    // There should always be at least one SubscriptionSet - the zeroth subscription set for schema instructions.
    // Check this before reading the maximum primary key below, as every other
    // reader of that aggregate in this file does.
    REALM_ASSERT(!sub_sets->is_empty());
    auto new_pk = sub_sets->max(sub_sets->get_primary_key_column())->get_int() + 1;

    MutableSubscriptionSet new_set_obj(weak_from_this(), std::move(new_tr),
                                       sub_sets->create_object_with_primary_key(Mixed{new_pk}));
    for (const auto& sub : set) {
        new_set_obj.insert_sub(sub);
    }

    return new_set_obj;
}
1,650✔
1012

1013
// Reports whether `version` is older than the latest snapshot version of the
// database.
bool SubscriptionStore::would_refresh(DB::version_type version) const noexcept
{
    const auto latest_snapshot = m_db->get_version_of_latest_snapshot();
    return version < latest_snapshot;
}
44✔
1017

1018
// Makes the active subscription set the newest one: all sets with a higher primary
// key are removed, the active set is marked Complete, and the affected pending
// notifications are resolved. Returns the version of the active set.
int64_t SubscriptionStore::set_active_as_latest(Transaction& wt)
{
    auto table = wt.get_table(m_sub_set_table);
    auto active_obj = get_active(wt);

    // Remove every subscription set newer than the active one, if any
    const auto pk_col = table->get_primary_key_column();
    table->where().greater(pk_col, active_obj.get_primary_key().get_int()).remove();

    // The active set becomes Complete even if it was previously AwaitingMark, since
    // rebootstrapping has finished before this is called.
    active_obj.set(m_sub_set_state, state_to_storage(State::Complete));
    auto version = active_obj.get_primary_key().get_int();

    // Pull out the requests for other versions, plus the requests for this version
    // whose awaited state is ordered at or before Complete.
    std::list<NotificationRequest> resolved;
    {
        util::CheckedLockGuard lock(m_pending_notifications_mutex);
        splice_if(m_pending_notifications, resolved, [&](auto& req) {
            return req.version != version || state_to_order(req.notify_when) <= state_to_order(State::Complete);
        });
    }

    for (auto& req : resolved) {
        if (req.version == version) {
            req.promise.emplace_value(State::Complete);
        }
        else {
            req.promise.emplace_value(State::Superseded);
        }
    }

    return version;
}
48✔
1045

1046
// Marks the active subscription set as Complete and fulfils the pending
// notifications for that version whose awaited state is ordered at or before
// Complete. Returns the version of the active set.
int64_t SubscriptionStore::mark_active_as_complete(Transaction& wt)
{
    auto active_obj = get_active(wt);
    active_obj.set(m_sub_set_state, state_to_storage(State::Complete));
    auto version = active_obj.get_primary_key().get_int();

    const auto is_satisfied = [&](auto& req) {
        if (req.version != version) {
            return false;
        }
        return state_to_order(req.notify_when) <= state_to_order(State::Complete);
    };

    // Collect the satisfied requests under the mutex; fulfil them after releasing it.
    std::list<NotificationRequest> resolved;
    {
        util::CheckedLockGuard lock(m_pending_notifications_mutex);
        splice_if(m_pending_notifications, resolved, is_satisfied);
    }

    for (auto& req : resolved) {
        req.promise.emplace_value(State::Complete);
    }

    return version;
}
120✔
1066

1067
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc