• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_491

09 Aug 2024 04:34PM UTC coverage: 89.577% (-1.5%) from 91.087%
thomas.goyne_491

Pull #7967

Evergreen

tgoyne
Actually check for unuploaded changes in no_pending_local_changes()

We can have local changesets stored which have already been uploaded and
acknowledged by the server, so checking all of the changesets is incorrect. We
need to instead only check changesets for versions after the current position
of the upload cursor.
Pull Request #7967: RCORE-2232 Actually check for unuploaded changes in no_pending_local_changes()

90956 of 164876 branches covered (55.17%)

37 of 38 new or added lines in 2 files covered. (97.37%)

42 existing lines in 8 files now uncovered.

145956 of 162940 relevant lines covered (89.58%)

8094301.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.0
/src/realm/sync/noinst/client_history_impl.cpp
1
///////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2022 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/sync/noinst/client_history_impl.hpp>
20

21
#include <realm/sync/changeset.hpp>
22
#include <realm/sync/changeset_parser.hpp>
23
#include <realm/sync/instruction_applier.hpp>
24
#include <realm/sync/instruction_replication.hpp>
25
#include <realm/sync/noinst/client_reset.hpp>
26
#include <realm/sync/noinst/client_reset_recovery.hpp>
27
#include <realm/transaction.hpp>
28
#include <realm/util/compression.hpp>
29
#include <realm/util/features.h>
30
#include <realm/util/functional.hpp>
31
#include <realm/util/scope_exit.hpp>
32
#include <realm/version.hpp>
33

34
#include <algorithm>
35
#include <ctime>
36
#include <cstring>
37
#include <utility>
38

39
namespace realm::sync {
40

41
void ClientHistory::set_history_adjustments(
42
    util::Logger& logger, version_type current_version, SaltedFileIdent client_file_ident,
43
    SaltedVersion server_version, const std::vector<_impl::client_reset::RecoveredChange>& recovered_changesets)
44
{
7,872✔
45
    ensure_updated(current_version); // Throws
7,872✔
46
    prepare_for_write();             // Throws
7,872✔
47

48
    version_type client_version = m_sync_history_base_version + sync_history_size();
7,872✔
49
    REALM_ASSERT(client_version == current_version); // For now
7,872✔
50
    Array& root = m_arrays->root;
7,872✔
51
    m_group->set_sync_file_id(client_file_ident.ident); // Throws
7,872✔
52

53
    size_t uploadable_bytes = 0;
7,872✔
54
    if (recovered_changesets.empty()) {
7,872✔
55
        // Either we had nothing to upload or we're discarding the unsynced changes
56
        logger.debug("History adjustments: discarding %1 history entries", sync_history_size());
4,016✔
57
        do_trim_sync_history(sync_history_size()); // Throws
4,016✔
58
    }
4,016✔
59
    else {
3,856✔
60
        // Discard all sync history before the first recovered changeset. This is
61
        // required because we are going to discard our progress information and
62
        // so won't know which history entries have been uploaded already.
63
        auto first_version = recovered_changesets.front().version;
3,856✔
64
        REALM_ASSERT(first_version >= m_sync_history_base_version);
3,856✔
65
        auto discard_count = std::size_t(first_version - m_sync_history_base_version);
3,856✔
66
        do_trim_sync_history(discard_count);
3,856✔
67

68
        if (logger.would_log(util::Logger::Level::debug)) {
3,856✔
69
            logger.debug("History adjustments: trimming %1 history entries and updating the remaining history "
3,808✔
70
                         "entries (%2)",
3,808✔
71
                         discard_count, sync_history_size());
3,808✔
72
            for (size_t i = 0, size = m_arrays->changesets.size(); i < size; ++i) {
15,436✔
73
                logger.debug("- %1: ident(%2) changeset_size(%3) remote_version(%4)", i,
11,628✔
74
                             m_arrays->origin_file_idents.get(i), m_arrays->changesets.get(i).size(),
11,628✔
75
                             m_arrays->remote_versions.get(i));
11,628✔
76
            }
11,628✔
77
        }
3,808✔
78

79
        util::compression::CompressMemoryArena arena;
3,856✔
80
        util::AppendBuffer<char> compressed;
3,856✔
81
        for (auto& [changeset, version] : recovered_changesets) {
7,790✔
82
            uploadable_bytes += changeset.size();
7,790✔
83
            auto i = size_t(version - m_sync_history_base_version);
7,790✔
84
            util::compression::allocate_and_compress_nonportable(arena, changeset, compressed);
7,790✔
85
            m_arrays->changesets.set(i, BinaryData{compressed.data(), compressed.size()}); // Throws
7,790✔
86
            m_arrays->reciprocal_transforms.set(i, BinaryData());
7,790✔
87
        }
7,790✔
88
        // Server version is updated for *every* entry in the sync history to ensure that server versions don't
89
        // decrease.
90
        for (size_t i = 0, size = m_arrays->remote_versions.size(); i < size; ++i) {
15,668✔
91
            m_arrays->remote_versions.set(i, server_version.version);
11,812✔
92
            version_type version = m_sync_history_base_version + i;
11,812✔
93
            logger.debug("Updating %1: client_version(%2) changeset_size(%3) server_version(%4)", i, version + 1,
11,812✔
94
                         m_arrays->changesets.get(i).size(), server_version.version);
11,812✔
95
        }
11,812✔
96
    }
3,856✔
97
    logger.debug("New uploadable bytes after history adjustment: %1", uploadable_bytes);
7,872✔
98

99
    // Client progress versions are set to 0 to signal to the server that we've
100
    // reset our versioning. If we send the actual values, the server would
101
    // complain that the versions (probably) don't correspond to the ones sent
102
    // when downloading the fresh realm.
103
    root.set(s_progress_download_client_version_iip,
7,872✔
104
             RefOrTagged::make_tagged(0)); // Throws
7,872✔
105
    root.set(s_progress_upload_client_version_iip,
7,872✔
106
             RefOrTagged::make_tagged(0)); // Throws
7,872✔
107

108
    root.set(s_client_file_ident_salt_iip,
7,872✔
109
             RefOrTagged::make_tagged(client_file_ident.salt)); // Throws
7,872✔
110
    root.set(s_progress_download_server_version_iip,
7,872✔
111
             RefOrTagged::make_tagged(server_version.version)); // Throws
7,872✔
112
    root.set(s_progress_latest_server_version_iip,
7,872✔
113
             RefOrTagged::make_tagged(server_version.version)); // Throws
7,872✔
114
    root.set(s_progress_latest_server_version_salt_iip,
7,872✔
115
             RefOrTagged::make_tagged(server_version.salt)); // Throws
7,872✔
116
    root.set(s_progress_upload_server_version_iip,
7,872✔
117
             RefOrTagged::make_tagged(server_version.version)); // Throws
7,872✔
118
    root.set(s_progress_downloaded_bytes_iip,
7,872✔
119
             RefOrTagged::make_tagged(0)); // Throws
7,872✔
120
    root.set(s_progress_downloadable_bytes_iip,
7,872✔
121
             RefOrTagged::make_tagged(0)); // Throws
7,872✔
122
    root.set(s_progress_uploaded_bytes_iip,
7,872✔
123
             RefOrTagged::make_tagged(0)); // Throws
7,872✔
124
    root.set(s_progress_uploadable_bytes_iip,
7,872✔
125
             RefOrTagged::make_tagged(uploadable_bytes)); // Throws
7,872✔
126

127
    m_progress_download = {server_version.version, 0};
7,872✔
128
    m_applying_client_reset = true;
7,872✔
129
}
7,872✔
130

131
// Returns the non-empty, locally originated changesets stored in the sync
// history, starting at the client version recorded in the persisted upload
// progress (clamped to the start of the stored history) and ending at the end
// of the history. Returns an empty vector when there is no history.
std::vector<ClientHistory::LocalChange> ClientHistory::get_local_changes(version_type current_version) const
{
    ensure_updated(current_version); // Throws
    std::vector<ClientHistory::LocalChange> local_changes;
    if (!m_arrays || m_arrays->changesets.is_empty())
        return local_changes;

    // Only the upload cursor is needed from the status; the other outputs are
    // discarded.
    sync::version_type first_version = 0;
    {
        sync::version_type ignored_version;
        SaltedFileIdent ignored_ident;
        SyncProgress stored_progress;
        get_status(ignored_version, ignored_ident, stored_progress);
        first_version = stored_progress.upload.client_version;
    }

    const version_type last_version = m_sync_history_base_version + sync_history_size();
    if (m_sync_history_base_version > first_version)
        first_version = m_sync_history_base_version;

    for (version_type version = first_version; version < last_version; ++version) {
        const auto ndx = std::size_t(version - m_sync_history_base_version);
        const std::int_fast64_t origin_file_ident = m_arrays->origin_file_idents.get(ndx);
        // A nonzero origin file ident means the entry did not originate locally.
        if (origin_file_ident != 0)
            continue;

        bool compressed = false;
        // find_sync_history_entry() uses 0 for "not found" and otherwise
        // reports version + 1; get_reciprocal_transform() subtracts that 1
        // again, hence the + 1 here.
        auto changeset = get_reciprocal_transform(version + 1, compressed);
        if (!changeset.empty())
            local_changes.push_back({version, changeset});
    }
    return local_changes;
}
3,976✔
167

168
// Installs `source_fn` as the callable stored in
// `m_local_origin_timestamp_source`, taking ownership of it.
void ClientHistory::set_local_origin_timestamp_source(util::UniqueFunction<timestamp_type()> source_fn)
{
    m_local_origin_timestamp_source = std::move(source_fn);
}
24✔
172

173
// Overriding member function in realm::Replication
174
// Initializes the base SyncReplication first and then the owned history
// object, both with the same DB.
void ClientReplication::initialize(DB& sg)
{
    SyncReplication::initialize(sg); // Throws
    m_history.initialize(sg);
}
20,496✔
179

180

181
// Overriding member function in realm::Replication
182
// Always reports the sync client history type.
auto ClientReplication::get_history_type() const noexcept -> HistoryType
{
    return hist_SyncClient;
}
55,892✔
186

187

188
// Overriding member function in realm::Replication
189
int ClientReplication::get_history_schema_version() const noexcept
190
{
20,482✔
191
    return get_client_history_schema_version();
20,482✔
192
}
20,482✔
193

194

195
// Overriding member function in realm::Replication
196
bool ClientReplication::is_upgradable_history_schema(int stored_schema_version) const noexcept
197
{
6✔
198
    if (stored_schema_version == 11) {
6✔
199
        return true;
6✔
200
    }
6✔
201
    return false;
×
202
}
6✔
203

204

205
// Overriding member function in realm::Replication
206
void ClientReplication::upgrade_history_schema(int stored_schema_version)
207
{
4✔
208
    // upgrade_history_schema() is called only when there is a need to upgrade
209
    // (`stored_schema_version < get_server_history_schema_version()`), and only
210
    // when is_upgradable_history_schema() returned true (`stored_schema_version
211
    // >= 1`).
212
    REALM_ASSERT(stored_schema_version < get_client_history_schema_version());
4✔
213
    REALM_ASSERT(stored_schema_version >= 11);
4✔
214
    int orig_schema_version = stored_schema_version;
4✔
215
    int schema_version = orig_schema_version;
4✔
216

217
    if (schema_version < 12) {
4✔
218
        m_history.compress_stored_changesets();
4✔
219
        schema_version = 12;
4✔
220
    }
4✔
221

222
    // NOTE: Future migration steps go here.
223

224
    REALM_ASSERT(schema_version == get_client_history_schema_version());
4✔
225

226
    // Record migration event
227
    m_history.record_current_schema_version(); // Throws
4✔
228
}
4✔
229

230
void ClientHistory::compress_stored_changesets()
231
{
4✔
232
    using gf = _impl::GroupFriend;
4✔
233
    Allocator& alloc = gf::get_alloc(*m_group);
4✔
234
    auto ref = gf::get_history_ref(*m_group);
4✔
235
    Arrays arrays{alloc, m_group, ref};
4✔
236

237
    util::AppendBuffer<char> compressed_buffer;
4✔
238
    util::AppendBuffer<char> decompressed_buffer;
4✔
239
    util::compression::CompressMemoryArena arena;
4✔
240
    auto columns = {&arrays.reciprocal_transforms, &arrays.changesets};
4✔
241
    for (auto column : columns) {
8✔
242
        for (size_t i = 0; i < column->size(); ++i) {
24✔
243
            ChunkedBinaryData data(*column, i);
16✔
244
            if (data.is_null())
16✔
245
                continue;
8✔
246
            data.copy_to(compressed_buffer);
8✔
247
            util::compression::allocate_and_compress_nonportable(arena, compressed_buffer, decompressed_buffer);
8✔
248
            column->set(i, BinaryData{decompressed_buffer.data(), decompressed_buffer.size()}); // Throws
8✔
249
        }
8✔
250
    }
8✔
251
}
4✔
252

253
// Overriding member function in realm::Replication
254
// Adds a new history entry for the transaction being committed on top of
// `orig_version` and returns the version reported by the history.
//
// `data`/`size` describe the changeset handed in by the caller; the sync
// changeset is taken from the instruction encoder's buffer.
auto ClientReplication::prepare_changeset(const char* data, size_t size, version_type orig_version) -> version_type
{
    m_history.ensure_updated(orig_version);
    m_history.prepare_for_write(); // Throws

    const BinaryData core_changeset{data, size};
    auto& encoded = get_instruction_encoder().buffer();
    const BinaryData sync_changeset(encoded.data(), encoded.size());

    return m_history.add_changeset(core_changeset, sync_changeset); // Throws
}
174,368✔
265

266
// Builds a write validator for `tr` using the installed factory. When no
// factory is installed an empty function object is returned.
util::UniqueFunction<SyncReplication::WriteValidator> ClientReplication::make_write_validator(Transaction& tr)
{
    if (m_write_validator_factory) {
        return m_write_validator_factory(tr);
    }

    return {};
}
179,270✔
274

275
void ClientHistory::get_status(version_type& current_client_version, SaltedFileIdent& client_file_ident,
276
                               SyncProgress& progress) const
277
{
30,500✔
278
    TransactionRef rt = m_db->start_read(); // Throws
30,500✔
279
    version_type current_client_version_2 = rt->get_version();
30,500✔
280

281
    SaltedFileIdent client_file_ident_2{rt->get_sync_file_id(), 0};
30,500✔
282
    SyncProgress progress_2;
30,500✔
283
    using gf = _impl::GroupFriend;
30,500✔
284
    if (ref_type ref = gf::get_history_ref(*rt)) {
30,500✔
285
        Array root(m_db->get_alloc());
28,068✔
286
        root.init_from_ref(ref);
28,068✔
287
        client_file_ident_2.salt = salt_type(root.get_as_ref_or_tagged(s_client_file_ident_salt_iip).get_as_int());
28,068✔
288
        progress_2.latest_server_version.version =
28,068✔
289
            version_type(root.get_as_ref_or_tagged(s_progress_latest_server_version_iip).get_as_int());
28,068✔
290
        progress_2.latest_server_version.salt =
28,068✔
291
            version_type(root.get_as_ref_or_tagged(s_progress_latest_server_version_salt_iip).get_as_int());
28,068✔
292
        progress_2.download.server_version =
28,068✔
293
            version_type(root.get_as_ref_or_tagged(s_progress_download_server_version_iip).get_as_int());
28,068✔
294
        progress_2.download.last_integrated_client_version =
28,068✔
295
            version_type(root.get_as_ref_or_tagged(s_progress_download_client_version_iip).get_as_int());
28,068✔
296
        progress_2.upload.client_version =
28,068✔
297
            version_type(root.get_as_ref_or_tagged(s_progress_upload_client_version_iip).get_as_int());
28,068✔
298
        progress_2.upload.last_integrated_server_version =
28,068✔
299
            version_type(root.get_as_ref_or_tagged(s_progress_upload_server_version_iip).get_as_int());
28,068✔
300
    }
28,068✔
301

302
    current_client_version = current_client_version_2;
30,500✔
303
    client_file_ident = client_file_ident_2;
30,500✔
304
    progress = progress_2;
30,500✔
305

306
    REALM_ASSERT(current_client_version >= s_initial_version + 0);
30,500✔
307
    if (current_client_version == s_initial_version + 0)
30,500✔
308
        current_client_version = 0;
2,430✔
309
}
30,500✔
310

311
// Stores the client file identifier in a write transaction of its own.
//
// The ident must be nonzero and the Realm must not have a sync file id yet
// (both asserted). The salt is written to the history root and both client
// progress versions are reset to 0. When `fix_up_object_ids` is set, the
// stored changesets are additionally fixed up with the new ident.
void ClientHistory::set_client_file_ident(SaltedFileIdent client_file_ident, bool fix_up_object_ids)
{
    REALM_ASSERT(client_file_ident.ident != 0);

    TransactionRef wt = m_db->start_write(); // Throws
    const version_type write_version = wt->get_version();
    ensure_updated(write_version); // Throws
    prepare_for_write();           // Throws

    Array& root = m_arrays->root;
    REALM_ASSERT(wt->get_sync_file_id() == 0);
    wt->set_sync_file_id(client_file_ident.ident);

    const auto salt = RefOrTagged::make_tagged(client_file_ident.salt);
    root.set(s_client_file_ident_salt_iip, salt); // Throws
    const auto zero = RefOrTagged::make_tagged(0);
    root.set(s_progress_download_client_version_iip, zero);
    root.set(s_progress_upload_client_version_iip, zero);

    if (fix_up_object_ids) {
        fix_up_client_file_ident_in_stored_changesets(*wt, client_file_ident.ident); // Throws
    }

    // Note: the changeset produced by this transaction is empty, and empty
    // changesets are never uploaded to the server.
    wt->commit(); // Throws
}
3,752✔
336

337

338
// Overriding member function in realm::sync::ClientHistoryBase
339
void ClientHistory::set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes,
340
                                      VersionInfo& version_info)
341
{
29,428✔
342
    TransactionRef wt = m_db->start_write(); // Throws
29,428✔
343
    version_type local_version = wt->get_version();
29,428✔
344
    ensure_updated(local_version); // Throws
29,428✔
345
    prepare_for_write();           // Throws
29,428✔
346

347
    update_sync_progress(progress, downloadable_bytes); // Throws
29,428✔
348

349
    // Note: This transaction produces an empty changeset. Empty changesets are
350
    // not uploaded to the server.
351
    version_type new_version = wt->commit(); // Throws
29,428✔
352
    version_info.realm_version = new_version;
29,428✔
353
    version_info.sync_version = {new_version, 0};
29,428✔
354
}
29,428✔
355

356
void ClientHistory::find_uploadable_changesets(UploadCursor& upload_progress, version_type end_version,
357
                                               std::vector<UploadChangeset>& uploadable_changesets,
358
                                               version_type& locked_server_version) const
359
{
61,668✔
360
    TransactionRef rt = m_db->start_read(); // Throws
61,668✔
361
    auto& alloc = m_db->get_alloc();
61,668✔
362
    using gf = _impl::GroupFriend;
61,668✔
363
    ref_type ref = gf::get_history_ref(*rt);
61,668✔
364
    REALM_ASSERT(ref);
61,668✔
365

366
    Arrays arrays(alloc, rt.get(), ref);
61,668✔
367
    const auto sync_history_size = arrays.changesets.size();
61,668✔
368
    const auto sync_history_base_version = rt->get_version() - sync_history_size;
61,668✔
369

370
    std::size_t accum_byte_size_soft_limit = 131072;   // 128 KB
61,668✔
371
    std::size_t accum_byte_size_hard_limit = 16777216; // server-imposed limit
61,668✔
372
    std::size_t accum_byte_size = 0;
61,668✔
373

374
    version_type begin_version_2 = std::max(upload_progress.client_version, sync_history_base_version);
61,668✔
375
    version_type end_version_2 = std::max(end_version, sync_history_base_version);
61,668✔
376
    version_type last_integrated_upstream_version = upload_progress.last_integrated_server_version;
61,668✔
377

378
    while (accum_byte_size < accum_byte_size_soft_limit) {
104,882✔
379
        HistoryEntry entry;
104,388✔
380
        version_type last_integrated_upstream_version_2 = last_integrated_upstream_version;
104,388✔
381
        version_type version = find_sync_history_entry(arrays, sync_history_base_version, begin_version_2,
104,388✔
382
                                                       end_version_2, entry, last_integrated_upstream_version_2);
104,388✔
383

384
        if (version == 0) {
104,388✔
385
            begin_version_2 = end_version_2;
61,162✔
386
            last_integrated_upstream_version = last_integrated_upstream_version_2;
61,162✔
387
            break;
61,162✔
388
        }
61,162✔
389

390
        ChunkedBinaryInputStream is(entry.changeset);
43,226✔
391
        size_t size = util::compression::get_uncompressed_size_from_header(is);
43,226✔
392
        if (accum_byte_size + size >= accum_byte_size_hard_limit && !uploadable_changesets.empty())
43,226✔
393
            break;
12✔
394
        accum_byte_size += size;
43,214✔
395
        last_integrated_upstream_version = last_integrated_upstream_version_2;
43,214✔
396
        begin_version_2 = version;
43,214✔
397

398
        UploadChangeset uc;
43,214✔
399
        util::AppendBuffer<char> decompressed;
43,214✔
400
        ChunkedBinaryInputStream is_2(entry.changeset);
43,214✔
401
        auto ec = util::compression::decompress_nonportable(is_2, decompressed);
43,214✔
402
        if (ec == util::compression::error::decompress_unsupported) {
43,214✔
403
            REALM_TERMINATE(
404
                "Synchronized Realm files with unuploaded local changes cannot be copied between platforms.");
×
405
        }
×
406
        REALM_ASSERT_3(ec, ==, std::error_code{});
43,214✔
407

408
        uc.origin_timestamp = entry.origin_timestamp;
43,214✔
409
        uc.origin_file_ident = entry.origin_file_ident;
43,214✔
410
        uc.progress = UploadCursor{version, entry.remote_version};
43,214✔
411
        uc.changeset = BinaryData{decompressed.data(), decompressed.size()};
43,214✔
412
        uc.buffer = decompressed.release().release();
43,214✔
413
        uploadable_changesets.push_back(std::move(uc)); // Throws
43,214✔
414
    }
43,214✔
415

416
    upload_progress = {std::min(begin_version_2, end_version), last_integrated_upstream_version};
61,668✔
417

418
    locked_server_version = arrays.root.get_as_ref_or_tagged(s_progress_download_server_version_iip).get_as_int();
61,668✔
419
}
61,668✔
420

421

422
void ClientHistory::integrate_server_changesets(
423
    const SyncProgress& progress, DownloadableProgress downloadable_bytes,
424
    util::Span<const RemoteChangeset> incoming_changesets, VersionInfo& version_info, DownloadBatchState batch_state,
425
    util::Logger& logger, const TransactionRef& transact,
426
    util::UniqueFunction<void(const Transaction&, util::Span<Changeset>)> run_in_write_tr)
427
{
26,090✔
428
    REALM_ASSERT(incoming_changesets.size() != 0);
26,090✔
429
    REALM_ASSERT(
26,090✔
430
        (transact->get_transact_stage() == DB::transact_Writing && batch_state != DownloadBatchState::SteadyState) ||
26,090✔
431
        (transact->get_transact_stage() == DB::transact_Reading && batch_state == DownloadBatchState::SteadyState));
26,090✔
432
    std::vector<Changeset> changesets;
26,090✔
433
    changesets.resize(incoming_changesets.size()); // Throws
26,090✔
434

435
    // Parse incoming changesets without holding the write lock unless 'transact' is specified.
436
    try {
26,090✔
437
        for (std::size_t i = 0; i < incoming_changesets.size(); ++i) {
73,772✔
438
            const RemoteChangeset& changeset = incoming_changesets[i];
47,682✔
439
            parse_remote_changeset(changeset, changesets[i]); // Throws
47,682✔
440
            changesets[i].transform_sequence = i;
47,682✔
441
        }
47,682✔
442
    }
26,090✔
443
    catch (const BadChangesetError& e) {
26,090✔
444
        throw IntegrationException(ErrorCodes::BadChangeset,
4✔
445
                                   util::format("Failed to parse received changeset: %1", e.what()),
4✔
446
                                   ProtocolError::bad_changeset);
4✔
447
    }
4✔
448

449
    VersionID new_version{0, 0};
26,088✔
450
    auto num_changesets = incoming_changesets.size();
26,088✔
451
    util::Span<Changeset> changesets_to_integrate(changesets);
26,088✔
452
    const bool allow_lock_release = batch_state == DownloadBatchState::SteadyState;
26,088✔
453

454
    // Ideally, this loop runs only once, but it can run up to `incoming_changesets.size()` times, depending on the
455
    // number of times the sync client yields the write lock to allow the user to commit their changes.
456
    // In each iteration, at least one changeset is transformed and committed.
457
    // In FLX, all changesets are committed at once in the bootstrap phase (i.e, in one iteration).
458
    while (!changesets_to_integrate.empty()) {
52,178✔
459
        if (transact->get_transact_stage() == DB::transact_Reading) {
26,090✔
460
            transact->promote_to_write(); // Throws
23,386✔
461
        }
23,386✔
462
        VersionID old_version = transact->get_version_of_current_transaction();
26,090✔
463
        version_type local_version = old_version.version;
26,090✔
464
        auto sync_file_id = transact->get_sync_file_id();
26,090✔
465
        REALM_ASSERT(sync_file_id != 0);
26,090✔
466

467
        ensure_updated(local_version); // Throws
26,090✔
468
        prepare_for_write();           // Throws
26,090✔
469

470
        std::uint64_t downloaded_bytes_in_transaction = 0;
26,090✔
471
        auto changesets_transformed_count = transform_and_apply_server_changesets(
26,090✔
472
            changesets_to_integrate, transact, logger, downloaded_bytes_in_transaction, allow_lock_release);
26,090✔
473

474
        // downloaded_bytes always contains the total number of downloaded bytes
475
        // from the Realm. downloaded_bytes must be persisted in the Realm, since
476
        // the downloaded changesets are trimmed after use, and since it would be
477
        // expensive to traverse the entire history.
478
        Array& root = m_arrays->root;
26,090✔
479
        auto downloaded_bytes =
26,090✔
480
            std::uint64_t(root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int());
26,090✔
481
        downloaded_bytes += downloaded_bytes_in_transaction;
26,090✔
482
        root.set(s_progress_downloaded_bytes_iip, RefOrTagged::make_tagged(downloaded_bytes)); // Throws
26,090✔
483

484
        const RemoteChangeset& last_changeset = incoming_changesets[changesets_transformed_count - 1];
26,090✔
485
        auto changesets_for_cb = changesets_to_integrate.first(changesets_transformed_count);
26,090✔
486
        changesets_to_integrate = changesets_to_integrate.sub_span(changesets_transformed_count);
26,090✔
487
        incoming_changesets = incoming_changesets.sub_span(changesets_transformed_count);
26,090✔
488

489
        // During the bootstrap phase in flexible sync, the server sends multiple download messages with the same
490
        // synthetic server version that represents synthetic changesets generated from state on the server.
491
        if (batch_state == DownloadBatchState::LastInBatch && changesets_to_integrate.empty()) {
26,090✔
492
            update_sync_progress(progress, downloadable_bytes); // Throws
2,392✔
493
        }
2,392✔
494
        // Always update progress for download messages from steady state.
495
        else if (batch_state == DownloadBatchState::SteadyState && !changesets_to_integrate.empty()) {
23,698✔
496
            auto partial_progress = progress;
×
497
            partial_progress.download.server_version = last_changeset.remote_version;
×
498
            partial_progress.download.last_integrated_client_version = last_changeset.last_integrated_local_version;
×
499
            update_sync_progress(partial_progress, downloadable_bytes); // Throws
×
500
        }
×
501
        else if (batch_state == DownloadBatchState::SteadyState && changesets_to_integrate.empty()) {
23,698✔
502
            update_sync_progress(progress, downloadable_bytes); // Throws
23,364✔
503
        }
23,364✔
504
        if (run_in_write_tr) {
26,090✔
505
            run_in_write_tr(*transact, changesets_for_cb);
26,024✔
506
        }
26,024✔
507

508
        // The reason we can use the `origin_timestamp`, and the `origin_file_ident`
509
        // from the last transformed changeset, and ignore all the other changesets, is
510
        // that these values are actually irrelevant for changesets of remote origin
511
        // stored in the client-side history (for now), except that
512
        // `origin_file_ident` is required to be nonzero, to mark it as having been
513
        // received from the server.
514
        HistoryEntry entry;
26,090✔
515
        entry.origin_timestamp = last_changeset.origin_timestamp;
26,090✔
516
        entry.origin_file_ident = last_changeset.origin_file_ident;
26,090✔
517
        entry.remote_version = last_changeset.remote_version;
26,090✔
518
        add_sync_history_entry(entry); // Throws
26,090✔
519

520
        // Tell prepare_commit()/add_changeset() not to write a history entry for
521
        // this transaction as we already did it.
522
        REALM_ASSERT(!m_applying_server_changeset);
26,090✔
523
        m_applying_server_changeset = true;
26,090✔
524
        // Commit and continue to write if in bootstrap phase and there are still changes to integrate.
525
        if (batch_state == DownloadBatchState::MoreToCome ||
26,090✔
526
            (batch_state == DownloadBatchState::LastInBatch && !changesets_to_integrate.empty())) {
26,090✔
527
            new_version = transact->commit_and_continue_writing(); // Throws
308✔
528
        }
308✔
529
        else {
25,782✔
530
            new_version = transact->commit_and_continue_as_read(); // Throws
25,782✔
531
        }
25,782✔
532

533
        logger.debug(util::LogCategory::changeset, "Integrated %1 changesets out of %2", changesets_transformed_count,
26,090✔
534
                     num_changesets);
26,090✔
535
    }
26,090✔
536

537
    REALM_ASSERT(new_version.version > 0);
26,088✔
538
    REALM_ASSERT(
26,088✔
539
        (batch_state == DownloadBatchState::MoreToCome && transact->get_transact_stage() == DB::transact_Writing) ||
26,088✔
540
        (batch_state != DownloadBatchState::MoreToCome && transact->get_transact_stage() == DB::transact_Reading));
26,088✔
541
    version_info.realm_version = new_version.version;
26,088✔
542
    version_info.sync_version = {new_version.version, 0};
26,088✔
543
}
26,088✔
544

545

546
size_t ClientHistory::transform_and_apply_server_changesets(util::Span<Changeset> changesets_to_integrate,
547
                                                            TransactionRef transact, util::Logger& logger,
548
                                                            std::uint64_t& downloaded_bytes, bool allow_lock_release)
549
{
26,090✔
550
    REALM_ASSERT(transact->get_transact_stage() == DB::transact_Writing);
26,090✔
551

552
    if (!m_replication.apply_server_changes()) {
26,090✔
553
        std::for_each(changesets_to_integrate.begin(), changesets_to_integrate.end(), [&](const Changeset& c) {
54✔
554
            downloaded_bytes += c.original_changeset_size;
54✔
555
        });
54✔
556
        // Skip over all changesets if they don't need to be transformed and applied.
557
        return changesets_to_integrate.size();
54✔
558
    }
54✔
559

560
    version_type local_version = transact->get_version_of_current_transaction().version;
26,036✔
561
    auto sync_file_id = transact->get_sync_file_id();
26,036✔
562

563
    try {
26,036✔
564
        for (auto& changeset : changesets_to_integrate) {
47,608✔
565
            REALM_ASSERT(changeset.last_integrated_remote_version <= local_version);
47,608✔
566
            REALM_ASSERT(changeset.origin_file_ident > 0 && changeset.origin_file_ident != sync_file_id);
47,608✔
567

568
            // It is possible that the synchronization history has been trimmed
569
            // to a point where a prefix of the merge window is no longer
570
            // available, but this can only happen if that prefix consisted
571
            // entirely of upload skippable entries. Since such entries (those
572
            // that are empty or of remote origin) will be skipped by the
573
            // transformer anyway, we can simply clamp the beginning of the
574
            // merge window to the beginning of the synchronization history,
575
            // when this situation occurs.
576
            //
577
            // See trim_sync_history() for further details.
578
            if (changeset.last_integrated_remote_version < m_sync_history_base_version)
47,608✔
579
                changeset.last_integrated_remote_version = m_sync_history_base_version;
37,558✔
580
        }
47,608✔
581

582
        constexpr std::size_t commit_byte_size_limit = 102400; // 100 KB
26,036✔
583

584
        auto changeset_applier = [&](const Changeset* transformed_changeset) -> bool {
47,596✔
585
            InstructionApplier applier{*transact};
47,596✔
586
            {
47,596✔
587
                TempShortCircuitReplication tscr{m_replication};
47,596✔
588
                applier.apply(*transformed_changeset); // Throws
47,596✔
589
            }
47,596✔
590
            downloaded_bytes += transformed_changeset->original_changeset_size;
47,596✔
591

592
            return !(allow_lock_release && m_db->other_writers_waiting_for_lock() &&
47,596✔
593
                     transact->get_commit_size() >= commit_byte_size_limit);
47,596✔
594
        };
47,596✔
595
        sync::Transformer transformer;
26,036✔
596
        auto changesets_transformed_count = transformer.transform_remote_changesets(
26,036✔
597
            *this, sync_file_id, local_version, changesets_to_integrate, changeset_applier, logger); // Throws
26,036✔
598
        return changesets_transformed_count;
26,036✔
599
    }
26,036✔
600
    catch (const BadChangesetError& e) {
26,036✔
601
        throw IntegrationException(ErrorCodes::BadChangeset,
×
602
                                   util::format("Failed to apply received changeset: %1", e.what()),
×
603
                                   ProtocolError::bad_changeset);
×
604
    }
×
605
    catch (const TransformError& e) {
26,036✔
606
        throw IntegrationException(ErrorCodes::BadChangeset,
28✔
607
                                   util::format("Failed to transform received changeset: %1", e.what()),
28✔
608
                                   ProtocolError::bad_changeset);
28✔
609
    }
28✔
610
}
26,036✔
611

612

613
void ClientHistory::get_upload_download_state(Transaction& rt, Allocator& alloc, std::uint_fast64_t& downloaded_bytes,
614
                                              DownloadableProgress& downloadable_bytes,
615
                                              std::uint_fast64_t& uploaded_bytes,
616
                                              std::uint_fast64_t& uploadable_bytes,
617
                                              std::uint_fast64_t& snapshot_version, version_type& uploaded_version)
618
{
58,666✔
619
    version_type current_client_version = rt.get_version();
58,666✔
620

621
    downloaded_bytes = 0;
58,666✔
622
    downloadable_bytes = uint64_t(0);
58,666✔
623
    uploaded_bytes = 0;
58,666✔
624
    uploadable_bytes = 0;
58,666✔
625
    snapshot_version = current_client_version;
58,666✔
626
    uploaded_version = 0;
58,666✔
627

628
    using gf = _impl::GroupFriend;
58,666✔
629
    ref_type ref = gf::get_history_ref(rt);
58,666✔
630
    if (!ref)
58,666✔
631
        return;
70✔
632

633
    Array root(alloc);
58,596✔
634
    root.init_from_ref(ref);
58,596✔
635
    downloaded_bytes = root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int();
58,596✔
636
    downloadable_bytes = root.get_as_ref_or_tagged(s_progress_downloadable_bytes_iip).get_as_int();
58,596✔
637
    uploadable_bytes = root.get_as_ref_or_tagged(s_progress_uploadable_bytes_iip).get_as_int();
58,596✔
638
    uploaded_bytes = root.get_as_ref_or_tagged(s_progress_uploaded_bytes_iip).get_as_int();
58,596✔
639

640
    uploaded_version = root.get_as_ref_or_tagged(s_progress_upload_client_version_iip).get_as_int();
58,596✔
641
    if (uploaded_version == current_client_version)
58,596✔
642
        return;
×
643

644
    BinaryColumn changesets(alloc);
58,596✔
645
    changesets.init_from_ref(root.get_as_ref(s_changesets_iip));
58,596✔
646
    IntegerBpTree origin_file_idents(alloc);
58,596✔
647
    origin_file_idents.init_from_ref(root.get_as_ref(s_origin_file_idents_iip));
58,596✔
648

649
    // `base_version` is the oldest version we have history for. If this is
650
    // greater than uploaded_version, all of the versions in between the two had
651
    // empty changesets and did not need to be uploaded. If this is less than
652
    // uploaded_version, we have changesets which have been uploaded but the
653
    // server has not yet told us we can delete and we may need to use for merging.
654
    auto base_version = current_client_version - changesets.size();
58,596✔
655
    if (uploaded_version < base_version) {
58,596✔
656
        uploaded_version = base_version;
34,822✔
657
    }
34,822✔
658

659
    auto count = size_t(current_client_version - uploaded_version);
58,596✔
660
    for (size_t i = changesets.size() - count; i < changesets.size(); ++i) {
141,118✔
661
        if (origin_file_idents.get(i) == 0) {
109,954✔
662
            size_t pos = 0;
96,608✔
663
            if (changesets.get_at(i, pos).size() != 0)
96,608✔
664
                break;
27,432✔
665
        }
96,608✔
666
        ++uploaded_version;
82,522✔
667
    }
82,522✔
668
}
58,596✔
669

670
void ClientHistory::get_upload_download_state(DB* db, std::uint_fast64_t& downloaded_bytes,
671
                                              std::uint_fast64_t& uploaded_bytes)
672
{
10,772✔
673
    TransactionRef rt = db->start_read(); // Throws
10,772✔
674
    downloaded_bytes = 0;
10,772✔
675
    uploaded_bytes = 0;
10,772✔
676

677
    using gf = _impl::GroupFriend;
10,772✔
678
    if (ref_type ref = gf::get_history_ref(*rt)) {
10,772✔
679
        Array root(db->get_alloc());
8,342✔
680
        root.init_from_ref(ref);
8,342✔
681
        downloaded_bytes = root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int();
8,342✔
682
        uploaded_bytes = root.get_as_ref_or_tagged(s_progress_uploaded_bytes_iip).get_as_int();
8,342✔
683
    }
8,342✔
684
}
10,772✔
685

686
auto ClientHistory::find_history_entry(version_type begin_version, version_type end_version,
687
                                       HistoryEntry& entry) const noexcept -> version_type
688
{
91,642✔
689
    version_type last_integrated_server_version;
91,642✔
690
    return find_sync_history_entry(*m_arrays, m_sync_history_base_version, begin_version, end_version, entry,
91,642✔
691
                                   last_integrated_server_version);
91,642✔
692
}
91,642✔
693

694

695
// Returns the reciprocal transform stored for the history entry that produced
// `version`. When no reciprocal transform has been stored (the slot is null),
// the entry's changeset is returned instead. `is_compressed` is always set to
// true.
ChunkedBinaryData ClientHistory::get_reciprocal_transform(version_type version, bool& is_compressed) const
{
    is_compressed = true;
    REALM_ASSERT(version > m_sync_history_base_version);

    // Entry `index` is the one that produced version `base + index + 1`.
    const std::size_t index = to_size_t(version - m_sync_history_base_version) - 1;
    REALM_ASSERT(index < sync_history_size());

    ChunkedBinaryData stored_transform{m_arrays->reciprocal_transforms, index};
    if (stored_transform.is_null())
        return ChunkedBinaryData{m_arrays->changesets, index};
    return stored_transform;
}
79,642✔
708

709

710
// Stores `data` as the reciprocal transform of the history entry that produced
// `version`. Non-empty data is compressed before being stored; empty data is
// stored as an empty (non-null) binary.
void ClientHistory::set_reciprocal_transform(version_type version, BinaryData data)
{
    REALM_ASSERT(version > m_sync_history_base_version);

    const std::size_t index = size_t(version - m_sync_history_base_version) - 1;
    REALM_ASSERT(index < sync_history_size());

    auto& transforms = m_arrays->reciprocal_transforms;
    if (data.size() != 0) {
        auto compressed = util::compression::allocate_and_compress_nonportable(data);
        transforms.set(index, BinaryData{compressed.data(), compressed.size()}); // Throws
    }
    else {
        transforms.set(index, BinaryData{"", 0}); // Throws
    }
}
186✔
725

726

727
auto ClientHistory::find_sync_history_entry(Arrays& arrays, version_type base_version, version_type begin_version,
728
                                            version_type end_version, HistoryEntry& entry,
729
                                            version_type& last_integrated_server_version) noexcept -> version_type
730
{
196,030✔
731
    if (begin_version == 0)
196,030✔
732
        begin_version = s_initial_version + 0;
×
733

734
    REALM_ASSERT(begin_version <= end_version);
196,030✔
735
    REALM_ASSERT(begin_version >= base_version);
196,030✔
736
    REALM_ASSERT(end_version <= base_version + arrays.changesets.size());
196,030✔
737
    std::size_t n = to_size_t(end_version - begin_version);
196,030✔
738
    std::size_t offset = to_size_t(begin_version - base_version);
196,030✔
739
    for (std::size_t i = 0; i < n; ++i) {
347,648✔
740
        std::int_fast64_t origin_file_ident = arrays.origin_file_idents.get(offset + i);
260,074✔
741
        last_integrated_server_version = version_type(arrays.remote_versions.get(offset + i));
260,074✔
742
        bool not_from_server = (origin_file_ident == 0);
260,074✔
743
        if (not_from_server) {
260,074✔
744
            ChunkedBinaryData chunked_changeset(arrays.changesets, offset + i);
184,804✔
745
            if (!chunked_changeset.empty()) {
184,804✔
746
                entry.origin_file_ident = file_ident_type(origin_file_ident);
108,456✔
747
                entry.remote_version = last_integrated_server_version;
108,456✔
748
                entry.origin_timestamp = timestamp_type(arrays.origin_timestamps.get(offset + i));
108,456✔
749
                entry.changeset = chunked_changeset;
108,456✔
750
                return begin_version + i + 1;
108,456✔
751
            }
108,456✔
752
        }
184,804✔
753
    }
260,074✔
754
    return 0;
87,574✔
755
}
196,030✔
756

757
// sum_of_history_entry_sizes calculates the sum of the changeset sizes of the
758
// local history entries that produced a version that succeeds `begin_version`
759
// and precedes `end_version`.
760
// Adds up the uncompressed size recorded in the compression header of every
// changeset of local origin in the given version range, after the range has
// been clamped by clamp_sync_version_range(). Returns zero for an empty or
// inverted range.
std::uint_fast64_t ClientHistory::sum_of_history_entry_sizes(version_type begin_version,
                                                             version_type end_version) const noexcept
{
    if (begin_version >= end_version)
        return 0;

    REALM_ASSERT(m_arrays->changesets.is_attached());
    REALM_ASSERT(m_arrays->origin_file_idents.is_attached());
    REALM_ASSERT(end_version <= m_sync_history_base_version + sync_history_size());

    version_type clamped_begin = begin_version;
    version_type clamped_end = end_version;
    clamp_sync_version_range(clamped_begin, clamped_end);

    const std::size_t count = to_size_t(clamped_end - clamped_begin);
    const std::size_t first = to_size_t(clamped_begin - m_sync_history_base_version);

    std::uint_fast64_t total = 0;
    for (std::size_t step = 0; step < count; ++step) {
        const std::size_t ndx = first + step;

        // Entries received from the server do not count towards uploads.
        const bool is_local = (m_arrays->origin_file_idents.get(ndx) == 0);
        if (is_local) {
            ChunkedBinaryData changeset(m_arrays->changesets, ndx);
            ChunkedBinaryInputStream in{changeset};
            total += util::compression::get_uncompressed_size_from_header(in);
        }
    }
    return total;
}
55,180✔
791

792
void ClientHistory::prepare_for_write()
793
{
241,498✔
794
    if (m_arrays) {
241,498✔
795
        REALM_ASSERT(m_arrays->root.size() == s_root_size);
222,804✔
796
        return;
222,804✔
797
    }
222,804✔
798

799
    m_arrays.emplace(*m_db, *m_group);
18,694✔
800
}
18,694✔
801

802

803
// Appends the continuous-transactions changeset for the commit being made and,
// unless a server changeset is currently being applied, also appends a sync
// history entry of local origin and updates the persisted uploadable byte
// count. Returns the version produced by the commit
// (`m_ct_history_base_version + ct_history_size()`).
Replication::version_type ClientHistory::add_changeset(BinaryData ct_changeset, BinaryData sync_changeset)
{
    // FIXME: BinaryColumn::set() currently interprets BinaryData(0,0) as
    // null. It should probably be changed such that BinaryData(0,0) is always
    // interpreted as the empty string. For the purpose of setting null values,
    // BinaryColumn::set() should accept values of type Optional<BinaryData>().
    m_arrays->ct_history.add(ct_changeset.is_null() ? BinaryData("", 0) : ct_changeset); // Throws

    REALM_ASSERT(!m_applying_server_changeset || !m_applying_client_reset);

    if (m_applying_server_changeset) {
        // The sync history entry was already added while integrating the
        // server changeset, so there is nothing more to record. The flag is
        // guarded by the write lock and must be cleared before the commit.
        m_applying_server_changeset = false;
        REALM_ASSERT(m_ct_history_base_version + ct_history_size() ==
                     m_sync_history_base_version + sync_history_size());
        REALM_ASSERT(sync_changeset.size() == 0);
        return m_ct_history_base_version + ct_history_size();
    }

    if (m_applying_client_reset) {
        // Changes made while applying a client reset only bring us in line
        // with the new server state, so no changeset is recorded for them.
        m_applying_client_reset = false;
        sync_changeset = {};
    }

    HistoryEntry local_entry;
    local_entry.origin_timestamp = m_local_origin_timestamp_source();
    local_entry.origin_file_ident = 0; // Of local origin
    local_entry.remote_version = m_progress_download.server_version;
    local_entry.changeset = sync_changeset;
    add_sync_history_entry(local_entry); // Throws

    // The running total of uploadable bytes is persisted in the Realm and
    // bumped on every local change: the sync history gets trimmed, and even if
    // it were not, recomputing the total from the history on every access
    // would be expensive.
    Array& root = m_arrays->root;
    const std::uint_fast64_t previous_total = root.get_as_ref_or_tagged(s_progress_uploadable_bytes_iip).get_as_int();
    const std::uint_fast64_t new_total = previous_total + local_entry.changeset.size();
    root.set(s_progress_uploadable_bytes_iip, RefOrTagged::make_tagged(new_total));

    return m_ct_history_base_version + ct_history_size();
}
174,366✔
854

855
void ClientHistory::add_sync_history_entry(const HistoryEntry& entry)
856
{
174,362✔
857
    REALM_ASSERT(m_arrays->changesets.size() == sync_history_size());
174,362✔
858
    REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size());
174,362✔
859
    REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size());
174,362✔
860
    REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size());
174,362✔
861
    REALM_ASSERT(m_arrays->origin_timestamps.size() == sync_history_size());
174,362✔
862

863
    if (!entry.changeset.is_null()) {
174,362✔
864
        auto changeset = entry.changeset.get_first_chunk();
131,032✔
865
        auto compressed = util::compression::allocate_and_compress_nonportable(changeset);
131,032✔
866
        m_arrays->changesets.add(BinaryData{compressed.data(), compressed.size()}); // Throws
131,032✔
867
    }
131,032✔
868
    else {
43,330✔
869
        m_arrays->changesets.add(BinaryData("", 0)); // Throws
43,330✔
870
    }
43,330✔
871

872
    m_arrays->reciprocal_transforms.add(BinaryData{});                                            // Throws
174,362✔
873
    m_arrays->remote_versions.insert(realm::npos, std::int_fast64_t(entry.remote_version));       // Throws
174,362✔
874
    m_arrays->origin_file_idents.insert(realm::npos, std::int_fast64_t(entry.origin_file_ident)); // Throws
174,362✔
875
    m_arrays->origin_timestamps.insert(realm::npos, std::int_fast64_t(entry.origin_timestamp));   // Throws
174,362✔
876
}
174,362✔
877

878

879
void ClientHistory::update_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes)
880
{
55,182✔
881
    Array& root = m_arrays->root;
55,182✔
882

883
    // Progress must never decrease
884
    if (auto current = version_type(root.get_as_ref_or_tagged(s_progress_latest_server_version_iip).get_as_int());
55,182✔
885
        progress.latest_server_version.version < current) {
55,182✔
886
        throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
887
                                   util::format("latest server version cannot decrease (current: %1, new: %2)",
×
888
                                                current, progress.latest_server_version.version),
×
889
                                   ProtocolError::bad_progress);
×
890
    }
×
891
    if (auto current = version_type(root.get_as_ref_or_tagged(s_progress_download_server_version_iip).get_as_int());
55,182✔
892
        progress.download.server_version < current) {
55,182✔
893
        throw IntegrationException(
×
894
            ErrorCodes::SyncProtocolInvariantFailed,
×
895
            util::format("server version of download cursor cannot decrease (current: %1, new: %2)", current,
×
896
                         progress.download.server_version),
×
897
            ProtocolError::bad_progress);
×
898
    }
×
899
    if (auto current = version_type(root.get_as_ref_or_tagged(s_progress_download_client_version_iip).get_as_int());
55,182✔
900
        progress.download.last_integrated_client_version < current) {
55,182✔
901
        throw IntegrationException(
×
902
            ErrorCodes::SyncProtocolInvariantFailed,
×
903
            util::format("last integrated client version of download cursor cannot decrease (current: %1, new: %2)",
×
904
                         current, progress.download.last_integrated_client_version),
×
905
            ProtocolError::bad_progress);
×
906
    }
×
907
    if (auto current = version_type(root.get_as_ref_or_tagged(s_progress_upload_client_version_iip).get_as_int());
55,182✔
908
        progress.upload.client_version < current) {
55,182✔
909
        throw IntegrationException(
×
910
            ErrorCodes::SyncProtocolInvariantFailed,
×
911
            util::format("client version of upload cursor cannot decrease (current: %1, new: %2)", current,
×
912
                         progress.upload.client_version),
×
913
            ProtocolError::bad_progress);
×
914
    }
×
915
    const auto last_integrated_server_version = progress.upload.last_integrated_server_version;
55,182✔
916
    if (auto current = version_type(root.get_as_ref_or_tagged(s_progress_upload_server_version_iip).get_as_int());
55,182✔
917
        last_integrated_server_version > 0 && last_integrated_server_version < current) {
55,182✔
918
        throw IntegrationException(
×
919
            ErrorCodes::SyncProtocolInvariantFailed,
×
920
            util::format("last integrated server version of upload cursor cannot decrease (current: %1, new: %2)",
×
921
                         current, last_integrated_server_version),
×
922
            ProtocolError::bad_progress);
×
923
    }
×
924

925
    auto uploaded_bytes = std::uint_fast64_t(root.get_as_ref_or_tagged(s_progress_uploaded_bytes_iip).get_as_int());
55,182✔
926
    auto previous_upload_client_version =
55,182✔
927
        version_type(root.get_as_ref_or_tagged(s_progress_upload_client_version_iip).get_as_int());
55,182✔
928
    uploaded_bytes += sum_of_history_entry_sizes(previous_upload_client_version, progress.upload.client_version);
55,182✔
929

930
    root.set(s_progress_download_server_version_iip,
55,182✔
931
             RefOrTagged::make_tagged(progress.download.server_version)); // Throws
55,182✔
932
    root.set(s_progress_download_client_version_iip,
55,182✔
933
             RefOrTagged::make_tagged(progress.download.last_integrated_client_version)); // Throws
55,182✔
934
    root.set(s_progress_latest_server_version_iip,
55,182✔
935
             RefOrTagged::make_tagged(progress.latest_server_version.version)); // Throws
55,182✔
936
    root.set(s_progress_latest_server_version_salt_iip,
55,182✔
937
             RefOrTagged::make_tagged(progress.latest_server_version.salt)); // Throws
55,182✔
938
    root.set(s_progress_upload_client_version_iip,
55,182✔
939
             RefOrTagged::make_tagged(progress.upload.client_version)); // Throws
55,182✔
940
    if (progress.upload.last_integrated_server_version > 0) {
55,182✔
941
        root.set(s_progress_upload_server_version_iip,
49,342✔
942
                 RefOrTagged::make_tagged(progress.upload.last_integrated_server_version)); // Throws
49,342✔
943
    }
49,342✔
944

945
    root.set(s_progress_downloadable_bytes_iip,
55,182✔
946
             RefOrTagged::make_tagged(downloadable_bytes.as_bytes())); // Throws
55,182✔
947
    root.set(s_progress_uploaded_bytes_iip,
55,182✔
948
             RefOrTagged::make_tagged(uploaded_bytes)); // Throws
55,182✔
949

950
    m_progress_download = progress.download;
55,182✔
951

952
    trim_sync_history(); // Throws
55,182✔
953
}
55,182✔
954

955
// Overwrites the persisted downloadable-bytes value in the history root of
// `tr` with `p.as_bytes()`. The history must already exist.
void ClientHistory::set_download_progress(Transaction& tr, DownloadableProgress p)
{
    const ref_type ref = _impl::GroupFriend::get_history_ref(tr);
    REALM_ASSERT(ref);

    Array root(_impl::GroupFriend::get_alloc(tr));
    root.init_from_ref(ref);
    // Attach the accessor to the group as the history root before writing
    // through it.
    _impl::GroupFriend::set_history_parent(tr, root);
    REALM_ASSERT(root.size() > s_progress_uploadable_bytes_iip);

    const auto tagged_bytes = RefOrTagged::make_tagged(p.as_bytes());
    root.set(s_progress_downloadable_bytes_iip, tagged_bytes); // Throws
}
3,968✔
967

968
void ClientHistory::trim_ct_history()
969
{
157,164✔
970
    version_type begin = m_ct_history_base_version;
157,164✔
971
    version_type end = m_version_of_oldest_bound_snapshot;
157,164✔
972
    REALM_ASSERT(end >= begin);
157,164✔
973

974
    std::size_t n = std::size_t(end - begin);
157,164✔
975
    if (n == 0)
157,164✔
976
        return;
18,694✔
977

978
    // The new changeset is always added before set_oldest_bound_version()
979
    // is called. Therefore, the trimming operation can never leave the
980
    // history empty.
981
    REALM_ASSERT(n < ct_history_size());
138,470✔
982

983
    for (std::size_t i = 0; i < n; ++i) {
282,972✔
984
        std::size_t j = (n - 1) - i;
144,502✔
985
        m_arrays->ct_history.erase(j);
144,502✔
986
    }
144,502✔
987

988
    m_ct_history_base_version += n;
138,470✔
989

990
    REALM_ASSERT(m_ct_history_base_version + ct_history_size() == m_sync_history_base_version + sync_history_size());
138,470✔
991
}
138,470✔
992

993

994
// Trimming rules for synchronization history:
995
//
996
// Let C be the latest client version that was integrated on the server prior to
997
// the latest server version currently integrated by the client
998
// (`m_progress_download.last_integrated_client_version`).
999
//
1000
// Definition: An *upload skippable history entry* is one whose changeset is
1001
// either empty, or of remote origin.
1002
//
1003
// Then, a history entry, E, can be trimmed away if it precedes C, or E is
1004
// upload skippable, and there are no upload nonskippable entries between C and
1005
// E.
1006
//
1007
// Since the history representation is contiguous, it is necessary that the
1008
// trimming rule upholds the following invariant:
1009
//
1010
// > If a changeset can be trimmed, then any earlier changeset can also be
1011
// > trimmed.
1012
//
1013
// Note that C corresponds to the earliest possible beginning of the merge
1014
// window for the next incoming changeset from the server.
1015
void ClientHistory::trim_sync_history()
1016
{
55,180✔
1017
    version_type begin = m_sync_history_base_version;
55,180✔
1018
    version_type end = std::max(m_progress_download.last_integrated_client_version, s_initial_version + 0);
55,180✔
1019
    // Note: At this point, `end` corresponds to C in the description above.
1020

1021
    // `end` (`m_progress_download.last_integrated_client_version`) will precede
1022
    // the beginning of the history, if we trimmed beyond
1023
    // `m_progress_download.last_integrated_client_version` during the previous
1024
    // trimming session. Since new entries, that have now become eligible for
1025
    // scanning, may also be upload skippable, we need to continue the scan from
1026
    // the beginning of the history in that case.
1027
    if (end < begin)
55,180✔
1028
        end = begin;
19,010✔
1029

1030
    // FIXME: It seems like in some cases, a particular history entry that
1031
    // terminates the scan may get examined over and over every time
1032
    // trim_history() is called. For this reason, it seems like it would be
1033
    // worth considering to cache the outcome.
1034

1035
    // FIXME: It seems like there is a significant overlap between what is going
1036
    // on here and in a place like find_uploadable_changesets(). Maybe there is
1037
    // grounds for some refactoring to take that into account, especially, to
1038
    // avoid scanning the same parts of the history for the same information
1039
    // multiple times.
1040

1041
    {
55,180✔
1042
        std::size_t offset = std::size_t(end - begin);
55,180✔
1043
        std::size_t n = std::size_t(sync_history_size() - offset);
55,180✔
1044
        std::size_t i = 0;
55,180✔
1045
        while (i < n) {
101,488✔
1046
            std::int_fast64_t origin_file_ident = m_arrays->origin_file_idents.get(offset + i);
79,710✔
1047
            bool of_local_origin = (origin_file_ident == 0);
79,710✔
1048
            if (of_local_origin) {
79,710✔
1049
                std::size_t pos = 0;
59,932✔
1050
                BinaryData chunk = m_arrays->changesets.get_at(offset + i, pos);
59,932✔
1051
                bool nonempty = (chunk.size() > 0);
59,932✔
1052
                if (nonempty)
59,932✔
1053
                    break; // Not upload skippable
33,402✔
1054
            }
59,932✔
1055
            ++i;
46,308✔
1056
        }
46,308✔
1057
        end += i;
55,180✔
1058
    }
55,180✔
1059

1060
    std::size_t n = std::size_t(end - begin);
55,180✔
1061
    do_trim_sync_history(n); // Throws
55,180✔
1062
}
55,180✔
1063

1064
// Returns true if no history entry after the persisted upload cursor is a
// non-empty changeset of local origin. Entries at or before the upload cursor
// are ignored even if they are still present in the history.
bool ClientHistory::no_pending_local_changes(version_type version) const
{
    ensure_updated(version);

    const auto upload_client_version =
        version_type(m_arrays->root.get_as_ref_or_tagged(s_progress_upload_client_version_iip).get_as_int());

    // Index of the first history entry beyond the upload cursor; zero when the
    // cursor does not lie past the base of the history.
    const size_t first_candidate = (upload_client_version > m_sync_history_base_version)
                                       ? size_t(upload_client_version - m_sync_history_base_version)
                                       : 0;

    for (size_t ndx = first_candidate; ndx < sync_history_size(); ndx++) {
        if (m_arrays->origin_file_idents.get(ndx) != 0)
            continue; // of remote origin

        std::size_t pos = 0;
        const BinaryData chunk = m_arrays->changesets.get_at(ndx, pos);
        if (chunk.size() > 0)
            return false;
    }
    return true;
}
184✔
1082

1083
void ClientHistory::do_trim_sync_history(std::size_t n)
1084
{
63,056✔
1085
    REALM_ASSERT(m_arrays->changesets.size() == sync_history_size());
63,056✔
1086
    REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size());
63,056✔
1087
    REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size());
63,056✔
1088
    REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size());
63,056✔
1089
    REALM_ASSERT(m_arrays->origin_timestamps.size() == sync_history_size());
63,056✔
1090
    REALM_ASSERT(n <= sync_history_size());
63,056✔
1091

1092
    if (n == sync_history_size()) {
63,056✔
1093
        m_arrays->changesets.clear();
25,798✔
1094
        m_arrays->reciprocal_transforms.clear();
25,798✔
1095
        m_arrays->remote_versions.clear();
25,798✔
1096
        m_arrays->origin_file_idents.clear();
25,798✔
1097
        m_arrays->origin_timestamps.clear();
25,798✔
1098
    }
25,798✔
1099
    else if (n > 0) {
37,258✔
1100
        for (std::size_t i = 0; i < n; ++i) {
73,462✔
1101
            std::size_t j = (n - 1) - i;
54,342✔
1102
            m_arrays->changesets.erase(j); // Throws
54,342✔
1103
        }
54,342✔
1104
        for (std::size_t i = 0; i < n; ++i) {
73,462✔
1105
            std::size_t j = (n - 1) - i;
54,342✔
1106
            m_arrays->reciprocal_transforms.erase(j); // Throws
54,342✔
1107
        }
54,342✔
1108
        for (std::size_t i = 0; i < n; ++i) {
73,462✔
1109
            std::size_t j = (n - 1) - i;
54,342✔
1110
            m_arrays->remote_versions.erase(j); // Throws
54,342✔
1111
        }
54,342✔
1112
        for (std::size_t i = 0; i < n; ++i) {
73,462✔
1113
            std::size_t j = (n - 1) - i;
54,342✔
1114
            m_arrays->origin_file_idents.erase(j); // Throws
54,342✔
1115
        }
54,342✔
1116
        for (std::size_t i = 0; i < n; ++i) {
73,460✔
1117
            std::size_t j = (n - 1) - i;
54,340✔
1118
            m_arrays->origin_timestamps.erase(j); // Throws
54,340✔
1119
        }
54,340✔
1120
    }
19,120✔
1121

1122
    m_sync_history_base_version += n;
63,056✔
1123
}
63,056✔
1124

1125
void ClientHistory::fix_up_client_file_ident_in_stored_changesets(Transaction& group,
1126
                                                                  file_ident_type client_file_ident)
1127
{
886✔
1128
    // Must be in write transaction!
1129

1130
    REALM_ASSERT(client_file_ident != 0);
886✔
1131
    auto promote_global_key = [client_file_ident](GlobalKey* oid) {
886✔
1132
        if (oid->hi() == 0) {
20✔
1133
            // client_file_ident == 0
1134
            *oid = GlobalKey{uint64_t(client_file_ident), oid->lo()};
20✔
1135
            return true;
20✔
1136
        }
20✔
1137
        return false;
×
1138
    };
20✔
1139

1140
    Group::TableNameBuffer buffer;
886✔
1141
    auto get_table_for_class = [&](StringData class_name) -> ConstTableRef {
3,088✔
1142
        return group.get_table(Group::class_name_to_table_name(class_name, buffer));
3,088✔
1143
    };
3,088✔
1144

1145
    util::compression::CompressMemoryArena arena;
886✔
1146
    util::AppendBuffer<char> compressed;
886✔
1147

1148
    // Fix up changesets.
1149
    Array& root = m_arrays->root;
886✔
1150
    uint64_t uploadable_bytes = root.get_as_ref_or_tagged(s_progress_uploadable_bytes_iip).get_as_int();
886✔
1151
    for (size_t i = 0; i < sync_history_size(); ++i) {
4,172✔
1152
        // We could have opened a pre-provisioned realm file. In this case we can skip the entries downloaded
1153
        // from the server.
1154
        if (m_arrays->origin_file_idents.get(i) != 0)
3,286✔
1155
            continue;
×
1156

1157
        ChunkedBinaryData changeset{m_arrays->changesets, i};
3,286✔
1158
        ChunkedBinaryInputStream is{changeset};
3,286✔
1159
        size_t decompressed_size;
3,286✔
1160
        auto decompressed = util::compression::decompress_nonportable_input_stream(is, decompressed_size);
3,286✔
1161
        if (!decompressed)
3,286✔
1162
            continue;
×
1163
        Changeset log;
3,286✔
1164
        parse_changeset(*decompressed, log);
3,286✔
1165

1166
        bool did_modify = false;
3,286✔
1167
        auto last_class_name = InternString::npos;
3,286✔
1168
        ConstTableRef selected_table;
3,286✔
1169
        for (auto instr : log) {
189,590✔
1170
            if (!instr)
189,590✔
1171
                continue;
×
1172

1173
            if (auto obj_instr = instr->get_if<Instruction::ObjectInstruction>()) {
189,590✔
1174
                // Cache the TableRef
1175
                if (obj_instr->table != last_class_name) {
188,182✔
1176
                    StringData class_name = log.get_string(obj_instr->table);
3,088✔
1177
                    last_class_name = obj_instr->table;
3,088✔
1178
                    selected_table = get_table_for_class(class_name);
3,088✔
1179
                }
3,088✔
1180

1181
                // Fix up instructions using GlobalKey to identify objects.
1182
                if (auto global_key = mpark::get_if<GlobalKey>(&obj_instr->object)) {
188,182✔
1183
                    did_modify = promote_global_key(global_key);
16✔
1184
                }
16✔
1185

1186
                // Fix up the payload for Set and ArrayInsert.
1187
                Instruction::Payload* payload = nullptr;
188,182✔
1188
                if (auto set_instr = instr->get_if<Instruction::Update>()) {
188,182✔
1189
                    payload = &set_instr->value;
174,482✔
1190
                }
174,482✔
1191
                else if (auto list_insert_instr = instr->get_if<Instruction::ArrayInsert>()) {
13,700✔
1192
                    payload = &list_insert_instr->value;
236✔
1193
                }
236✔
1194

1195
                if (payload && payload->type == Instruction::Payload::Type::Link) {
188,182✔
1196
                    if (auto global_key = mpark::get_if<GlobalKey>(&payload->data.link.target)) {
44✔
1197
                        did_modify = promote_global_key(global_key);
4✔
1198
                    }
4✔
1199
                }
44✔
1200
            }
188,182✔
1201
        }
189,590✔
1202

1203
        if (did_modify) {
3,286✔
1204
            ChangesetEncoder::Buffer modified;
4✔
1205
            encode_changeset(log, modified);
4✔
1206
            util::compression::allocate_and_compress_nonportable(arena, modified, compressed);
4✔
1207
            m_arrays->changesets.set(i, BinaryData{compressed.data(), compressed.size()}); // Throws
4✔
1208

1209
            uploadable_bytes += modified.size() - decompressed_size;
4✔
1210
        }
4✔
1211
    }
3,286✔
1212

1213
    root.set(s_progress_uploadable_bytes_iip, RefOrTagged::make_tagged(uploadable_bytes));
886✔
1214
}
886✔
1215

1216
// Forwards `group` and `updated` to the base history implementation and
// then, if the history array accessors exist, registers their root array as
// the history parent of the group.
void ClientHistory::set_group(Group* group, bool updated)
{
    using gf = _impl::GroupFriend;

    _impl::History::set_group(group, updated);
    if (!m_arrays) {
        // No array accessors exist yet, so there is no root to parent.
        return;
    }
    gf::set_history_parent(*m_group, m_arrays->root);
}
205,266✔
1222

1223
void ClientHistory::record_current_schema_version()
1224
{
4✔
1225
    using gf = _impl::GroupFriend;
4✔
1226
    Allocator& alloc = gf::get_alloc(*m_group);
4✔
1227
    auto ref = gf::get_history_ref(*m_group);
4✔
1228
    REALM_ASSERT(ref != 0);
4✔
1229
    Array root{alloc};
4✔
1230
    gf::set_history_parent(*m_group, root);
4✔
1231
    root.init_from_ref(ref);
4✔
1232
    Array schema_versions{alloc};
4✔
1233
    schema_versions.set_parent(&root, s_schema_versions_iip);
4✔
1234
    schema_versions.init_from_parent();
4✔
1235
    version_type snapshot_version = m_db->get_version_of_latest_snapshot();
4✔
1236
    record_current_schema_version(schema_versions, snapshot_version); // Throws
4✔
1237
}
4✔
1238

1239

1240
// Appends one entry to each of the four child arrays of `schema_versions`:
// the client history schema version, the library version string (one array
// element per character), `snapshot_version`, and the value returned by
// std::time().
void ClientHistory::record_current_schema_version(Array& schema_versions, version_type snapshot_version)
{
    static_assert(s_schema_versions_size == 4, "");
    REALM_ASSERT(schema_versions.size() == s_schema_versions_size);

    Allocator& alloc = schema_versions.get_alloc();

    // Binds `child` to slot `slot` of `schema_versions` and loads it from there.
    auto open_child = [&schema_versions](Array& child, auto slot) {
        child.set_parent(&schema_versions, slot);
        child.init_from_parent();
    };

    { // History schema version
        Array schema_version_list{alloc};
        open_child(schema_version_list, s_sv_schema_versions_iip);
        const int current_schema_version = get_client_history_schema_version();
        schema_version_list.add(current_schema_version); // Throws
    }

    { // Library version string
        Array library_version_list{alloc};
        open_child(library_version_list, s_sv_library_versions_iip);

        const char* const version_text = REALM_VERSION_STRING;
        const std::size_t length = std::strlen(version_text);

        Array encoded{alloc};
        const bool context_flag = false;
        encoded.create(Array::type_Normal, context_flag, length); // Throws
        _impl::ShallowArrayDestroyGuard encoded_guard{&encoded};

        // Each character is stored as its unsigned byte value.
        for (std::size_t pos = 0; pos != length; ++pos) {
            const auto byte = static_cast<unsigned char>(version_text[pos]);
            encoded.set(pos, static_cast<std::int_fast64_t>(byte)); // Throws
        }

        library_version_list.add(static_cast<std::int_fast64_t>(encoded.get_ref())); // Throws
        encoded_guard.release(); // Ownership transferred to parent array
    }

    { // Snapshot version
        Array snapshot_version_list{alloc};
        open_child(snapshot_version_list, s_sv_snapshot_versions_iip);
        snapshot_version_list.add(static_cast<std::int_fast64_t>(snapshot_version)); // Throws
    }

    { // Timestamp
        Array timestamp_list{alloc};
        open_child(timestamp_list, s_sv_timestamps_iip);
        const std::time_t now = std::time(nullptr);
        timestamp_list.add(static_cast<std::int_fast64_t>(now)); // Throws
    }
}
18,696✔
1283

1284
// Overriding member function in realm::_impl::History
1285
void ClientHistory::update_from_ref_and_version(ref_type ref, version_type version)
1286
{
201,124✔
1287
    if (ref == 0) {
201,124✔
1288
        // No history
1289
        m_ct_history_base_version = version;
18,692✔
1290
        m_sync_history_base_version = version;
18,692✔
1291
        m_arrays.reset();
18,692✔
1292
        m_progress_download = {0, 0};
18,692✔
1293
        return;
18,692✔
1294
    }
18,692✔
1295
    if (REALM_LIKELY(m_arrays)) {
182,432✔
1296
        m_arrays->init_from_ref(ref);
155,978✔
1297
    }
155,978✔
1298
    else {
26,454✔
1299
        m_arrays.emplace(m_db->get_alloc(), m_group, ref);
26,454✔
1300
    }
26,454✔
1301

1302
    m_ct_history_base_version = version - ct_history_size();
182,432✔
1303
    m_sync_history_base_version = version - sync_history_size();
182,432✔
1304
    REALM_ASSERT(m_arrays->changesets.size() == sync_history_size());
182,432✔
1305
    REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size());
182,432✔
1306
    REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size());
182,432✔
1307
    REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size());
182,432✔
1308
    REALM_ASSERT(m_arrays->origin_timestamps.size() == sync_history_size());
182,432✔
1309

1310
    const Array& root = m_arrays->root;
182,432✔
1311
    m_progress_download.server_version =
182,432✔
1312
        version_type(root.get_as_ref_or_tagged(s_progress_download_server_version_iip).get_as_int());
182,432✔
1313
    m_progress_download.last_integrated_client_version =
182,432✔
1314
        version_type(root.get_as_ref_or_tagged(s_progress_download_client_version_iip).get_as_int());
182,432✔
1315
}
182,432✔
1316

1317

1318
// Overriding member function in realm::_impl::History
1319
void ClientHistory::update_from_parent(version_type current_version)
1320
{
161,614✔
1321
    using gf = _impl::GroupFriend;
161,614✔
1322
    ref_type ref = gf::get_history_ref(*m_group);
161,614✔
1323
    update_from_ref_and_version(ref, current_version); // Throws
161,614✔
1324
}
161,614✔
1325

1326

1327
// Overriding member function in realm::_impl::History
1328
void ClientHistory::get_changesets(version_type begin_version, version_type end_version,
1329
                                   BinaryIterator* iterators) const noexcept
1330
{
65,176✔
1331
    REALM_ASSERT(begin_version <= end_version);
65,176✔
1332
    REALM_ASSERT(begin_version >= m_ct_history_base_version);
65,176✔
1333
    REALM_ASSERT(end_version <= m_ct_history_base_version + ct_history_size());
65,176✔
1334
    std::size_t n = to_size_t(end_version - begin_version);
65,176✔
1335
    REALM_ASSERT(n == 0 || m_arrays);
65,176✔
1336
    std::size_t offset = to_size_t(begin_version - m_ct_history_base_version);
65,176✔
1337
    for (std::size_t i = 0; i < n; ++i)
182,608✔
1338
        iterators[i] = BinaryIterator(&m_arrays->ct_history, offset + i);
117,432✔
1339
}
65,176✔
1340

1341

1342
// Overriding member function in realm::_impl::History
1343
void ClientHistory::set_oldest_bound_version(version_type version)
1344
{
174,364✔
1345
    REALM_ASSERT(version >= m_version_of_oldest_bound_snapshot);
174,364✔
1346
    if (version > m_version_of_oldest_bound_snapshot) {
174,364✔
1347
        m_version_of_oldest_bound_snapshot = version;
157,164✔
1348
        trim_ct_history(); // Throws
157,164✔
1349
    }
157,164✔
1350
}
174,364✔
1351

1352
// Overriding member function in realm::_impl::History
1353
void ClientHistory::verify() const
1354
{
232✔
1355
#ifdef REALM_DEBUG
232✔
1356
    // The size of the continuous transactions history can only be zero when the
1357
    // Realm is in the initial empty state where top-ref is null.
1358
    REALM_ASSERT(ct_history_size() != 0 || m_ct_history_base_version == s_initial_version + 0);
232!
1359

1360
    if (!m_arrays) {
232✔
1361
        REALM_ASSERT(m_progress_download.server_version == 0);
×
1362
        REALM_ASSERT(m_progress_download.last_integrated_client_version == 0);
×
1363
        return;
×
1364
    }
×
1365
    m_arrays->verify();
232✔
1366

1367
    auto& root = m_arrays->root;
232✔
1368
    version_type progress_download_server_version =
232✔
1369
        version_type(root.get_as_ref_or_tagged(s_progress_download_server_version_iip).get_as_int());
232✔
1370
    version_type progress_download_client_version =
232✔
1371
        version_type(root.get_as_ref_or_tagged(s_progress_download_client_version_iip).get_as_int());
232✔
1372
    REALM_ASSERT(progress_download_server_version == m_progress_download.server_version);
232✔
1373
    REALM_ASSERT(progress_download_client_version == m_progress_download.last_integrated_client_version);
232✔
1374
    REALM_ASSERT(progress_download_client_version <= m_sync_history_base_version + sync_history_size());
232✔
1375
    version_type remote_version_of_last_entry = 0;
232✔
1376
    if (auto size = sync_history_size())
232✔
1377
        remote_version_of_last_entry = m_arrays->remote_versions.get(size - 1);
232✔
1378
    REALM_ASSERT(progress_download_server_version >= remote_version_of_last_entry);
232✔
1379

1380
    // Verify that there is no cooked history.
1381
    Array cooked_history{m_db->get_alloc()};
232✔
1382
    cooked_history.set_parent(&root, s_cooked_history_iip);
232✔
1383
    REALM_ASSERT(cooked_history.get_ref_from_parent() == 0);
232✔
1384
#endif // REALM_DEBUG
232✔
1385
}
232✔
1386

1387
ClientHistory::Arrays::Arrays(Allocator& alloc) noexcept
1388
    : root(alloc)
53,402✔
1389
    , ct_history(alloc)
53,402✔
1390
    , changesets(alloc)
53,402✔
1391
    , reciprocal_transforms(alloc)
53,402✔
1392
    , remote_versions(alloc)
53,402✔
1393
    , origin_file_idents(alloc)
53,402✔
1394
    , origin_timestamps(alloc)
53,402✔
1395
{
106,816✔
1396
}
106,816✔
1397

1398
// Creates a brand-new, empty client history: the root array, each history
// column, and the `schema_versions` table with an initial entry. The root is
// then attached to `group` as a sync client history.
ClientHistory::Arrays::Arrays(DB& db, Group& group)
    : Arrays(db.get_alloc())
{
    auto& alloc = db.get_alloc();

    {
        const bool context_flag = false;
        const std::size_t root_slots = s_root_size;
        root.create(Array::type_HasRefs, context_flag, root_slots); // Throws
    }
    // Released at the very end, once the root has been attached to the group.
    _impl::DeepArrayDestroyGuard root_guard{&root};

    // Parents `column` at `slot` of the root and then creates it.
    auto create_under_root = [this](auto& column, auto slot) {
        column.set_parent(&root, slot);
        column.create(); // Throws
    };
    create_under_root(ct_history, s_ct_history_iip);
    create_under_root(changesets, s_changesets_iip);
    create_under_root(reciprocal_transforms, s_reciprocal_transforms_iip);
    create_under_root(remote_versions, s_remote_versions_iip);
    create_under_root(origin_file_idents, s_origin_file_idents_iip);
    create_under_root(origin_timestamps, s_origin_timestamps_iip);

    { // `schema_versions` table
        Array schema_versions{alloc};
        bool context_flag = false;
        const std::size_t table_slots = s_schema_versions_size;
        schema_versions.create(Array::type_HasRefs, context_flag, table_slots); // Throws
        _impl::DeepArrayDestroyGuard table_guard{&schema_versions};

        // Creates an empty array of `type` and stores its ref in the given
        // slot of `schema_versions`.
        auto add_empty_child = [&](NodeHeader::Type type, int ndx_in_parent) {
            MemRef mem = Array::create_empty_array(type, context_flag, alloc);
            ref_type child_ref = mem.get_ref();
            _impl::DeepArrayRefDestroyGuard child_guard{child_ref, alloc};
            schema_versions.set_as_ref(ndx_in_parent, child_ref); // Throws
            child_guard.release();                                // Ownership transferred to parent array
        };
        add_empty_child(Array::type_Normal, s_sv_schema_versions_iip);
        add_empty_child(Array::type_HasRefs, s_sv_library_versions_iip);
        add_empty_child(Array::type_Normal, s_sv_snapshot_versions_iip);
        add_empty_child(Array::type_Normal, s_sv_timestamps_iip);

        // Record the first entry, then hand the table over to the root.
        version_type snapshot_version = db.get_version_of_latest_snapshot();
        record_current_schema_version(schema_versions, snapshot_version);  // Throws
        root.set_as_ref(s_schema_versions_iip, schema_versions.get_ref()); // Throws
        table_guard.release();                                             // Ownership transferred to parent array
    }

    _impl::GroupFriend::prepare_history_parent(group, root, Replication::hist_SyncClient,
                                               get_client_history_schema_version(), 0); // Throws
    // Note: gf::prepare_history_parent() also ensures that the root array has
    // a slot for the history ref.
    root.update_parent(); // Throws
    root_guard.release();
}
18,694✔
1453

1454
// Opens an existing history rooted at `ref`. When `parent` is non-null the
// root array is registered as the history parent of that group. The history
// is asserted to contain no cooked history.
ClientHistory::Arrays::Arrays(Allocator& alloc, Group* parent, ref_type ref)
    : Arrays(alloc)
{
    using gf = _impl::GroupFriend;

    root.init_from_ref(ref);
    if (parent) {
        gf::set_history_parent(*parent, root);
    }

    // Bind every column to its slot in the root before loading them.
    auto bind_to_root = [this](auto& column, auto slot) {
        column.set_parent(&root, slot);
    };
    bind_to_root(ct_history, s_ct_history_iip);
    bind_to_root(changesets, s_changesets_iip);
    bind_to_root(reciprocal_transforms, s_reciprocal_transforms_iip);
    bind_to_root(remote_versions, s_remote_versions_iip);
    bind_to_root(origin_file_idents, s_origin_file_idents_iip);
    bind_to_root(origin_timestamps, s_origin_timestamps_iip);

    init_from_ref(ref); // Throws

    // We should have no cooked history in existing Realms.
    Array cooked_history{alloc};
    cooked_history.set_parent(&root, s_cooked_history_iip);
    REALM_ASSERT(cooked_history.get_ref_from_parent() == 0);
}
88,122✔
1476

1477
// Re-points every accessor at the history rooted at `ref`. The root size is
// asserted to equal s_root_size. The first three columns are loaded from
// refs read out of the root; the last three are loaded via their parent slot.
void ClientHistory::Arrays::Arrays::init_from_ref(ref_type ref)
{
    root.init_from_ref(ref);
    REALM_ASSERT(root.size() == s_root_size);

    const ref_type ct_history_ref = root.get_as_ref(s_ct_history_iip);
    ct_history.init_from_ref(ct_history_ref); // Throws

    const ref_type changesets_ref = root.get_as_ref(s_changesets_iip);
    changesets.init_from_ref(changesets_ref); // Throws

    const ref_type reciprocal_transforms_ref = root.get_as_ref(s_reciprocal_transforms_iip);
    reciprocal_transforms.init_from_ref(reciprocal_transforms_ref); // Throws

    remote_versions.init_from_parent();    // Throws
    origin_file_idents.init_from_parent(); // Throws
    origin_timestamps.init_from_parent();  // Throws
}
244,096✔
1497

1498
void ClientHistory::Arrays::verify() const
1499
{
232✔
1500
#ifdef REALM_DEBUG
232✔
1501
    root.verify();
232✔
1502
    ct_history.verify();
232✔
1503
    changesets.verify();
232✔
1504
    reciprocal_transforms.verify();
232✔
1505
    remote_versions.verify();
232✔
1506
    origin_file_idents.verify();
232✔
1507
    origin_timestamps.verify();
232✔
1508
    REALM_ASSERT(root.size() == s_root_size);
232✔
1509
    REALM_ASSERT(reciprocal_transforms.size() == changesets.size());
232✔
1510
    REALM_ASSERT(remote_versions.size() == changesets.size());
232✔
1511
    REALM_ASSERT(origin_file_idents.size() == changesets.size());
232✔
1512
    REALM_ASSERT(origin_timestamps.size() == changesets.size());
232✔
1513
#endif // REALM_DEBUG
232✔
1514
}
232✔
1515

1516
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc