• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_440

02 Jul 2024 07:51PM UTC coverage: 91.007% (+0.03%) from 90.974%
thomas.goyne_440

push

Evergreen

web-flow
[RCORE-2146] CAPI Remove `is_fatal` flag flip (#7751)

102408 of 180620 branches covered (56.7%)

0 of 1 new or added line in 1 file covered. (0.0%)

619 existing lines in 26 files now uncovered.

215623 of 236930 relevant lines covered (91.01%)

5563737.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.1
/src/realm/sync/subscriptions.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2021 Realm, Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "realm/sync/subscriptions.hpp"
20

21
#include "external/json/json.hpp"
22

23
#include "realm/data_type.hpp"
24
#include "realm/keys.hpp"
25
#include "realm/list.hpp"
26
#include "realm/sort_descriptor.hpp"
27
#include "realm/sync/noinst/sync_metadata_schema.hpp"
28
#include "realm/table.hpp"
29
#include "realm/table_view.hpp"
30
#include "realm/transaction.hpp"
31
#include "realm/util/flat_map.hpp"
32

33
#include <algorithm>
34
#include <initializer_list>
35
#include <stdexcept>
36

37
namespace realm::sync {
38
namespace {
39
// Schema version history:
40
//   v2: Initial public beta.
41

42
constexpr static int c_flx_schema_version = 2;
43
constexpr static std::string_view c_flx_subscription_sets_table("flx_subscription_sets");
44
constexpr static std::string_view c_flx_subscriptions_table("flx_subscriptions");
45

46
constexpr static std::string_view c_flx_sub_sets_state_field("state");
47
constexpr static std::string_view c_flx_sub_sets_version_field("version");
48
constexpr static std::string_view c_flx_sub_sets_error_str_field("error");
49
constexpr static std::string_view c_flx_sub_sets_subscriptions_field("subscriptions");
50
constexpr static std::string_view c_flx_sub_sets_snapshot_version_field("snapshot_version");
51

52
constexpr static std::string_view c_flx_sub_id_field("id");
53
constexpr static std::string_view c_flx_sub_created_at_field("created_at");
54
constexpr static std::string_view c_flx_sub_updated_at_field("updated_at");
55
constexpr static std::string_view c_flx_sub_name_field("name");
56
constexpr static std::string_view c_flx_sub_object_class_field("object_class");
57
constexpr static std::string_view c_flx_sub_query_str_field("query");
58

59
using OptionalString = util::Optional<std::string>;
60

61
enum class SubscriptionStateForStorage : int64_t {
62
    // The subscription set has been persisted locally but has not been acknowledged by the server yet.
63
    Pending = 1,
64
    // The server is currently sending the initial state that represents this subscription set to the client.
65
    Bootstrapping = 2,
66
    // This subscription set is the active subscription set that is currently being synchronized with the server.
67
    Complete = 3,
68
    // An error occurred while processing this subscription set on the server. Check error_str() for details.
69
    Error = 4,
70
    // The last bootstrap message containing the initial state for this subscription set has been received. The
71
    // client is awaiting a mark message to mark this subscription as fully caught up to history.
72
    AwaitingMark = 6,
73
};
74

75
SubscriptionSet::State state_from_storage(int64_t value)
76
{
25,666✔
77
    switch (static_cast<SubscriptionStateForStorage>(value)) {
25,666✔
78
        case SubscriptionStateForStorage::Pending:
14,016✔
79
            return SubscriptionSet::State::Pending;
14,016✔
80
        case SubscriptionStateForStorage::Bootstrapping:
2,554✔
81
            return SubscriptionSet::State::Bootstrapping;
2,554✔
82
        case SubscriptionStateForStorage::AwaitingMark:
4,902✔
83
            return SubscriptionSet::State::AwaitingMark;
4,902✔
84
        case SubscriptionStateForStorage::Complete:
4,118✔
85
            return SubscriptionSet::State::Complete;
4,118✔
86
        case SubscriptionStateForStorage::Error:
76✔
87
            return SubscriptionSet::State::Error;
76✔
88
        default:
✔
89
            throw RuntimeError(ErrorCodes::InvalidArgument,
×
90
                               util::format("Invalid state for SubscriptionSet stored on disk: %1", value));
×
91
    }
25,666✔
92
}
25,666✔
93

94
int64_t state_to_storage(SubscriptionSet::State state)
95
{
42,334✔
96
    switch (state) {
42,334✔
97
        case SubscriptionSet::State::Pending:
14,266✔
98
            return static_cast<int64_t>(SubscriptionStateForStorage::Pending);
14,266✔
99
        case SubscriptionSet::State::Bootstrapping:
14,940✔
100
            return static_cast<int64_t>(SubscriptionStateForStorage::Bootstrapping);
14,940✔
101
        case SubscriptionSet::State::AwaitingMark:
6,554✔
102
            return static_cast<int64_t>(SubscriptionStateForStorage::AwaitingMark);
6,554✔
103
        case SubscriptionSet::State::Complete:
6,546✔
104
            return static_cast<int64_t>(SubscriptionStateForStorage::Complete);
6,546✔
105
        case SubscriptionSet::State::Error:
28✔
106
            return static_cast<int64_t>(SubscriptionStateForStorage::Error);
28✔
107
        default:
✔
108
            REALM_UNREACHABLE();
109
    }
42,334✔
110
}
42,334✔
111

112
size_t state_to_order(SubscriptionSet::State needle)
113
{
22,984✔
114
    using State = SubscriptionSet::State;
22,984✔
115
    switch (needle) {
22,984✔
116
        case State::Uncommitted:
88✔
117
            return 0;
88✔
118
        case State::Pending:
7,774✔
119
            return 1;
7,774✔
120
        case State::Bootstrapping:
396✔
121
            return 2;
396✔
122
        case State::AwaitingMark:
2,434✔
123
            return 3;
2,434✔
124
        case State::Complete:
12,100✔
125
            return 4;
12,100✔
126
        case State::Error:
136✔
127
            return 5;
136✔
128
        case State::Superseded:
56✔
129
            return 6;
56✔
130
    }
22,984✔
131
    REALM_UNREACHABLE();
132
}
×
133

134
template <typename T, typename Predicate>
135
void splice_if(std::list<T>& src, std::list<T>& dst, Predicate pred)
136
{
132✔
137
    for (auto it = src.begin(); it != src.end();) {
164✔
138
        if (pred(*it)) {
32✔
139
            dst.splice(dst.end(), src, it++);
20✔
140
        }
20✔
141
        else {
12✔
142
            ++it;
12✔
143
        }
12✔
144
    }
32✔
145
}
132✔
146

147
} // namespace
148

149
Subscription::Subscription(const SubscriptionStore* parent, Obj obj)
150
    : id(obj.get<ObjectId>(parent->m_sub_id))
4,198✔
151
    , created_at(obj.get<Timestamp>(parent->m_sub_created_at))
4,198✔
152
    , updated_at(obj.get<Timestamp>(parent->m_sub_updated_at))
4,198✔
153
    , name(obj.is_null(parent->m_sub_name) ? OptionalString(util::none)
4,198✔
154
                                           : OptionalString{obj.get<String>(parent->m_sub_name)})
4,198✔
155
    , object_class_name(obj.get<String>(parent->m_sub_object_class_name))
4,198✔
156
    , query_string(obj.get<String>(parent->m_sub_query_str))
4,198✔
157
{
8,396✔
158
}
8,396✔
159

160
Subscription::Subscription(util::Optional<std::string> name, std::string object_class_name, std::string query_str)
161
    : id(ObjectId::gen())
732✔
162
    , created_at(std::chrono::system_clock::now())
732✔
163
    , updated_at(created_at)
732✔
164
    , name(std::move(name))
732✔
165
    , object_class_name(std::move(object_class_name))
732✔
166
    , query_string(std::move(query_str))
732✔
167
{
1,466✔
168
}
1,466✔
169

170

171
SubscriptionSet::SubscriptionSet(std::weak_ptr<SubscriptionStore> mgr, const Transaction& tr, const Obj& obj,
172
                                 MakingMutableCopy making_mutable_copy)
173
    : m_mgr(mgr)
5,110✔
174
    , m_cur_version(tr.get_version())
5,110✔
175
    , m_version(obj.get_primary_key().get_int())
5,110✔
176
    , m_obj_key(obj.get_key())
5,110✔
177
{
10,218✔
178
    REALM_ASSERT(obj.is_valid());
10,218✔
179
    if (!making_mutable_copy) {
10,218✔
180
        load_from_database(obj);
8,756✔
181
    }
8,756✔
182
}
10,218✔
183

184
SubscriptionSet::SubscriptionSet(std::weak_ptr<SubscriptionStore> mgr, int64_t version, SupersededTag)
185
    : m_mgr(mgr)
10✔
186
    , m_version(version)
10✔
187
    , m_state(State::Superseded)
10✔
188
{
20✔
189
}
20✔
190

191
void SubscriptionSet::load_from_database(const Obj& obj)
192
{
8,756✔
193
    auto mgr = get_flx_subscription_store(); // Throws
8,756✔
194

195
    m_state = state_from_storage(obj.get<int64_t>(mgr->m_sub_set_state));
8,756✔
196
    m_error_str = obj.get<String>(mgr->m_sub_set_error_str);
8,756✔
197
    m_snapshot_version = static_cast<DB::version_type>(obj.get<int64_t>(mgr->m_sub_set_snapshot_version));
8,756✔
198
    auto sub_list = obj.get_linklist(mgr->m_sub_set_subscriptions);
8,756✔
199
    m_subs.clear();
8,756✔
200
    for (size_t idx = 0; idx < sub_list.size(); ++idx) {
17,152✔
201
        m_subs.push_back(Subscription(mgr.get(), sub_list.get_object(idx)));
8,396✔
202
    }
8,396✔
203
}
8,756✔
204

205
std::shared_ptr<SubscriptionStore> SubscriptionSet::get_flx_subscription_store() const
206
{
12,800✔
207
    if (auto mgr = m_mgr.lock()) {
12,800✔
208
        return mgr;
12,796✔
209
    }
12,796✔
210
    throw RuntimeError(ErrorCodes::BrokenInvariant, "Active SubscriptionSet without a SubscriptionStore");
4✔
211
}
12,800✔
212

213
int64_t SubscriptionSet::version() const
214
{
13,084✔
215
    return m_version;
13,084✔
216
}
13,084✔
217

218
DB::version_type SubscriptionSet::snapshot_version() const
219
{
1,158✔
220
    return m_snapshot_version;
1,158✔
221
}
1,158✔
222

223
SubscriptionSet::State SubscriptionSet::state() const
224
{
1,532✔
225
    return m_state;
1,532✔
226
}
1,532✔
227

228
StringData SubscriptionSet::error_str() const
229
{
1,196✔
230
    if (m_error_str.empty()) {
1,196✔
231
        return StringData{};
1,184✔
232
    }
1,184✔
233
    return m_error_str;
12✔
234
}
1,196✔
235

236
size_t SubscriptionSet::size() const
237
{
308✔
238
    return m_subs.size();
308✔
239
}
308✔
240

241
const Subscription& SubscriptionSet::at(size_t index) const
242
{
36✔
243
    return m_subs.at(index);
36✔
244
}
36✔
245

246
SubscriptionSet::const_iterator SubscriptionSet::begin() const
247
{
4,730✔
248
    return m_subs.begin();
4,730✔
249
}
4,730✔
250

251
SubscriptionSet::const_iterator SubscriptionSet::end() const
252
{
6,276✔
253
    return m_subs.end();
6,276✔
254
}
6,276✔
255

256
const Subscription* SubscriptionSet::find(StringData name) const
257
{
64✔
258
    for (auto&& sub : *this) {
92✔
259
        if (sub.name == name)
92✔
260
            return &sub;
48✔
261
    }
92✔
262
    return nullptr;
16✔
263
}
64✔
264

265
const Subscription* SubscriptionSet::find(const Query& query) const
266
{
48✔
267
    const auto query_desc = query.get_description();
48✔
268
    const auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());
48✔
269
    for (auto&& sub : *this) {
60✔
270
        if (sub.object_class_name == table_name && sub.query_string == query_desc)
60✔
271
            return &sub;
48✔
272
    }
60✔
273
    return nullptr;
×
274
}
48✔
275

276
MutableSubscriptionSet::MutableSubscriptionSet(std::weak_ptr<SubscriptionStore> mgr, TransactionRef tr, Obj obj)
277
    : SubscriptionSet(mgr, *tr, obj, MakingMutableCopy{true})
730✔
278
    , m_tr(std::move(tr))
730✔
279
    , m_obj(std::move(obj))
730✔
280
{
1,462✔
281
}
1,462✔
282

283
void MutableSubscriptionSet::check_is_mutable() const
284
{
2,634✔
285
    if (m_tr->get_transact_stage() != DB::transact_Writing) {
2,634✔
286
        throw WrongTransactionState("Not a write transaction");
12✔
287
    }
12✔
288
}
2,634✔
289

290
// This uses the 'swap and pop' idiom to run in constant time.
291
// The iterator returned is:
292
//  1. end(), if the last subscription is removed
293
//  2. same iterator it is passed (but pointing to the last subscription in set), otherwise
294
MutableSubscriptionSet::iterator MutableSubscriptionSet::erase(const_iterator it)
295
{
56✔
296
    check_is_mutable();
56✔
297
    REALM_ASSERT(it != end());
56✔
298
    if (it == std::prev(m_subs.end())) {
56✔
299
        m_subs.pop_back();
24✔
300
        return end();
24✔
301
    }
24✔
302
    auto back = std::prev(m_subs.end());
32✔
303
    // const_iterator to iterator in constant time (See https://stackoverflow.com/a/10669041)
304
    auto iterator = m_subs.erase(it, it);
32✔
305
    std::swap(*iterator, *back);
32✔
306
    m_subs.pop_back();
32✔
307
    return iterator;
32✔
308
}
56✔
309

310
bool MutableSubscriptionSet::erase(StringData name)
311
{
12✔
312
    check_is_mutable();
12✔
313
    auto ptr = find(name);
12✔
314
    if (!ptr)
12✔
315
        return false;
4✔
316
    auto it = m_subs.begin() + (ptr - &m_subs.front());
8✔
317
    erase(it);
8✔
318
    return true;
8✔
319
}
12✔
320

321
bool MutableSubscriptionSet::erase(const Query& query)
322
{
24✔
323
    check_is_mutable();
24✔
324
    auto ptr = find(query);
24✔
325
    if (!ptr)
24✔
326
        return false;
×
327
    auto it = m_subs.begin() + (ptr - &m_subs.front());
24✔
328
    erase(it);
24✔
329
    return true;
24✔
330
}
24✔
331

332
bool MutableSubscriptionSet::erase_by_class_name(StringData object_class_name)
333
{
28✔
334
    // TODO: Use std::erase_if when switching to C++20.
335
    auto it = std::remove_if(m_subs.begin(), m_subs.end(), [&object_class_name](const Subscription& sub) {
72✔
336
        return sub.object_class_name == object_class_name;
72✔
337
    });
72✔
338
    auto erased = end() - it;
28✔
339
    m_subs.erase(it, end());
28✔
340
    return erased > 0;
28✔
341
}
28✔
342

343
bool MutableSubscriptionSet::erase_by_id(ObjectId id)
344
{
12✔
345
    auto it = std::find_if(m_subs.begin(), m_subs.end(), [&id](const Subscription& sub) -> bool {
20✔
346
        return sub.id == id;
20✔
347
    });
20✔
348
    if (it == end()) {
12✔
349
        return false;
4✔
350
    }
4✔
351
    erase(it);
8✔
352
    return true;
8✔
353
}
12✔
354

355
void MutableSubscriptionSet::clear()
356
{
328✔
357
    check_is_mutable();
328✔
358
    m_subs.clear();
328✔
359
}
328✔
360

361
void MutableSubscriptionSet::insert_sub(const Subscription& sub)
362
{
604✔
363
    check_is_mutable();
604✔
364
    m_subs.push_back(sub);
604✔
365
}
604✔
366

367
std::pair<SubscriptionSet::iterator, bool>
368
MutableSubscriptionSet::insert_or_assign_impl(iterator it, util::Optional<std::string> name,
369
                                              std::string object_class_name, std::string query_str)
370
{
1,466✔
371
    check_is_mutable();
1,466✔
372
    if (it != end()) {
1,466✔
373
        auto& sub = m_subs[it - begin()];
44✔
374
        sub.object_class_name = std::move(object_class_name);
44✔
375
        sub.query_string = std::move(query_str);
44✔
376
        sub.updated_at = Timestamp{std::chrono::system_clock::now()};
44✔
377

378
        return {it, false};
44✔
379
    }
44✔
380
    it = m_subs.insert(m_subs.end(),
1,422✔
381
                       Subscription(std::move(name), std::move(object_class_name), std::move(query_str)));
1,422✔
382

383
    return {it, true};
1,422✔
384
}
1,466✔
385

386
std::pair<SubscriptionSet::iterator, bool> MutableSubscriptionSet::insert_or_assign(std::string_view name,
387
                                                                                    const Query& query)
388
{
188✔
389
    auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());
188✔
390
    auto query_str = query.get_description();
188✔
391
    auto it = std::find_if(begin(), end(), [&](const Subscription& sub) {
188✔
392
        return sub.name == name;
148✔
393
    });
148✔
394

395
    return insert_or_assign_impl(it, std::string{name}, std::move(table_name), std::move(query_str));
188✔
396
}
188✔
397

398
std::pair<SubscriptionSet::iterator, bool> MutableSubscriptionSet::insert_or_assign(const Query& query)
399
{
1,278✔
400
    auto table_name = Group::table_name_to_class_name(query.get_table()->get_name());
1,278✔
401
    auto query_str = query.get_description();
1,278✔
402
    auto it = std::find_if(begin(), end(), [&](const Subscription& sub) {
1,278✔
403
        return (!sub.name && sub.object_class_name == table_name && sub.query_string == query_str);
536✔
404
    });
536✔
405

406
    return insert_or_assign_impl(it, util::none, std::move(table_name), std::move(query_str));
1,278✔
407
}
1,278✔
408

409
void MutableSubscriptionSet::import(SubscriptionSet&& src_subs)
410
{
144✔
411
    check_is_mutable();
144✔
412
    SubscriptionSet::import(std::move(src_subs));
144✔
413
}
144✔
414

415
void SubscriptionSet::import(SubscriptionSet&& src_subs)
416
{
144✔
417
    m_subs = std::move(src_subs.m_subs);
144✔
418
}
144✔
419

420
void MutableSubscriptionSet::set_state(State new_state)
421
{
56✔
422
    REALM_ASSERT(m_state == State::Uncommitted);
56✔
423
    m_state = new_state;
56✔
424
}
56✔
425

426
MutableSubscriptionSet SubscriptionSet::make_mutable_copy() const
427
{
1,462✔
428
    auto mgr = get_flx_subscription_store(); // Throws
1,462✔
429
    return mgr->make_mutable_copy(*this);
1,462✔
430
}
1,462✔
431

432
void SubscriptionSet::refresh()
433
{
48✔
434
    auto mgr = get_flx_subscription_store(); // Throws
48✔
435
    if (mgr->would_refresh(m_cur_version)) {
48✔
436
        *this = mgr->get_refreshed(m_obj_key, version());
36✔
437
    }
36✔
438
}
48✔
439

440
util::Future<SubscriptionSet::State> SubscriptionSet::get_state_change_notification(State notify_when) const
441
{
1,140✔
442
    auto mgr = get_flx_subscription_store(); // Throws
1,140✔
443

444
    util::CheckedLockGuard lk(mgr->m_pending_notifications_mutex);
1,140✔
445
    // If we've already been superseded by another version getting completed, then we should skip registering
1,140✔
446
    // a notification because it may never fire.
1,140✔
447
    if (mgr->m_min_outstanding_version > version()) {
448
        return util::Future<State>::make_ready(State::Superseded);
449
    }
450

1,140✔
451
    State cur_state = state();
40✔
452
    std::string err_str = error_str();
40✔
453

40✔
454
    // If there have been writes to the database since this SubscriptionSet was created, we need to fetch
40✔
455
    // the updated version from the DB to know the true current state and maybe return a ready future.
456
    if (m_cur_version < mgr->m_db->get_version_of_latest_snapshot()) {
457
        auto refreshed_self = mgr->get_refreshed(m_obj_key, version());
1,140✔
458
        cur_state = refreshed_self.state();
4✔
459
        err_str = refreshed_self.error_str();
4✔
460
    }
1,136✔
461
    // If we've already reached the desired state, or if the subscription is in an error state,
90✔
462
    // we can return a ready future immediately.
90✔
463
    if (cur_state == State::Error) {
464
        return util::Future<State>::make_ready(Status{ErrorCodes::SubscriptionFailed, err_str});
465
    }
1,046✔
466
    else if (state_to_order(cur_state) >= state_to_order(notify_when)) {
1,046✔
467
        return util::Future<State>::make_ready(cur_state);
1,046✔
468
    }
1,140✔
469

470
    // Otherwise, make a promise/future pair and add it to the list of pending notifications.
471
    auto [promise, future] = util::make_promise_future<State>();
UNCOV
472
    mgr->m_pending_notifications.emplace_back(version(), std::move(promise), notify_when);
×
UNCOV
473
    return std::move(future);
×
UNCOV
474
}
×
UNCOV
475

×
UNCOV
476
void SubscriptionSet::get_state_change_notification(
×
UNCOV
477
    State notify_when, util::UniqueFunction<void(util::Optional<State>, util::Optional<Status>)> cb) const
×
478
{
×
479
    get_state_change_notification(notify_when).get_async([cb = std::move(cb)](StatusWith<State> result) {
×
480
        if (result.is_ok()) {
×
481
            cb(result.get_value(), {});
×
482
        }
483
        else {
484
            cb({}, result.get_status());
30,872✔
485
        }
30,872✔
486
    });
30,872✔
487
}
20,948✔
488

489
void SubscriptionStore::process_notifications(State new_state, int64_t version, std::string_view error_str)
9,924✔
490
{
9,924✔
491
    std::list<SubscriptionStore::NotificationRequest> to_finish;
492
    {
9,924✔
493
        util::CheckedLockGuard lk(m_pending_notifications_mutex);
9,924✔
494
        splice_if(m_pending_notifications, to_finish, [&](auto& req) {
9,924✔
495
            return (req.version == version &&
9,924✔
496
                    (new_state == State::Error || state_to_order(new_state) >= state_to_order(req.notify_when))) ||
9,924✔
497
                   (new_state == State::Complete && req.version < version);
9,924✔
498
        });
10,364✔
499

10,364✔
500
        if (new_state == State::Complete) {
10,364✔
501
            m_min_outstanding_version = version;
16✔
502
        }
16✔
503
    }
16✔
504

505
    for (auto& req : to_finish) {
10,348✔
506
        if (new_state == State::Error && req.version == version) {
10,348✔
507
            req.promise.set_error({ErrorCodes::SubscriptionFailed, error_str});
9,410✔
508
        }
509
        else if (req.version < version) {
938✔
510
            req.promise.emplace_value(State::Superseded);
938✔
511
        }
48✔
512
        else {
48✔
513
            req.promise.emplace_value(new_state);
938✔
514
        }
938✔
515
    }
10,348✔
516
}
9,924✔
517

518
SubscriptionSet MutableSubscriptionSet::commit()
9,924✔
519
{
954✔
520
    if (m_tr->get_transact_stage() != DB::transact_Writing) {
48✔
521
        throw LogicError(ErrorCodes::WrongTransactionState, "SubscriptionSet has already been committed");
48✔
522
    }
906✔
523
    auto mgr = get_flx_subscription_store(); // Throws
906✔
524

906✔
525
    if (m_state == State::Uncommitted) {
954✔
526
        m_state = State::Pending;
9,924✔
527
    }
528
    m_obj.set(mgr->m_sub_set_snapshot_version, static_cast<int64_t>(m_tr->get_version()));
529

1,394✔
530
    auto obj_sub_list = m_obj.get_linklist(mgr->m_sub_set_subscriptions);
1,394✔
UNCOV
531
    obj_sub_list.clear();
×
UNCOV
532
    for (const auto& sub : m_subs) {
×
533
        auto new_sub = obj_sub_list.create_and_insert_linked_object(obj_sub_list.size());
1,394✔
534
        new_sub.set(mgr->m_sub_id, sub.id);
535
        new_sub.set(mgr->m_sub_created_at, sub.created_at);
1,394✔
536
        new_sub.set(mgr->m_sub_updated_at, sub.updated_at);
1,338✔
537
        if (sub.name) {
1,338✔
538
            new_sub.set(mgr->m_sub_name, StringData(*sub.name));
1,394✔
539
        }
540
        new_sub.set(mgr->m_sub_object_class_name, StringData(sub.object_class_name));
1,394✔
541
        new_sub.set(mgr->m_sub_query_str, StringData(sub.query_string));
1,394✔
542
    }
1,814✔
543
    m_obj.set(mgr->m_sub_set_state, state_to_storage(m_state));
1,814✔
544
    if (!m_error_str.empty()) {
1,814✔
545
        m_obj.set(mgr->m_sub_set_error_str, StringData(m_error_str));
1,814✔
546
    }
1,814✔
547

1,814✔
548
    const auto flx_version = version();
256✔
549
    m_tr->commit_and_continue_as_read();
256✔
550

1,814✔
551
    mgr->process_notifications(m_state, flx_version, std::string_view(error_str()));
1,814✔
552

1,814✔
553
    return mgr->get_refreshed(m_obj.get_key(), flx_version, m_tr->get_version_of_current_transaction());
1,394✔
554
}
1,394✔
UNCOV
555

×
UNCOV
556
std::string SubscriptionSet::to_ext_json() const
×
557
{
558
    if (m_subs.empty()) {
1,394✔
559
        return "{}";
1,394✔
560
    }
561

1,394✔
562
    util::FlatMap<std::string, std::vector<std::string>> table_to_query;
563
    for (const auto& sub : *this) {
1,394✔
564
        std::string table_name(sub.object_class_name);
1,394✔
565
        auto& queries_for_table = table_to_query.at(table_name);
566
        auto query_it = std::find(queries_for_table.begin(), queries_for_table.end(), sub.query_string);
567
        if (query_it != queries_for_table.end()) {
2,618✔
568
            continue;
2,618✔
569
        }
1,068✔
570
        queries_for_table.emplace_back(sub.query_string);
1,068✔
571
    }
572

1,550✔
573
    if (table_to_query.empty()) {
2,142✔
574
        return "{}";
2,142✔
575
    }
2,142✔
576

2,142✔
577
    // TODO this is pulling in a giant compile-time dependency. We should have a better way of escaping the
2,142✔
578
    // query strings into a json object.
8✔
579
    nlohmann::json output_json;
8✔
580
    for (auto& table : table_to_query) {
2,134✔
581
        // We want to make sure that the queries appear in some kind of canonical order so that if there are
2,134✔
582
        // two subscription sets with the same subscriptions in different orders, the server doesn't have to
583
        // waste a bunch of time re-running the queries for that table.
1,550✔
UNCOV
584
        std::stable_sort(table.second.begin(), table.second.end());
×
UNCOV
585

×
586
        bool is_first = true;
587
        std::ostringstream obuf;
588
        for (const auto& query_str : table.second) {
589
            if (!is_first) {
1,550✔
590
                obuf << " OR ";
2,062✔
591
            }
592
            is_first = false;
593
            obuf << "(" << query_str << ")";
594
        }
2,062✔
595
        output_json[table.first] = obuf.str();
596
    }
2,062✔
597

2,062✔
598
    return output_json.dump();
2,134✔
599
}
2,134✔
600

72✔
601
SubscriptionStoreRef SubscriptionStore::create(DBRef db)
72✔
602
{
2,134✔
603
    return std::make_shared<SubscriptionStore>(Private(), std::move(db));
2,134✔
604
}
2,134✔
605

2,062✔
606
SubscriptionStore::SubscriptionStore(Private, DBRef db)
2,062✔
607
    : m_db(std::move(db))
608
{
1,550✔
609
    std::vector<SyncMetadataTable> internal_tables{
1,550✔
610
        {&m_sub_set_table,
611
         c_flx_subscription_sets_table,
612
         {&m_sub_set_version_num, c_flx_sub_sets_version_field, type_Int},
1,422✔
613
         {
1,422✔
614
             {&m_sub_set_state, c_flx_sub_sets_state_field, type_Int},
1,422✔
615
             {&m_sub_set_snapshot_version, c_flx_sub_sets_snapshot_version_field, type_Int},
616
             {&m_sub_set_error_str, c_flx_sub_sets_error_str_field, type_String, true},
617
             {&m_sub_set_subscriptions, c_flx_sub_sets_subscriptions_field, c_flx_subscriptions_table, true},
712✔
618
         }},
1,422✔
619
        {&m_sub_table,
1,422✔
620
         c_flx_subscriptions_table,
1,422✔
621
         SyncMetadataTable::IsEmbeddedTag{},
1,422✔
622
         {
1,422✔
623
             {&m_sub_id, c_flx_sub_id_field, type_ObjectId},
1,422✔
624
             {&m_sub_created_at, c_flx_sub_created_at_field, type_Timestamp},
1,422✔
625
             {&m_sub_updated_at, c_flx_sub_updated_at_field, type_Timestamp},
1,422✔
626
             {&m_sub_name, c_flx_sub_name_field, type_String, true},
1,422✔
627
             {&m_sub_object_class_name, c_flx_sub_object_class_field, type_String},
1,422✔
628
             {&m_sub_query_str, c_flx_sub_query_str_field, type_String},
1,422✔
629
         }},
1,422✔
630
    };
1,422✔
631

1,422✔
632
    auto tr = m_db->start_read();
1,422✔
633
    // Start with a reader so it doesn't try to write until we are ready
1,422✔
634
    SyncMetadataSchemaVersionsReader schema_versions_reader(tr);
1,422✔
635

1,422✔
636
    if (auto schema_version =
1,422✔
637
            schema_versions_reader.get_version_for(tr, internal_schema_groups::c_flx_subscription_store)) {
1,422✔
638
        if (*schema_version != c_flx_schema_version) {
1,422✔
639
            throw RuntimeError(ErrorCodes::UnsupportedFileFormatVersion,
1,422✔
640
                               "Invalid schema version for flexible sync metadata");
1,422✔
641
        }
642
        load_sync_metadata_schema(tr, &internal_tables);
1,422✔
643
    }
644
    else {
1,422✔
645
        tr->promote_to_write();
646
        // Ensure the schema versions table is initialized (may add its own commit)
1,422✔
647
        SyncMetadataSchemaVersions schema_versions(tr);
1,422✔
648
        // Create the metadata schema and set the version (in the same commit)
290✔
UNCOV
649
        schema_versions.set_version_for(tr, internal_schema_groups::c_flx_subscription_store, c_flx_schema_version);
×
UNCOV
650
        create_sync_metadata_schema(tr, &internal_tables);
×
UNCOV
651
        tr->commit_and_continue_as_read();
×
652
    }
290✔
653
    REALM_ASSERT(m_sub_set_table);
290✔
654

1,132✔
655
    // Make sure the subscription set table is properly initialized
1,132✔
656
    initialize_subscriptions_table(std::move(tr));
657
}
1,132✔
658

659
void SubscriptionStore::initialize_subscriptions_table(TransactionRef&& tr)
1,132✔
660
{
1,132✔
661
    if (auto sub_sets = tr->get_table(m_sub_set_table); sub_sets->is_empty()) {
1,132✔
662
        tr->promote_to_write();
1,132✔
663
        clear(*tr);
1,422✔
664
        tr->commit();
665
    }
666
}
1,422✔
667

1,422✔
668
void SubscriptionStore::clear(Transaction& wt)
669
{
670
    auto sub_sets = wt.get_table(m_sub_set_table);
1,422✔
671
    sub_sets->clear();
1,422✔
672
    // There should always be at least one subscription set so that the user can always wait
1,132✔
673
    // for synchronizationon on the result of get_latest().
1,132✔
674
    auto zero_sub = sub_sets->create_object_with_primary_key(Mixed{int64_t(0)});
1,132✔
675
    zero_sub.set(m_sub_set_state, static_cast<int64_t>(SubscriptionSet::State::Pending));
1,132✔
676
    zero_sub.set(m_sub_set_snapshot_version, wt.get_version());
1,422✔
677
}
678

679
SubscriptionSet SubscriptionStore::get_latest()
1,152✔
680
{
1,152✔
681
    auto tr = m_db->start_frozen();
1,152✔
682
    auto sub_sets = tr->get_table(m_sub_set_table);
683
    // There should always be at least one SubscriptionSet - the zeroth subscription set for schema instructions.
684
    REALM_ASSERT(!sub_sets->is_empty());
1,152✔
685

1,152✔
686
    auto latest_id = sub_sets->max(sub_sets->get_primary_key_column())->get_int();
1,152✔
687
    auto latest_obj = sub_sets->get_object_with_primary_key(Mixed{latest_id});
1,152✔
688

689
    return SubscriptionSet(weak_from_this(), *tr, latest_obj);
690
}
4,224✔
691

4,224✔
692
SubscriptionSet SubscriptionStore::get_active()
4,224✔
693
{
694
    auto tr = m_db->start_frozen();
4,224✔
695
    return SubscriptionSet(weak_from_this(), *tr, get_active(*tr));
696
}
4,224✔
697

4,224✔
698
Obj SubscriptionStore::get_active(const Transaction& tr)
699
{
4,224✔
700
    auto sub_sets = tr.get_table(m_sub_set_table);
4,224✔
701
    // There should always be at least one SubscriptionSet - the zeroth subscription set for schema instructions.
702
    REALM_ASSERT(!sub_sets->is_empty());
703

1,844✔
704
    DescriptorOrdering descriptor_ordering;
1,844✔
705
    descriptor_ordering.append_sort(SortDescriptor{{{sub_sets->get_primary_key_column()}}, {false}});
1,844✔
706
    descriptor_ordering.append_limit(LimitDescriptor{1});
1,844✔
707
    auto res = sub_sets->where()
708
                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Complete))
709
                   .Or()
4,426✔
710
                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark))
4,426✔
711
                   .find_all(descriptor_ordering);
712

4,426✔
713
    // If there is no active subscription yet, return the zeroth subscription.
714
    if (res.is_empty()) {
4,426✔
715
        return sub_sets->get_object_with_primary_key(0);
4,426✔
716
    }
4,426✔
717
    return res.get_object(0);
4,426✔
718
}
4,426✔
719

4,426✔
720
SubscriptionStore::VersionInfo SubscriptionStore::get_version_info() const
4,426✔
721
{
4,426✔
722
    auto tr = m_db->start_read();
723
    auto sub_sets = tr->get_table(m_sub_set_table);
724
    // There should always be at least one SubscriptionSet - the zeroth subscription set for schema instructions.
4,426✔
725
    REALM_ASSERT(!sub_sets->is_empty());
1,080✔
726

1,080✔
727
    VersionInfo ret;
3,346✔
728
    ret.latest = sub_sets->max(sub_sets->get_primary_key_column())->get_int();
4,426✔
729
    DescriptorOrdering descriptor_ordering;
730
    descriptor_ordering.append_sort(SortDescriptor{{{sub_sets->get_primary_key_column()}}, {false}});
731
    descriptor_ordering.append_limit(LimitDescriptor{1});
64✔
732

64✔
733
    auto res = sub_sets->where()
64✔
734
                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Complete))
735
                   .Or()
64✔
736
                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark))
737
                   .find_all(descriptor_ordering);
64✔
738
    ret.active = res.is_empty() ? SubscriptionSet::EmptyVersion : res.get_object(0).get_primary_key().get_int();
64✔
739

64✔
740
    res = sub_sets->where()
64✔
741
              .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::AwaitingMark))
64✔
742
              .find_all(descriptor_ordering);
743
    ret.pending_mark = res.is_empty() ? SubscriptionSet::EmptyVersion : res.get_object(0).get_primary_key().get_int();
64✔
744

64✔
745
    return ret;
64✔
746
}
64✔
747

64✔
748
util::Optional<SubscriptionStore::PendingSubscription>
64✔
749
SubscriptionStore::get_next_pending_version(int64_t last_query_version) const
750
{
64✔
751
    auto tr = m_db->start_read();
64✔
752
    auto sub_sets = tr->get_table(m_sub_set_table);
64✔
753
    // There should always be at least one SubscriptionSet - the zeroth subscription set for schema instructions.
64✔
754
    REALM_ASSERT(!sub_sets->is_empty());
755

64✔
756
    DescriptorOrdering descriptor_ordering;
64✔
757
    descriptor_ordering.append_sort(SortDescriptor{{{sub_sets->get_primary_key_column()}}, {true}});
758
    auto res = sub_sets->where()
759
                   .greater(sub_sets->get_primary_key_column(), last_query_version)
760
                   .group()
12,916✔
761
                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Pending))
12,916✔
762
                   .Or()
12,916✔
763
                   .equal(m_sub_set_state, state_to_storage(SubscriptionSet::State::Bootstrapping))
764
                   .end_group()
12,916✔
765
                   .find_all(descriptor_ordering);
766

12,916✔
767
    if (res.is_empty()) {
12,916✔
768
        return util::none;
12,916✔
769
    }
12,916✔
770

12,916✔
771
    auto obj = res.get_object(0);
12,916✔
772
    auto query_version = obj.get_primary_key().get_int();
12,916✔
773
    auto snapshot_version = obj.get<int64_t>(m_sub_set_snapshot_version);
12,916✔
774
    return PendingSubscription{query_version, static_cast<DB::version_type>(snapshot_version)};
12,916✔
775
}
12,916✔
776

777
std::vector<SubscriptionSet> SubscriptionStore::get_pending_subscriptions()
12,916✔
778
{
10,810✔
779
    std::vector<SubscriptionSet> subscriptions_to_recover;
10,810✔
780
    auto active_sub = get_active();
781
    auto cur_query_version = active_sub.version();
2,106✔
782
    // get a copy of the pending subscription sets since the active version
2,106✔
783
    while (auto next_pending = get_next_pending_version(cur_query_version)) {
2,106✔
784
        cur_query_version = next_pending->query_version;
2,106✔
785
        subscriptions_to_recover.push_back(get_by_version(cur_query_version));
12,916✔
786
    }
787
    return subscriptions_to_recover;
788
}
24✔
789

24✔
790
void SubscriptionStore::notify_all_state_change_notifications(Status status)
24✔
791
{
24✔
792
    util::CheckedUniqueLock lk(m_pending_notifications_mutex);
793
    auto to_finish = std::move(m_pending_notifications);
72✔
794
    lk.unlock();
48✔
795

48✔
796
    // Just complete/cancel the pending notifications - this function does not alter the
48✔
797
    // state of any pending subscriptions
24✔
798
    for (auto& req : to_finish) {
24✔
799
        req.promise.set_error(status);
800
    }
801
}
1,386✔
802

1,386✔
803
void SubscriptionStore::reset(Transaction& wt)
1,386✔
804
{
1,386✔
805
    // Clear out and initialize the subscription store
806
    clear(wt);
807

808
    util::CheckedUniqueLock lk(m_pending_notifications_mutex);
1,386✔
809
    auto to_finish = std::move(m_pending_notifications);
24✔
810
    m_min_outstanding_version = 0;
24✔
811
    lk.unlock();
1,386✔
812

813
    for (auto& req : to_finish) {
814
        req.promise.emplace_value(SubscriptionSet::State::Superseded);
20✔
815
    }
816
}
20✔
817

818
void SubscriptionStore::update_state(int64_t version, State new_state, std::optional<std::string_view> error_str)
20✔
819
{
20✔
820
    REALM_ASSERT(error_str.has_value() == (new_state == State::Error));
20✔
821
    REALM_ASSERT(new_state != State::Pending);
822
    REALM_ASSERT(new_state != State::Superseded);
20✔
823

16✔
824
    auto tr = m_db->start_write();
16✔
825
    auto sub_sets = tr->get_table(m_sub_set_table);
20✔
826
    auto obj = sub_sets->get_object_with_primary_key(version);
827
    if (!obj) {
828
        // This can happen either due to a bug in the sync client or due to the
2,052✔
829
        // server sending us an error message for an invalid query version. We
2,052✔
830
        // assume it is the latter here.
2,052✔
831
        throw RuntimeError(ErrorCodes::SyncProtocolInvariantFailed,
2,052✔
832
                           util::format("Invalid state update for nonexistent query version %1", version));
2,052✔
833
    }
×
UNCOV
834

×
UNCOV
835
    auto old_state = state_from_storage(obj.get<int64_t>(m_sub_set_state));
×
836
    switch (new_state) {
837
        case State::Error:
2,052✔
838
            if (old_state == State::Complete) {
28✔
839
                throw RuntimeError(ErrorCodes::SyncProtocolInvariantFailed,
28✔
840
                                   util::format("Received error '%1' for already-completed query version %2. This "
841
                                                "may be due to a queryable field being removed in the server-side "
842
                                                "configuration making the previous subscription set no longer valid.",
28✔
843
                                                *error_str, version));
844
            }
2,024✔
845
            break;
2,024✔
846

2,024✔
847
        case State::Bootstrapping:
UNCOV
848
        case State::AwaitingMark:
✔
UNCOV
849
            REALM_ASSERT(old_state != State::Complete);
×
UNCOV
850
            REALM_ASSERT(old_state != State::Error);
×
UNCOV
851
            break;
×
UNCOV
852

×
853
        case State::Complete:
28✔
854
            supercede_prior_to(tr, version);
UNCOV
855
            break;
✔
856

857
        case State::Uncommitted:
×
858
        case State::Superseded:
×
859
        case State::Pending:
2,052✔
860
            REALM_TERMINATE("Illegal new state for subscription set");
2,052✔
861
    }
862

863
    obj.set(m_sub_set_state, state_to_storage(new_state));
2,032✔
864
    obj.set(m_sub_set_error_str, error_str ? StringData(*error_str) : StringData());
2,032✔
865

2,032✔
866
    tr->commit();
2,032✔
867

868
    process_notifications(new_state, version, error_str.value_or(std::string_view{}));
869
}
2,032✔
870

871
SubscriptionSet SubscriptionStore::get_by_version(int64_t version_id)
2,032✔
872
{
28✔
873
    auto tr = m_db->start_frozen();
28✔
874
    auto sub_sets = tr->get_table(m_sub_set_table);
875
    if (auto obj = sub_sets->get_object_with_primary_key(version_id)) {
876
        return SubscriptionSet(weak_from_this(), *tr, obj);
877
    }
28✔
878

879
    util::CheckedLockGuard lk(m_pending_notifications_mutex);
2,004✔
880
    if (version_id < m_min_outstanding_version) {
2,004✔
881
        return SubscriptionSet(weak_from_this(), version_id, SubscriptionSet::SupersededTag{});
2,004✔
882
    }
883
    throw KeyNotFound(util::format("Subscription set with version %1 not found", version_id));
✔
884
}
UNCOV
885

×
UNCOV
886
SubscriptionSet SubscriptionStore::get_refreshed(ObjKey key, int64_t version, std::optional<DB::VersionID> db_version)
×
887
{
2,032✔
888
    auto tr = m_db->start_frozen(db_version.value_or(VersionID{}));
889
    auto sub_sets = tr->get_table(m_sub_set_table);
890
    if (auto obj = sub_sets->try_get_object(key)) {
2,004✔
891
        return SubscriptionSet(weak_from_this(), *tr, obj);
1,992✔
892
    }
1,992✔
893
    return SubscriptionSet(weak_from_this(), version, SubscriptionSet::SupersededTag{});
2,004✔
894
}
895

896
SubscriptionStore::TableSet SubscriptionStore::get_tables_for_latest(const Transaction& tr) const
2,020✔
897
{
2,020✔
898
    auto sub_sets = tr.get_table(m_sub_set_table);
2,020✔
899
    // There should always be at least one SubscriptionSet - the zeroth subscription set for schema instructions.
900
    REALM_ASSERT(!sub_sets->is_empty());
901

12✔
902
    auto latest_id = sub_sets->max(sub_sets->get_primary_key_column())->get_int();
12✔
903
    auto latest_obj = sub_sets->get_object_with_primary_key(Mixed{latest_id});
12✔
904

905
    TableSet ret;
906
    auto subs = latest_obj.get_linklist(m_sub_set_subscriptions);
28✔
907
    for (size_t idx = 0; idx < subs.size(); ++idx) {
28✔
908
        auto sub_obj = subs.get_object(idx);
28✔
909
        ret.emplace(sub_obj.get<StringData>(m_sub_object_class_name));
28✔
910
    }
28✔
911

912
    return ret;
913
}
UNCOV
914

×
UNCOV
915
void SubscriptionStore::supercede_prior_to(TransactionRef tr, int64_t version_id) const
×
UNCOV
916
{
×
917
    auto sub_sets = tr->get_table(m_sub_set_table);
918
    Query remove_query(sub_sets);
28✔
919
    remove_query.less(sub_sets->get_primary_key_column(), version_id);
28✔
UNCOV
920
    remove_query.remove();
×
UNCOV
921
}
×
UNCOV
922

×
UNCOV
923
MutableSubscriptionSet SubscriptionStore::make_mutable_copy(const SubscriptionSet& set)
×
UNCOV
924
{
×
UNCOV
925
    auto new_tr = m_db->start_write();
×
926

927
    auto sub_sets = new_tr->get_table(m_sub_set_table);
28✔
928
    auto new_pk = sub_sets->max(sub_sets->get_primary_key_column())->get_int() + 1;
28✔
929

28✔
930
    MutableSubscriptionSet new_set_obj(weak_from_this(), std::move(new_tr),
28✔
931
                                       sub_sets->create_object_with_primary_key(Mixed{new_pk}));
932
    for (const auto& sub : set) {
933
        new_set_obj.insert_sub(sub);
2,450✔
934
    }
2,450✔
935

2,450✔
936
    return new_set_obj;
2,450✔
937
}
574✔
938

939
bool SubscriptionStore::would_refresh(DB::version_type version) const noexcept
940
{
941
    return version < m_db->get_version_of_latest_snapshot();
942
}
1,876✔
943

1,876✔
944
int64_t SubscriptionStore::set_active_as_latest(Transaction& wt)
1,876✔
945
{
1,876✔
946
    auto sub_sets = wt.get_table(m_sub_set_table);
947
    auto active = get_active(wt);
948
    // Delete all newer subscription sets, if any
1,238✔
949
    sub_sets->where().greater(sub_sets->get_primary_key_column(), active.get_primary_key().get_int()).remove();
1,238✔
950
    // Mark the active set as complete even if it was previously WaitingForMark
1,238✔
951
    // as we've completed rebootstrapping before calling this.
1,238✔
952
    active.set(m_sub_set_state, state_to_storage(State::Complete));
1,234✔
953
    auto version = active.get_primary_key().get_int();
1,234✔
954

4✔
955
    std::list<NotificationRequest> to_finish;
4✔
956
    {
4✔
957
        util::CheckedLockGuard lock(m_pending_notifications_mutex);
4✔
UNCOV
958
        splice_if(m_pending_notifications, to_finish, [&](auto& req) {
×
959
            if (req.version == version && state_to_order(req.notify_when) <= state_to_order(State::Complete))
4✔
960
                return true;
961
            return req.version != version;
962
        });
1,470✔
963
    }
1,470✔
964

1,470✔
965
    for (auto& req : to_finish) {
1,470✔
966
        req.promise.emplace_value(req.version == version ? State::Complete : State::Superseded);
1,454✔
967
    }
1,454✔
968

16✔
969
    return version;
1,470✔
970
}
971

972
int64_t SubscriptionStore::mark_active_as_complete(Transaction& wt)
12,578✔
973
{
12,578✔
974
    auto active = get_active(wt);
975
    active.set(m_sub_set_state, state_to_storage(State::Complete));
12,578✔
976
    auto version = active.get_primary_key().get_int();
977

12,578✔
978
    std::list<NotificationRequest> to_finish;
12,578✔
979
    {
980
        util::CheckedLockGuard lock(m_pending_notifications_mutex);
12,578✔
981
        splice_if(m_pending_notifications, to_finish, [&](auto& req) {
12,578✔
982
            return req.version == version && state_to_order(req.notify_when) <= state_to_order(State::Complete);
23,752✔
983
        });
11,174✔
984
    }
11,174✔
985

11,174✔
986
    for (auto& req : to_finish) {
987
        req.promise.emplace_value(State::Complete);
12,578✔
988
    }
12,578✔
989

990
    return version;
991
}
1,462✔
992

1,462✔
993
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc