• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / thomas.goyne_420

20 Jun 2024 08:37PM UTC coverage: 90.964% (-0.002%) from 90.966%
thomas.goyne_420

Pull #7796

Evergreen

tgoyne
Derive upload completion entirely from the state of the history

Rather than tracking a bunch of derived state in-memory, check for upload
completion by checking if there are any unuploaded changesets. This is both
multiprocess-compatible and is more precise than the old checks, which had some
false-negatives.
Pull Request #7796: RCORE-2160 Make upload completion reporting multiprocess-compatible

102120 of 180324 branches covered (56.63%)

112 of 117 new or added lines in 8 files covered. (95.73%)

46 existing lines in 16 files now uncovered.

214645 of 235966 relevant lines covered (90.96%)

5898967.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.03
/src/realm/sync/noinst/client_history_impl.cpp
1
///////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2022 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/sync/noinst/client_history_impl.hpp>
20

21
#include <realm/sync/changeset.hpp>
22
#include <realm/sync/changeset_parser.hpp>
23
#include <realm/sync/instruction_applier.hpp>
24
#include <realm/sync/instruction_replication.hpp>
25
#include <realm/sync/noinst/client_reset.hpp>
26
#include <realm/sync/noinst/client_reset_recovery.hpp>
27
#include <realm/transaction.hpp>
28
#include <realm/util/compression.hpp>
29
#include <realm/util/features.h>
30
#include <realm/util/functional.hpp>
31
#include <realm/util/scope_exit.hpp>
32
#include <realm/version.hpp>
33

34
#include <algorithm>
35
#include <ctime>
36
#include <cstring>
37
#include <utility>
38

39
namespace realm::sync {
40

41
// Rewrites the sync history and the persisted progress values and then marks
// the history as applying a client reset (m_applying_client_reset = true).
//
// - If `recovered_changesets` is empty, every sync history entry is trimmed.
// - Otherwise the entries before the first recovered changeset are trimmed,
//   each recovered changeset is compressed and stored at its version's slot
//   (with the reciprocal transform cleared), and the remote version of every
//   remaining entry is set to `server_version.version`.
//
// Afterwards the client-side progress versions and the byte counters are
// reset, the server-side progress versions are set from `server_version`, and
// the salt of `client_file_ident` is stored.
//
// `current_version` must equal the version at the end of the sync history
// (asserted).
void ClientHistory::set_history_adjustments(
42
    util::Logger& logger, version_type current_version, SaltedFileIdent client_file_ident,
43
    SaltedVersion server_version, const std::vector<_impl::client_reset::RecoveredChange>& recovered_changesets)
44
{
7,736✔
45
    ensure_updated(current_version); // Throws
7,736✔
46
    prepare_for_write();             // Throws
7,736✔
47

48
    version_type client_version = m_sync_history_base_version + sync_history_size();
7,736✔
49
    REALM_ASSERT(client_version == current_version); // For now
7,736✔
50
    Array& root = m_arrays->root;
7,736✔
51
    m_group->set_sync_file_id(client_file_ident.ident); // Throws
7,736✔
52

53
    // Sum of the pre-compression sizes of the recovered changesets; stays 0
    // when there is nothing to recover.
    size_t uploadable_bytes = 0;
7,736✔
54
    if (recovered_changesets.empty()) {
7,736✔
55
        // Either we had nothing to upload or we're discarding the unsynced changes
56
        logger.debug("History adjustments: discarding %1 history entries", sync_history_size());
3,884✔
57
        do_trim_sync_history(sync_history_size()); // Throws
3,884✔
58
    }
3,884✔
59
    else {
3,852✔
60
        // Discard all sync history before the first recovered changeset. This is
61
        // required because we are going to discard our progress information and
62
        // so won't know which history entries have been uploaded already.
63
        auto first_version = recovered_changesets.front().version;
3,852✔
64
        REALM_ASSERT(first_version >= m_sync_history_base_version);
3,852✔
65
        auto discard_count = std::size_t(first_version - m_sync_history_base_version);
3,852✔
66
        do_trim_sync_history(discard_count);
3,852✔
67

68
        // Guarded by would_log() so the per-entry dump below is skipped
        // entirely when debug logging is off.
        if (logger.would_log(util::Logger::Level::debug)) {
3,852✔
69
            logger.debug("History adjustments: trimming %1 history entries and updating the remaining history "
3,804✔
70
                         "entries (%2)",
3,804✔
71
                         discard_count, sync_history_size());
3,804✔
72
            for (size_t i = 0, size = m_arrays->changesets.size(); i < size; ++i) {
19,156✔
73
                logger.debug("- %1: ident(%2) changeset_size(%3) remote_version(%4)", i,
15,352✔
74
                             m_arrays->origin_file_idents.get(i), m_arrays->changesets.get(i).size(),
15,352✔
75
                             m_arrays->remote_versions.get(i));
15,352✔
76
            }
15,352✔
77
        }
3,804✔
78

79
        // The arena and the output buffer are shared by all compressions in
        // the loop below.
        util::compression::CompressMemoryArena arena;
3,852✔
80
        util::AppendBuffer<char> compressed;
3,852✔
81
        for (auto& [changeset, version] : recovered_changesets) {
7,786✔
82
            uploadable_bytes += changeset.size();
7,786✔
83
            // Index of this version relative to the (already trimmed) history base.
            auto i = size_t(version - m_sync_history_base_version);
7,786✔
84
            util::compression::allocate_and_compress_nonportable(arena, changeset, compressed);
7,786✔
85
            m_arrays->changesets.set(i, BinaryData{compressed.data(), compressed.size()}); // Throws
7,786✔
86
            m_arrays->reciprocal_transforms.set(i, BinaryData());
7,786✔
87
        }
7,786✔
88
        // Server version is updated for *every* entry in the sync history to ensure that server versions don't
89
        // decrease.
90
        for (size_t i = 0, size = m_arrays->remote_versions.size(); i < size; ++i) {
19,476✔
91
            m_arrays->remote_versions.set(i, server_version.version);
15,624✔
92
            version_type version = m_sync_history_base_version + i;
15,624✔
93
            logger.debug("Updating %1: client_version(%2) changeset_size(%3) server_version(%4)", i, version + 1,
15,624✔
94
                         m_arrays->changesets.get(i).size(), server_version.version);
15,624✔
95
        }
15,624✔
96
    }
3,852✔
97
    logger.debug("New uploadable bytes after history adjustment: %1", uploadable_bytes);
7,736✔
98

99
    // Client progress versions are set to 0 to signal to the server that we've
100
    // reset our versioning. If we send the actual values, the server would
101
    // complain that the versions (probably) don't correspond to the ones sent
102
    // when downloading the fresh realm.
103
    root.set(s_progress_download_client_version_iip,
7,736✔
104
             RefOrTagged::make_tagged(0)); // Throws
7,736✔
105
    root.set(s_progress_upload_client_version_iip,
7,736✔
106
             RefOrTagged::make_tagged(0)); // Throws
7,736✔
107

108
    root.set(s_client_file_ident_salt_iip,
7,736✔
109
             RefOrTagged::make_tagged(client_file_ident.salt)); // Throws
7,736✔
110
    root.set(s_progress_download_server_version_iip,
7,736✔
111
             RefOrTagged::make_tagged(server_version.version)); // Throws
7,736✔
112
    root.set(s_progress_latest_server_version_iip,
7,736✔
113
             RefOrTagged::make_tagged(server_version.version)); // Throws
7,736✔
114
    root.set(s_progress_latest_server_version_salt_iip,
7,736✔
115
             RefOrTagged::make_tagged(server_version.salt)); // Throws
7,736✔
116
    root.set(s_progress_upload_server_version_iip,
7,736✔
117
             RefOrTagged::make_tagged(server_version.version)); // Throws
7,736✔
118
    root.set(s_progress_downloaded_bytes_iip,
7,736✔
119
             RefOrTagged::make_tagged(0)); // Throws
7,736✔
120
    root.set(s_progress_downloadable_bytes_iip,
7,736✔
121
             RefOrTagged::make_tagged(0)); // Throws
7,736✔
122
    root.set(s_progress_uploaded_bytes_iip,
7,736✔
123
             RefOrTagged::make_tagged(0)); // Throws
7,736✔
124
    root.set(s_progress_uploadable_bytes_iip,
7,736✔
125
             RefOrTagged::make_tagged(uploadable_bytes)); // Throws
7,736✔
126

127
    // Keep the in-memory download cursor in step with the values written above.
    m_progress_download = {server_version.version, 0};
7,736✔
128
    m_applying_client_reset = true;
7,736✔
129
}
7,736✔
130

131
// Returns the local changes that are still present in the sync history.
//
// The scan starts at the persisted upload cursor's client version (clamped to
// the base version of the history) and runs to the end of the history. Only
// entries with an origin file ident of 0 (i.e. not received from the server)
// whose reciprocal transform is non-empty are returned, each paired with its
// version. Returns an empty vector if there is no history or it has no
// entries.
std::vector<ClientHistory::LocalChange> ClientHistory::get_local_changes(version_type current_version) const
132
{
3,928✔
133
    ensure_updated(current_version); // Throws
3,928✔
134
    std::vector<ClientHistory::LocalChange> changesets;
3,928✔
135
    if (!m_arrays || m_arrays->changesets.is_empty())
3,928✔
136
        return changesets;
×
137

138
    sync::version_type begin_version = 0;
3,928✔
139
    {
3,928✔
140
        // Only the upload cursor is needed from get_status(); the other
        // out-parameters are discarded at the end of this scope.
        sync::version_type local_version;
3,928✔
141
        SaltedFileIdent local_ident;
3,928✔
142
        SyncProgress local_progress;
3,928✔
143
        get_status(local_version, local_ident, local_progress);
3,928✔
144
        begin_version = local_progress.upload.client_version;
3,928✔
145
    }
3,928✔
146

147
    version_type end_version = m_sync_history_base_version + sync_history_size();
3,928✔
148
    // Entries before the base version are no longer in the history.
    if (begin_version < m_sync_history_base_version)
3,928✔
149
        begin_version = m_sync_history_base_version;
194✔
150

151
    for (version_type version = begin_version; version < end_version; ++version) {
24,002✔
152
        std::size_t ndx = std::size_t(version - m_sync_history_base_version);
20,074✔
153
        std::int_fast64_t origin_file_ident = m_arrays->origin_file_idents.get(ndx);
20,074✔
154
        bool not_from_server = (origin_file_ident == 0);
20,074✔
155
        if (not_from_server) {
20,074✔
156
            bool compressed = false;
20,042✔
157
            // find_sync_history_entry() returns 0 to indicate not found and
158
            // otherwise adds 1 to the version, and then get_reciprocal_transform()
159
            // subtracts 1 from the version
160
            if (auto changeset = get_reciprocal_transform(version + 1, compressed); !changeset.empty()) {
20,042✔
161
                changesets.push_back({version, changeset});
7,874✔
162
            }
7,874✔
163
        }
20,042✔
164
    }
20,074✔
165
    return changesets;
3,928✔
166
}
3,928✔
167

168
// Stores `source_fn` in m_local_origin_timestamp_source, replacing any
// previously stored function. The argument is moved from.
void ClientHistory::set_local_origin_timestamp_source(util::UniqueFunction<timestamp_type()> source_fn)
169
{
12✔
170
    m_local_origin_timestamp_source = std::move(source_fn);
12✔
171
}
12✔
172

173
// Overriding member function in realm::Replication
//
// Initializes the SyncReplication base first and then the owned history
// (m_history) with the same DB.
174
void ClientReplication::initialize(DB& sg)
175
{
20,244✔
176
    SyncReplication::initialize(sg); // Throws
20,244✔
177
    m_history.initialize(sg);
20,244✔
178
}
20,244✔
179

180

181
// Overriding member function in realm::Replication
//
// Always reports hist_SyncClient.
182
auto ClientReplication::get_history_type() const noexcept -> HistoryType
183
{
55,466✔
184
    return hist_SyncClient;
55,466✔
185
}
55,466✔
186

187

188
// Overriding member function in realm::Replication
//
// Returns whatever get_client_history_schema_version() reports.
189
int ClientReplication::get_history_schema_version() const noexcept
190
{
20,224✔
191
    return get_client_history_schema_version();
20,224✔
192
}
20,224✔
193

194

195
// Overriding member function in realm::Replication
//
// Only a stored schema version of exactly 11 is reported as upgradable; every
// other value yields false.
196
bool ClientReplication::is_upgradable_history_schema(int stored_schema_version) const noexcept
197
{
6✔
198
    if (stored_schema_version == 11) {
6✔
199
        return true;
6✔
200
    }
6✔
201
    return false;
×
202
}
6✔
203

204

205
// Overriding member function in realm::Replication
//
// Migrates the stored history from `stored_schema_version` to the current
// client history schema version, one step at a time, and then records the
// current schema version in the history.
206
void ClientReplication::upgrade_history_schema(int stored_schema_version)
207
{
4✔
208
    // upgrade_history_schema() is called only when there is a need to upgrade
209
    // (`stored_schema_version < get_client_history_schema_version()`), and only
210
    // when is_upgradable_history_schema() returned true (`stored_schema_version
211
    // == 11`).
212
    REALM_ASSERT(stored_schema_version < get_client_history_schema_version());
4✔
213
    REALM_ASSERT(stored_schema_version >= 11);
4✔
214
    int orig_schema_version = stored_schema_version;
4✔
215
    int schema_version = orig_schema_version;
4✔
216

217
    // Step 11 -> 12: compress the changesets already stored in the history.
    if (schema_version < 12) {
4✔
218
        m_history.compress_stored_changesets();
4✔
219
        schema_version = 12;
4✔
220
    }
4✔
221

222
    // NOTE: Future migration steps go here.
223

224
    // Fails if the current schema version was bumped without adding a
    // matching migration step above.
    REALM_ASSERT(schema_version == get_client_history_schema_version());
4✔
225

226
    // Record migration event
227
    m_history.record_current_schema_version(); // Throws
4✔
228
}
4✔
229

230
// Replaces every non-null entry of the `reciprocal_transforms` and
// `changesets` columns with the result of running it through
// allocate_and_compress_nonportable(). Null entries are left untouched.
//
// The columns are accessed through a locally constructed `Arrays` object built
// from the group's current history ref rather than through m_arrays.
//
// NOTE(review): the two buffer names are swapped relative to what they hold:
// `compressed_buffer` receives the stored data and is the compressor's input,
// while `decompressed_buffer` receives the compressor's output, which is what
// gets written back.
void ClientHistory::compress_stored_changesets()
231
{
4✔
232
    using gf = _impl::GroupFriend;
4✔
233
    Allocator& alloc = gf::get_alloc(*m_group);
4✔
234
    auto ref = gf::get_history_ref(*m_group);
4✔
235
    Arrays arrays{alloc, m_group, ref};
4✔
236

237
    util::AppendBuffer<char> compressed_buffer;
4✔
238
    util::AppendBuffer<char> decompressed_buffer;
4✔
239
    util::compression::CompressMemoryArena arena;
4✔
240
    auto columns = {&arrays.reciprocal_transforms, &arrays.changesets};
4✔
241
    for (auto column : columns) {
8✔
242
        for (size_t i = 0; i < column->size(); ++i) {
24✔
243
            ChunkedBinaryData data(*column, i);
16✔
244
            if (data.is_null())
16✔
245
                continue;
8✔
246
            // Stored data -> compressor input (see the naming note above).
            data.copy_to(compressed_buffer);
8✔
247
            util::compression::allocate_and_compress_nonportable(arena, compressed_buffer, decompressed_buffer);
8✔
248
            column->set(i, BinaryData{decompressed_buffer.data(), decompressed_buffer.size()}); // Throws
8✔
249
        }
8✔
250
    }
8✔
251
}
4✔
252

253
// Overriding member function in realm::Replication
//
// Brings the history up to date with `orig_version`, prepares it for writing
// and adds a new entry built from two buffers: the `data`/`size` pair passed
// in and the current contents of the instruction encoder's buffer. Returns the
// value produced by ClientHistory::add_changeset().
254
auto ClientReplication::prepare_changeset(const char* data, size_t size, version_type orig_version) -> version_type
255
{
177,114✔
256
    m_history.ensure_updated(orig_version);
177,114✔
257
    m_history.prepare_for_write(); // Throws
177,114✔
258

259
    // Both BinaryData objects are non-owning views over existing buffers.
    BinaryData ct_changeset{data, size};
177,114✔
260
    auto& buffer = get_instruction_encoder().buffer();
177,114✔
261
    BinaryData sync_changeset(buffer.data(), buffer.size());
177,114✔
262

263
    return m_history.add_changeset(ct_changeset, sync_changeset); // Throws
177,114✔
264
}
177,114✔
265

266
// Returns a write validator for `tr` produced by m_write_validator_factory, or
// an empty (default-constructed) function if no factory has been installed.
util::UniqueFunction<SyncReplication::WriteValidator> ClientReplication::make_write_validator(Transaction& tr)
267
{
182,106✔
268
    if (!m_write_validator_factory) {
182,106✔
269
        return {};
169,054✔
270
    }
169,054✔
271

272
    return m_write_validator_factory(tr);
13,052✔
273
}
182,106✔
274

275
// Reads the current client version, the client file identifier (ident and
// salt) and the persisted sync progress using a new read transaction.
//
// All values are collected into locals first and the out-parameters are
// assigned only after every read has completed.
//
// If the file has no history ref yet, the salt is 0 and `progress` is a
// default-constructed SyncProgress. A current version equal to
// s_initial_version is reported as 0.
void ClientHistory::get_status(version_type& current_client_version, SaltedFileIdent& client_file_ident,
276
                               SyncProgress& progress) const
277
{
29,294✔
278
    TransactionRef rt = m_db->start_read(); // Throws
29,294✔
279
    version_type current_client_version_2 = rt->get_version();
29,294✔
280

281
    SaltedFileIdent client_file_ident_2{rt->get_sync_file_id(), 0};
29,294✔
282
    SyncProgress progress_2;
29,294✔
283
    using gf = _impl::GroupFriend;
29,294✔
284
    if (ref_type ref = gf::get_history_ref(*rt)) {
29,294✔
285
        Array root(m_db->get_alloc());
26,912✔
286
        root.init_from_ref(ref);
26,912✔
287
        client_file_ident_2.salt = salt_type(root.get_as_ref_or_tagged(s_client_file_ident_salt_iip).get_as_int());
26,912✔
288
        progress_2.latest_server_version.version =
26,912✔
289
            version_type(root.get_as_ref_or_tagged(s_progress_latest_server_version_iip).get_as_int());
26,912✔
290
        // NOTE(review): this salt is converted via version_type, whereas the
        // client file ident salt above uses salt_type — confirm this is intended.
        progress_2.latest_server_version.salt =
26,912✔
291
            version_type(root.get_as_ref_or_tagged(s_progress_latest_server_version_salt_iip).get_as_int());
26,912✔
292
        progress_2.download.server_version =
26,912✔
293
            version_type(root.get_as_ref_or_tagged(s_progress_download_server_version_iip).get_as_int());
26,912✔
294
        progress_2.download.last_integrated_client_version =
26,912✔
295
            version_type(root.get_as_ref_or_tagged(s_progress_download_client_version_iip).get_as_int());
26,912✔
296
        progress_2.upload.client_version =
26,912✔
297
            version_type(root.get_as_ref_or_tagged(s_progress_upload_client_version_iip).get_as_int());
26,912✔
298
        progress_2.upload.last_integrated_server_version =
26,912✔
299
            version_type(root.get_as_ref_or_tagged(s_progress_upload_server_version_iip).get_as_int());
26,912✔
300
    }
26,912✔
301

302
    current_client_version = current_client_version_2;
29,294✔
303
    client_file_ident = client_file_ident_2;
29,294✔
304
    progress = progress_2;
29,294✔
305

306
    REALM_ASSERT(current_client_version >= s_initial_version + 0);
29,294✔
307
    // The initial version is reported to callers as version 0.
    if (current_client_version == s_initial_version + 0)
29,294✔
308
        current_client_version = 0;
2,380✔
309
}
29,294✔
310

311
// Assigns the client file identifier to the Realm in its own write
// transaction.
//
// Stores `client_file_ident.ident` as the sync file id, stores the salt in the
// history root and resets the download/upload client progress versions to 0.
// If `fix_up_object_ids` is true, the stored changesets are additionally
// patched with the new ident via
// fix_up_client_file_ident_in_stored_changesets().
//
// Asserted preconditions: `client_file_ident.ident` is nonzero and the Realm
// does not have a sync file id yet.
void ClientHistory::set_client_file_ident(SaltedFileIdent client_file_ident, bool fix_up_object_ids)
312
{
3,514✔
313
    REALM_ASSERT(client_file_ident.ident != 0);
3,514✔
314

315
    TransactionRef wt = m_db->start_write(); // Throws
3,514✔
316
    version_type local_version = wt->get_version();
3,514✔
317
    ensure_updated(local_version); // Throws
3,514✔
318
    prepare_for_write();           // Throws
3,514✔
319

320
    Array& root = m_arrays->root;
3,514✔
321
    REALM_ASSERT(wt->get_sync_file_id() == 0);
3,514✔
322
    wt->set_sync_file_id(client_file_ident.ident);
3,514✔
323
    root.set(s_client_file_ident_salt_iip,
3,514✔
324
             RefOrTagged::make_tagged(client_file_ident.salt)); // Throws
3,514✔
325
    root.set(s_progress_download_client_version_iip, RefOrTagged::make_tagged(0));
3,514✔
326
    root.set(s_progress_upload_client_version_iip, RefOrTagged::make_tagged(0));
3,514✔
327

328
    if (fix_up_object_ids) {
3,514✔
329
        fix_up_client_file_ident_in_stored_changesets(*wt, client_file_ident.ident); // Throws
874✔
330
    }
874✔
331

332
    // Note: This transaction produces an empty changeset. Empty changesets are
333
    // not uploaded to the server.
334
    wt->commit(); // Throws
3,514✔
335
}
3,514✔
336

337

338
// Overriding member function in realm::sync::ClientHistoryBase
//
// Persists `progress` and `downloadable_bytes` via update_sync_progress() in a
// dedicated write transaction, and reports the version produced by the commit
// through `version_info` (both as the Realm version and as the sync version,
// the latter paired with 0).
339
void ClientHistory::set_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes,
340
                                      VersionInfo& version_info)
341
{
28,902✔
342
    TransactionRef wt = m_db->start_write(); // Throws
28,902✔
343
    version_type local_version = wt->get_version();
28,902✔
344
    ensure_updated(local_version); // Throws
28,902✔
345
    prepare_for_write();           // Throws
28,902✔
346

347
    update_sync_progress(progress, downloadable_bytes); // Throws
28,902✔
348

349
    // Note: This transaction produces an empty changeset. Empty changesets are
350
    // not uploaded to the server.
351
    version_type new_version = wt->commit(); // Throws
28,902✔
352
    version_info.realm_version = new_version;
28,902✔
353
    version_info.sync_version = {new_version, 0};
28,902✔
354
}
28,902✔
355

356
// Collects the next batch of changesets to upload, using a new read
// transaction.
//
// The search starts at `upload_progress.client_version` and is bounded by
// `end_version` (both clamped to the base version of the history). Each entry
// found is decompressed and appended to `uploadable_changesets`. Collection
// stops when no further entry is found, when the accumulated uncompressed
// size reaches the 128 KB soft limit, or when adding the next changeset would
// reach the 16 MB hard limit while `uploadable_changesets` is already
// non-empty.
//
// On return `upload_progress` is advanced past the collected changesets (never
// beyond `end_version`) and `locked_server_version` holds the persisted
// download server version.
//
// Terminates the process if a stored changeset uses a compression scheme that
// is not supported on this platform.
void ClientHistory::find_uploadable_changesets(UploadCursor& upload_progress, version_type end_version,
357
                                               std::vector<UploadChangeset>& uploadable_changesets,
358
                                               version_type& locked_server_version) const
359
{
59,810✔
360
    TransactionRef rt = m_db->start_read(); // Throws
59,810✔
361
    auto& alloc = m_db->get_alloc();
59,810✔
362
    using gf = _impl::GroupFriend;
59,810✔
363
    ref_type ref = gf::get_history_ref(*rt);
59,810✔
364
    REALM_ASSERT(ref);
59,810✔
365

366
    // Local accessors bound to this read transaction; m_arrays is not used here.
    Arrays arrays(alloc, rt.get(), ref);
59,810✔
367
    const auto sync_history_size = arrays.changesets.size();
59,810✔
368
    const auto sync_history_base_version = rt->get_version() - sync_history_size;
59,810✔
369

370
    std::size_t accum_byte_size_soft_limit = 131072;   // 128 KB
59,810✔
371
    std::size_t accum_byte_size_hard_limit = 16777216; // server-imposed limit
59,810✔
372
    std::size_t accum_byte_size = 0;
59,810✔
373

374
    version_type begin_version_2 = std::max(upload_progress.client_version, sync_history_base_version);
59,810✔
375
    version_type end_version_2 = std::max(end_version, sync_history_base_version);
59,810✔
376
    version_type last_integrated_upstream_version = upload_progress.last_integrated_server_version;
59,810✔
377

378
    while (accum_byte_size < accum_byte_size_soft_limit) {
103,062✔
379
        HistoryEntry entry;
102,602✔
380
        // Work on a copy so the cursor is only advanced once the entry is
        // actually accepted (or the search is exhausted).
        version_type last_integrated_upstream_version_2 = last_integrated_upstream_version;
102,602✔
381
        version_type version = find_sync_history_entry(arrays, sync_history_base_version, begin_version_2,
102,602✔
382
                                                       end_version_2, entry, last_integrated_upstream_version_2);
102,602✔
383

384
        // A result of 0 means no further entry was found in the range.
        if (version == 0) {
102,602✔
385
            begin_version_2 = end_version_2;
59,338✔
386
            last_integrated_upstream_version = last_integrated_upstream_version_2;
59,338✔
387
            break;
59,338✔
388
        }
59,338✔
389

390
        // Check the size from the compression header before decompressing, so
        // an oversized changeset is deferred without paying for decompression.
        ChunkedBinaryInputStream is(entry.changeset);
43,264✔
391
        size_t size = util::compression::get_uncompressed_size_from_header(is);
43,264✔
392
        if (accum_byte_size + size >= accum_byte_size_hard_limit && !uploadable_changesets.empty())
43,264✔
393
            break;
12✔
394
        accum_byte_size += size;
43,252✔
395
        last_integrated_upstream_version = last_integrated_upstream_version_2;
43,252✔
396
        begin_version_2 = version;
43,252✔
397

398
        UploadChangeset uc;
43,252✔
399
        util::AppendBuffer<char> decompressed;
43,252✔
400
        // A second stream is needed because `is` was already consumed for the header.
        ChunkedBinaryInputStream is_2(entry.changeset);
43,252✔
401
        auto ec = util::compression::decompress_nonportable(is_2, decompressed);
43,252✔
402
        if (ec == util::compression::error::decompress_unsupported) {
43,252✔
403
            REALM_TERMINATE(
404
                "Synchronized Realm files with unuploaded local changes cannot be copied between platforms.");
×
405
        }
×
406
        REALM_ASSERT_3(ec, ==, std::error_code{});
43,252✔
407

408
        uc.origin_timestamp = entry.origin_timestamp;
43,252✔
409
        uc.origin_file_ident = entry.origin_file_ident;
43,252✔
410
        uc.progress = UploadCursor{version, entry.remote_version};
43,252✔
411
        // `uc.changeset` views the decompressed bytes; the buffer is released
        // into `uc.buffer` on the next line (presumably making `uc` the owner
        // — confirm against UploadChangeset).
        uc.changeset = BinaryData{decompressed.data(), decompressed.size()};
43,252✔
412
        uc.buffer = decompressed.release().release();
43,252✔
413
        uploadable_changesets.push_back(std::move(uc)); // Throws
43,252✔
414
    }
43,252✔
415

416
    // `begin_version_2` may have been clamped up to the history base, so cap
    // the reported cursor at the caller's `end_version`.
    upload_progress = {std::min(begin_version_2, end_version), last_integrated_upstream_version};
59,810✔
417

418
    locked_server_version = arrays.root.get_as_ref_or_tagged(s_progress_download_server_version_iip).get_as_int();
59,810✔
419
}
59,810✔
420

421

422
// Parses the incoming server changesets, then transforms/applies them and
// records them in the history using one or more write transactions on
// `transact`.
//
// Asserted preconditions: `incoming_changesets` is non-empty; `transact` is in
// the write stage for non-SteadyState batches and in the read stage for
// SteadyState batches.
//
// `run_in_write_tr`, if set, is invoked once per loop iteration, before the
// commit, with the changesets handled in that iteration.
//
// On return `version_info` holds the version produced by the last commit.
// `transact` is left in the write stage for MoreToCome and in the read stage
// otherwise (asserted).
//
// Throws IntegrationException (ErrorCodes::BadChangeset) if a changeset cannot
// be parsed.
void ClientHistory::integrate_server_changesets(
423
    const SyncProgress& progress, DownloadableProgress downloadable_bytes,
424
    util::Span<const RemoteChangeset> incoming_changesets, VersionInfo& version_info, DownloadBatchState batch_state,
425
    util::Logger& logger, const TransactionRef& transact,
426
    util::UniqueFunction<void(const TransactionRef&, util::Span<Changeset>)> run_in_write_tr)
427
{
24,894✔
428
    REALM_ASSERT(incoming_changesets.size() != 0);
24,894✔
429
    REALM_ASSERT(
24,894✔
430
        (transact->get_transact_stage() == DB::transact_Writing && batch_state != DownloadBatchState::SteadyState) ||
24,894✔
431
        (transact->get_transact_stage() == DB::transact_Reading && batch_state == DownloadBatchState::SteadyState));
24,894✔
432
    std::vector<Changeset> changesets;
24,894✔
433
    changesets.resize(incoming_changesets.size()); // Throws
24,894✔
434

435
    // Parse incoming changesets before promoting to write. In steady state this
    // happens without the write lock; for the other batch states `transact` is
    // already in the write stage (see the assertion above).
436
    try {
24,894✔
437
        for (std::size_t i = 0; i < incoming_changesets.size(); ++i) {
69,770✔
438
            const RemoteChangeset& changeset = incoming_changesets[i];
44,876✔
439
            parse_remote_changeset(changeset, changesets[i]); // Throws
44,876✔
440
            changesets[i].transform_sequence = i;
44,876✔
441
        }
44,876✔
442
    }
24,894✔
443
    catch (const BadChangesetError& e) {
24,894✔
444
        throw IntegrationException(ErrorCodes::BadChangeset,
4✔
445
                                   util::format("Failed to parse received changeset: %1", e.what()),
4✔
446
                                   ProtocolError::bad_changeset);
4✔
447
    }
4✔
448

449
    VersionID new_version{0, 0};
24,890✔
450
    auto num_changesets = incoming_changesets.size();
24,890✔
451
    util::Span<Changeset> changesets_to_integrate(changesets);
24,890✔
452
    // Only steady-state integration may stop early to let other writers in.
    const bool allow_lock_release = batch_state == DownloadBatchState::SteadyState;
24,890✔
453

454
    // Ideally, this loop runs only once, but it can run up to `incoming_changesets.size()` times, depending on the
455
    // number of times the sync client yields the write lock to allow the user to commit their changes.
456
    // In each iteration, at least one changeset is transformed and committed.
457
    // In FLX, all changesets are committed at once in the bootstrap phase (i.e, in one iteration).
458
    while (!changesets_to_integrate.empty()) {
49,780✔
459
        if (transact->get_transact_stage() == DB::transact_Reading) {
24,890✔
460
            transact->promote_to_write(); // Throws
22,810✔
461
        }
22,810✔
462
        VersionID old_version = transact->get_version_of_current_transaction();
24,890✔
463
        version_type local_version = old_version.version;
24,890✔
464
        auto sync_file_id = transact->get_sync_file_id();
24,890✔
465
        REALM_ASSERT(sync_file_id != 0);
24,890✔
466

467
        ensure_updated(local_version); // Throws
24,890✔
468
        prepare_for_write();           // Throws
24,890✔
469

470
        std::uint64_t downloaded_bytes_in_transaction = 0;
24,890✔
471
        auto changesets_transformed_count = transform_and_apply_server_changesets(
24,890✔
472
            changesets_to_integrate, transact, logger, downloaded_bytes_in_transaction, allow_lock_release);
24,890✔
473

474
        // downloaded_bytes always contains the total number of downloaded bytes
475
        // from the Realm. downloaded_bytes must be persisted in the Realm, since
476
        // the downloaded changesets are trimmed after use, and since it would be
477
        // expensive to traverse the entire history.
478
        Array& root = m_arrays->root;
24,890✔
479
        auto downloaded_bytes =
24,890✔
480
            std::uint64_t(root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int());
24,890✔
481
        downloaded_bytes += downloaded_bytes_in_transaction;
24,890✔
482
        root.set(s_progress_downloaded_bytes_iip, RefOrTagged::make_tagged(downloaded_bytes)); // Throws
24,890✔
483

484
        // The index below relies on changesets_transformed_count being at
        // least 1 (see the loop comment above); it must be taken before the
        // spans are advanced past the transformed changesets.
        const RemoteChangeset& last_changeset = incoming_changesets[changesets_transformed_count - 1];
24,890✔
485
        auto changesets_for_cb = changesets_to_integrate.first(changesets_transformed_count);
24,890✔
486
        changesets_to_integrate = changesets_to_integrate.sub_span(changesets_transformed_count);
24,890✔
487
        incoming_changesets = incoming_changesets.sub_span(changesets_transformed_count);
24,890✔
488

489
        // During the bootstrap phase in flexible sync, the server sends multiple download messages with the same
490
        // synthetic server version that represents synthetic changesets generated from state on the server.
491
        if (batch_state == DownloadBatchState::LastInBatch && changesets_to_integrate.empty()) {
24,890✔
492
            update_sync_progress(progress, downloadable_bytes); // Throws
1,924✔
493
        }
1,924✔
494
        // Always update progress for download messages from steady state.
        // When only part of the message has been integrated, the download
        // cursor is taken from the last changeset integrated so far.
495
        else if (batch_state == DownloadBatchState::SteadyState && !changesets_to_integrate.empty()) {
22,966✔
496
            auto partial_progress = progress;
×
497
            partial_progress.download.server_version = last_changeset.remote_version;
×
498
            partial_progress.download.last_integrated_client_version = last_changeset.last_integrated_local_version;
×
NEW
499
            update_sync_progress(partial_progress, downloadable_bytes); // Throws
×
500
        }
×
501
        else if (batch_state == DownloadBatchState::SteadyState && changesets_to_integrate.empty()) {
22,966✔
502
            update_sync_progress(progress, downloadable_bytes); // Throws
22,784✔
503
        }
22,784✔
504
        if (run_in_write_tr) {
24,890✔
505
            run_in_write_tr(transact, changesets_for_cb);
24,838✔
506
        }
24,838✔
507

508
        // The reason we can use the `origin_timestamp`, and the `origin_file_ident`
509
        // from the last transformed changeset, and ignore all the other changesets, is
510
        // that these values are actually irrelevant for changesets of remote origin
511
        // stored in the client-side history (for now), except that
512
        // `origin_file_ident` is required to be nonzero, to mark it as having been
513
        // received from the server.
514
        HistoryEntry entry;
24,890✔
515
        entry.origin_timestamp = last_changeset.origin_timestamp;
24,890✔
516
        entry.origin_file_ident = last_changeset.origin_file_ident;
24,890✔
517
        entry.remote_version = last_changeset.remote_version;
24,890✔
518
        add_sync_history_entry(entry); // Throws
24,890✔
519

520
        // Tell prepare_commit()/add_changeset() not to write a history entry for
521
        // this transaction as we already did it.
522
        REALM_ASSERT(!m_applying_server_changeset);
24,890✔
523
        m_applying_server_changeset = true;
24,890✔
524
        // Commit and continue to write if in bootstrap phase and there are still changes to integrate.
525
        if (batch_state == DownloadBatchState::MoreToCome ||
24,890✔
526
            (batch_state == DownloadBatchState::LastInBatch && !changesets_to_integrate.empty())) {
24,890✔
527
            new_version = transact->commit_and_continue_writing(); // Throws
152✔
528
        }
152✔
529
        else {
24,738✔
530
            new_version = transact->commit_and_continue_as_read(); // Throws
24,738✔
531
        }
24,738✔
532

533
        logger.debug(util::LogCategory::changeset, "Integrated %1 changesets out of %2", changesets_transformed_count,
24,890✔
534
                     num_changesets);
24,890✔
535
    }
24,890✔
536

537
    REALM_ASSERT(new_version.version > 0);
24,890✔
538
    REALM_ASSERT(
24,890✔
539
        (batch_state == DownloadBatchState::MoreToCome && transact->get_transact_stage() == DB::transact_Writing) ||
24,890✔
540
        (batch_state != DownloadBatchState::MoreToCome && transact->get_transact_stage() == DB::transact_Reading));
24,890✔
541
    version_info.realm_version = new_version.version;
24,890✔
542
    version_info.sync_version = {new_version.version, 0};
24,890✔
543
}
24,890✔
544

545

546
// Transforms and applies `changesets_to_integrate` inside the current write
// transaction and returns the number of changesets that were processed.
//
// `downloaded_bytes` is incremented by the original size of each processed
// changeset.
//
// If m_replication.apply_server_changes() is false, nothing is transformed or
// applied: every changeset is counted as downloaded and the full count is
// returned.
//
// The applier callback passed to the transformer returns false only when
// `allow_lock_release` is set, other writers are waiting for the write lock
// and the commit size has reached 100 KB (presumably telling the transformer
// to stop after the current changeset — confirm against
// Transformer::transform_remote_changesets()).
//
// BadChangesetError and TransformError are rethrown as IntegrationException
// (ErrorCodes::BadChangeset). `transact` must be in the write stage (asserted).
size_t ClientHistory::transform_and_apply_server_changesets(util::Span<Changeset> changesets_to_integrate,
547
                                                            TransactionRef transact, util::Logger& logger,
548
                                                            std::uint64_t& downloaded_bytes, bool allow_lock_release)
549
{
24,890✔
550
    REALM_ASSERT(transact->get_transact_stage() == DB::transact_Writing);
24,890✔
551

552
    if (!m_replication.apply_server_changes()) {
24,890✔
553
        std::for_each(changesets_to_integrate.begin(), changesets_to_integrate.end(), [&](const Changeset& c) {
54✔
554
            downloaded_bytes += c.original_changeset_size;
54✔
555
        });
54✔
556
        // Skip over all changesets if they don't need to be transformed and applied.
557
        return changesets_to_integrate.size();
54✔
558
    }
54✔
559

560
    version_type local_version = transact->get_version_of_current_transaction().version;
24,836✔
561
    auto sync_file_id = transact->get_sync_file_id();
24,836✔
562

563
    try {
24,836✔
564
        for (auto& changeset : changesets_to_integrate) {
44,844✔
565
            REALM_ASSERT(changeset.last_integrated_remote_version <= local_version);
44,844✔
566
            REALM_ASSERT(changeset.origin_file_ident > 0 && changeset.origin_file_ident != sync_file_id);
44,844✔
567

568
            // It is possible that the synchronization history has been trimmed
569
            // to a point where a prefix of the merge window is no longer
570
            // available, but this can only happen if that prefix consisted
571
            // entirely of upload skippable entries. Since such entries (those
572
            // that are empty or of remote origin) will be skipped by the
573
            // transformer anyway, we can simply clamp the beginning of the
574
            // merge window to the beginning of the synchronization history,
575
            // when this situation occurs.
576
            //
577
            // See trim_sync_history() for further details.
578
            if (changeset.last_integrated_remote_version < m_sync_history_base_version)
44,844✔
579
                changeset.last_integrated_remote_version = m_sync_history_base_version;
33,472✔
580
        }
44,844✔
581

582
        constexpr std::size_t commit_byte_size_limit = 102400; // 100 KB
24,836✔
583

584
        auto changeset_applier = [&](const Changeset* transformed_changeset) -> bool {
44,816✔
585
            InstructionApplier applier{*transact};
44,816✔
586
            {
44,816✔
587
                // Scoped guard on m_replication, active only while the
                // changeset is being applied.
                TempShortCircuitReplication tscr{m_replication};
44,816✔
588
                applier.apply(*transformed_changeset); // Throws
44,816✔
589
            }
44,816✔
590
            downloaded_bytes += transformed_changeset->original_changeset_size;
44,816✔
591

592
            // False only if all three conditions hold: lock release is
            // allowed, another writer is waiting, and the commit is large enough.
            return !(allow_lock_release && m_db->other_writers_waiting_for_lock() &&
44,816✔
593
                     transact->get_commit_size() >= commit_byte_size_limit);
44,816✔
594
        };
44,816✔
595
        sync::Transformer transformer;
24,836✔
596
        auto changesets_transformed_count = transformer.transform_remote_changesets(
24,836✔
597
            *this, sync_file_id, local_version, changesets_to_integrate, changeset_applier, logger); // Throws
24,836✔
598
        return changesets_transformed_count;
24,836✔
599
    }
24,836✔
600
    catch (const BadChangesetError& e) {
24,836✔
601
        throw IntegrationException(ErrorCodes::BadChangeset,
×
602
                                   util::format("Failed to apply received changeset: %1", e.what()),
×
603
                                   ProtocolError::bad_changeset);
×
604
    }
×
605
    catch (const TransformError& e) {
24,836✔
606
        throw IntegrationException(ErrorCodes::BadChangeset,
28✔
607
                                   util::format("Failed to transform received changeset: %1", e.what()),
28✔
608
                                   ProtocolError::bad_changeset);
28✔
609
    }
28✔
610
}
24,836✔
611

612

613
void ClientHistory::get_upload_download_state(DB& db, std::uint_fast64_t& downloaded_bytes,
614
                                              DownloadableProgress& downloadable_bytes,
615
                                              std::uint_fast64_t& uploaded_bytes,
616
                                              std::uint_fast64_t& uploadable_bytes,
617
                                              std::uint_fast64_t& snapshot_version, version_type& uploaded_version)
618
{
80,396✔
619
    TransactionRef rt = db.start_read(); // Throws
80,396✔
620
    version_type current_client_version = rt->get_version();
80,396✔
621

622
    downloaded_bytes = 0;
80,396✔
623
    downloadable_bytes = uint64_t(0);
80,396✔
624
    uploaded_bytes = 0;
80,396✔
625
    uploadable_bytes = 0;
80,396✔
626
    snapshot_version = current_client_version;
80,396✔
627
    uploaded_version = 0;
80,396✔
628

629
    using gf = _impl::GroupFriend;
80,396✔
630
    ref_type ref = gf::get_history_ref(*rt);
80,396✔
631
    if (!ref)
80,396✔
632
        return;
1,930✔
633

634
    Array root(db.get_alloc());
78,466✔
635
    root.init_from_ref(ref);
78,466✔
636
    downloaded_bytes = root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int();
78,466✔
637
    downloadable_bytes = root.get_as_ref_or_tagged(s_progress_downloadable_bytes_iip).get_as_int();
78,466✔
638
    uploadable_bytes = root.get_as_ref_or_tagged(s_progress_uploadable_bytes_iip).get_as_int();
78,466✔
639
    uploaded_bytes = root.get_as_ref_or_tagged(s_progress_uploaded_bytes_iip).get_as_int();
78,466✔
640

641
    uploaded_version = root.get_as_ref_or_tagged(s_progress_upload_client_version_iip).get_as_int();
78,466✔
642
    if (uploaded_version == current_client_version)
78,466✔
NEW
643
        return;
×
644

645
    BinaryColumn changesets(db.get_alloc());
78,466✔
646
    changesets.init_from_ref(root.get_as_ref(s_changesets_iip));
78,466✔
647
    IntegerBpTree origin_file_idents(db.get_alloc());
78,466✔
648
    origin_file_idents.init_from_ref(root.get_as_ref(s_origin_file_idents_iip));
78,466✔
649

650
    // `base_version` is the oldest version we have history for. If this is
651
    // greater than uploaded_version, all of the versions in between the two had
652
    // empty changesets and did not need to be uploaded. If this is less than
653
    // uploaded_version, we have changesets which have been uploaded but the
654
    // server has not yet told us we can delete and we may need to use for merging.
655
    auto base_version = current_client_version - changesets.size();
78,466✔
656
    if (uploaded_version < base_version) {
78,466✔
657
        uploaded_version = base_version;
53,492✔
658
    }
53,492✔
659

660
    auto count = size_t(current_client_version - uploaded_version);
78,466✔
661
    for (size_t i = changesets.size() - count; i < changesets.size(); ++i) {
190,590✔
662
        if (origin_file_idents.get(i) == 0) {
156,888✔
663
            size_t pos = 0;
143,728✔
664
            if (changesets.get_at(i, pos).size() != 0)
143,728✔
665
                break;
44,764✔
666
        }
143,728✔
667
        ++uploaded_version;
112,124✔
668
    }
112,124✔
669
}
78,466✔
670

671
void ClientHistory::get_upload_download_state(DB* db, std::uint_fast64_t& downloaded_bytes,
672
                                              std::uint_fast64_t& uploaded_bytes)
673
{
10,340✔
674
    TransactionRef rt = db->start_read(); // Throws
10,340✔
675
    downloaded_bytes = 0;
10,340✔
676
    uploaded_bytes = 0;
10,340✔
677

678
    using gf = _impl::GroupFriend;
10,340✔
679
    if (ref_type ref = gf::get_history_ref(*rt)) {
10,340✔
680
        Array root(db->get_alloc());
7,962✔
681
        root.init_from_ref(ref);
7,962✔
682
        downloaded_bytes = root.get_as_ref_or_tagged(s_progress_downloaded_bytes_iip).get_as_int();
7,962✔
683
        uploaded_bytes = root.get_as_ref_or_tagged(s_progress_uploaded_bytes_iip).get_as_int();
7,962✔
684
    }
7,962✔
685
}
10,340✔
686

687
auto ClientHistory::find_history_entry(version_type begin_version, version_type end_version,
688
                                       HistoryEntry& entry) const noexcept -> version_type
689
{
88,142✔
690
    version_type last_integrated_server_version;
88,142✔
691
    return find_sync_history_entry(*m_arrays, m_sync_history_base_version, begin_version, end_version, entry,
88,142✔
692
                                   last_integrated_server_version);
88,142✔
693
}
88,142✔
694

695

696
ChunkedBinaryData ClientHistory::get_reciprocal_transform(version_type version, bool& is_compressed) const
{
    // Returns the stored reciprocal transform for `version`, falling back to
    // the stored changeset itself when no reciprocal transform is recorded.
    // `is_compressed` is always set to true.
    is_compressed = true;
    REALM_ASSERT(version > m_sync_history_base_version);

    // The entry that produced `version` lives one slot before
    // `version - m_sync_history_base_version`.
    const std::size_t slot = to_size_t(version - m_sync_history_base_version) - 1;
    REALM_ASSERT(slot < sync_history_size());

    ChunkedBinaryData stored_transform{m_arrays->reciprocal_transforms, slot};
    if (stored_transform.is_null()) {
        return ChunkedBinaryData{m_arrays->changesets, slot};
    }
    return stored_transform;
}
80,880✔
709

710

711
void ClientHistory::set_reciprocal_transform(version_type version, BinaryData data)
{
    // Stores `data` (compressed) as the reciprocal transform of the entry
    // that produced `version`. Null `data` is stored as an empty, non-null
    // binary value.
    REALM_ASSERT(version > m_sync_history_base_version);

    const std::size_t slot = size_t(version - m_sync_history_base_version) - 1;
    REALM_ASSERT(slot < sync_history_size());

    if (!data.is_null()) {
        auto packed = util::compression::allocate_and_compress_nonportable(data);
        m_arrays->reciprocal_transforms.set(slot, BinaryData{packed.data(), packed.size()}); // Throws
    }
    else {
        m_arrays->reciprocal_transforms.set(slot, BinaryData{"", 0}); // Throws
    }
}
314✔
726

727

728
auto ClientHistory::find_sync_history_entry(Arrays& arrays, version_type base_version, version_type begin_version,
729
                                            version_type end_version, HistoryEntry& entry,
730
                                            version_type& last_integrated_server_version) noexcept -> version_type
731
{
190,744✔
732
    if (begin_version == 0)
190,744✔
733
        begin_version = s_initial_version + 0;
×
734

735
    REALM_ASSERT(begin_version <= end_version);
190,744✔
736
    REALM_ASSERT(begin_version >= base_version);
190,744✔
737
    REALM_ASSERT(end_version <= base_version + arrays.changesets.size());
190,744✔
738
    std::size_t n = to_size_t(end_version - begin_version);
190,744✔
739
    std::size_t offset = to_size_t(begin_version - base_version);
190,744✔
740
    for (std::size_t i = 0; i < n; ++i) {
330,960✔
741
        std::int_fast64_t origin_file_ident = arrays.origin_file_idents.get(offset + i);
246,426✔
742
        last_integrated_server_version = version_type(arrays.remote_versions.get(offset + i));
246,426✔
743
        bool not_from_server = (origin_file_ident == 0);
246,426✔
744
        if (not_from_server) {
246,426✔
745
            ChunkedBinaryData chunked_changeset(arrays.changesets, offset + i);
176,056✔
746
            if (!chunked_changeset.empty()) {
176,056✔
747
                entry.origin_file_ident = file_ident_type(origin_file_ident);
106,210✔
748
                entry.remote_version = last_integrated_server_version;
106,210✔
749
                entry.origin_timestamp = timestamp_type(arrays.origin_timestamps.get(offset + i));
106,210✔
750
                entry.changeset = chunked_changeset;
106,210✔
751
                return begin_version + i + 1;
106,210✔
752
            }
106,210✔
753
        }
176,056✔
754
    }
246,426✔
755
    return 0;
84,534✔
756
}
190,744✔
757

758
// sum_of_history_entry_sizes calculates the sum of the changeset sizes of the
759
// local history entries that produced a version that succeeds `begin_version`
760
// and precedes `end_version`.
761
std::uint_fast64_t ClientHistory::sum_of_history_entry_sizes(version_type begin_version,
762
                                                             version_type end_version) const noexcept
763
{
53,612✔
764
    if (begin_version >= end_version)
53,612✔
765
        return 0;
12,230✔
766

767
    REALM_ASSERT(m_arrays->changesets.is_attached());
41,382✔
768
    REALM_ASSERT(m_arrays->origin_file_idents.is_attached());
41,382✔
769
    REALM_ASSERT(end_version <= m_sync_history_base_version + sync_history_size());
41,382✔
770

771
    version_type begin_version_2 = begin_version;
41,382✔
772
    version_type end_version_2 = end_version;
41,382✔
773
    clamp_sync_version_range(begin_version_2, end_version_2);
41,382✔
774

775
    std::uint_fast64_t sum_of_sizes = 0;
41,382✔
776

777
    std::size_t n = to_size_t(end_version_2 - begin_version_2);
41,382✔
778
    std::size_t offset = to_size_t(begin_version_2 - m_sync_history_base_version);
41,382✔
779
    for (std::size_t i = 0; i < n; ++i) {
130,642✔
780

781
        // Only local changesets are considered
782
        if (m_arrays->origin_file_idents.get(offset + i) != 0)
89,260✔
783
            continue;
15,194✔
784

785
        ChunkedBinaryData changeset(m_arrays->changesets, offset + i);
74,066✔
786
        ChunkedBinaryInputStream in{changeset};
74,066✔
787
        sum_of_sizes += util::compression::get_uncompressed_size_from_header(in);
74,066✔
788
    }
74,066✔
789

790
    return sum_of_sizes;
41,382✔
791
}
53,612✔
792

793
void ClientHistory::prepare_for_write()
794
{
242,144✔
795
    if (m_arrays) {
242,144✔
796
        REALM_ASSERT(m_arrays->root.size() == s_root_size);
223,682✔
797
        return;
223,682✔
798
    }
223,682✔
799

800
    m_arrays.emplace(*m_db, *m_group);
18,462✔
801
}
18,462✔
802

803

804
Replication::version_type ClientHistory::add_changeset(BinaryData ct_changeset, BinaryData sync_changeset)
{
    // Appends `ct_changeset` to the continuous-transactions history and,
    // unless a server changeset is being applied, appends a local sync
    // history entry for `sync_changeset`. Returns
    // `m_ct_history_base_version + ct_history_size()` after the append.

    // FIXME: BinaryColumn::set() currently treats BinaryData(0,0) as null. It
    // should probably treat it as the empty string instead, and accept
    // Optional<BinaryData> for callers that really want to store null. Until
    // then, a null changeset is replaced by an explicit empty one here.
    m_arrays->ct_history.add(ct_changeset.is_null() ? BinaryData("", 0) : ct_changeset); // Throws

    REALM_ASSERT(!m_applying_server_changeset || !m_applying_client_reset);

    // When a server changeset is being applied, the sync history entry has
    // been added already, so only the flag needs resetting.
    if (m_applying_server_changeset) {
        // The flag is guarded by the write lock, so it has to be cleared
        // before the write is committed.
        m_applying_server_changeset = false;
        REALM_ASSERT(m_ct_history_base_version + ct_history_size() ==
                     m_sync_history_base_version + sync_history_size());
        REALM_ASSERT(sync_changeset.size() == 0);
        return m_ct_history_base_version + ct_history_size();
    }

    // Changes made while applying a client reset only bring us in line with
    // the new server state, so no changeset is recorded for them.
    if (m_applying_client_reset) {
        m_applying_client_reset = false;
        sync_changeset = {};
    }

    HistoryEntry local_entry;
    local_entry.origin_timestamp = m_local_origin_timestamp_source();
    local_entry.origin_file_ident = 0; // Of local origin
    local_entry.remote_version = m_progress_download.server_version;
    local_entry.changeset = sync_changeset;
    add_sync_history_entry(local_entry); // Throws

    // The running total of uploadable bytes is bumped on every local change
    // and persisted in the Realm: the sync history gets trimmed, and even if
    // it were not, walking the whole history on every query would be costly.
    Array& root = m_arrays->root;
    std::uint_fast64_t new_uploadable_bytes =
        root.get_as_ref_or_tagged(s_progress_uploadable_bytes_iip).get_as_int();
    new_uploadable_bytes += local_entry.changeset.size();
    root.set(s_progress_uploadable_bytes_iip, RefOrTagged::make_tagged(new_uploadable_bytes));

    return m_ct_history_base_version + ct_history_size();
}
177,112✔
855

856
void ClientHistory::add_sync_history_entry(const HistoryEntry& entry)
857
{
177,112✔
858
    REALM_ASSERT(m_arrays->changesets.size() == sync_history_size());
177,112✔
859
    REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size());
177,112✔
860
    REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size());
177,112✔
861
    REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size());
177,112✔
862
    REALM_ASSERT(m_arrays->origin_timestamps.size() == sync_history_size());
177,112✔
863

864
    if (!entry.changeset.is_null()) {
177,112✔
865
        auto changeset = entry.changeset.get_first_chunk();
136,454✔
866
        auto compressed = util::compression::allocate_and_compress_nonportable(changeset);
136,454✔
867
        m_arrays->changesets.add(BinaryData{compressed.data(), compressed.size()}); // Throws
136,454✔
868
    }
136,454✔
869
    else {
40,658✔
870
        m_arrays->changesets.add(BinaryData("", 0)); // Throws
40,658✔
871
    }
40,658✔
872

873
    m_arrays->reciprocal_transforms.add(BinaryData{});                                            // Throws
177,112✔
874
    m_arrays->remote_versions.insert(realm::npos, std::int_fast64_t(entry.remote_version));       // Throws
177,112✔
875
    m_arrays->origin_file_idents.insert(realm::npos, std::int_fast64_t(entry.origin_file_ident)); // Throws
177,112✔
876
    m_arrays->origin_timestamps.insert(realm::npos, std::int_fast64_t(entry.origin_timestamp));   // Throws
177,112✔
877
}
177,112✔
878

879

880
void ClientHistory::update_sync_progress(const SyncProgress& progress, DownloadableProgress downloadable_bytes)
881
{
53,610✔
882
    Array& root = m_arrays->root;
53,610✔
883

884
    // Progress must never decrease
885
    if (auto current = version_type(root.get_as_ref_or_tagged(s_progress_latest_server_version_iip).get_as_int());
53,610✔
886
        progress.latest_server_version.version < current) {
53,610✔
887
        throw IntegrationException(ErrorCodes::SyncProtocolInvariantFailed,
×
888
                                   util::format("latest server version cannot decrease (current: %1, new: %2)",
×
889
                                                current, progress.latest_server_version.version),
×
890
                                   ProtocolError::bad_progress);
×
891
    }
×
892
    if (auto current = version_type(root.get_as_ref_or_tagged(s_progress_download_server_version_iip).get_as_int());
53,610✔
893
        progress.download.server_version < current) {
53,610✔
894
        throw IntegrationException(
×
895
            ErrorCodes::SyncProtocolInvariantFailed,
×
896
            util::format("server version of download cursor cannot decrease (current: %1, new: %2)", current,
×
897
                         progress.download.server_version),
×
898
            ProtocolError::bad_progress);
×
899
    }
×
900
    if (auto current = version_type(root.get_as_ref_or_tagged(s_progress_download_client_version_iip).get_as_int());
53,610✔
901
        progress.download.last_integrated_client_version < current) {
53,610✔
902
        throw IntegrationException(
×
903
            ErrorCodes::SyncProtocolInvariantFailed,
×
904
            util::format("last integrated client version of download cursor cannot decrease (current: %1, new: %2)",
×
905
                         current, progress.download.last_integrated_client_version),
×
906
            ProtocolError::bad_progress);
×
907
    }
×
908
    if (auto current = version_type(root.get_as_ref_or_tagged(s_progress_upload_client_version_iip).get_as_int());
53,610✔
909
        progress.upload.client_version < current) {
53,610✔
910
        throw IntegrationException(
×
911
            ErrorCodes::SyncProtocolInvariantFailed,
×
912
            util::format("client version of upload cursor cannot decrease (current: %1, new: %2)", current,
×
913
                         progress.upload.client_version),
×
914
            ProtocolError::bad_progress);
×
915
    }
×
916
    const auto last_integrated_server_version = progress.upload.last_integrated_server_version;
53,610✔
917
    if (auto current = version_type(root.get_as_ref_or_tagged(s_progress_upload_server_version_iip).get_as_int());
53,610✔
918
        last_integrated_server_version > 0 && last_integrated_server_version < current) {
53,610✔
919
        throw IntegrationException(
×
920
            ErrorCodes::SyncProtocolInvariantFailed,
×
921
            util::format("last integrated server version of upload cursor cannot decrease (current: %1, new: %2)",
×
922
                         current, last_integrated_server_version),
×
923
            ProtocolError::bad_progress);
×
924
    }
×
925

926
    auto uploaded_bytes = std::uint_fast64_t(root.get_as_ref_or_tagged(s_progress_uploaded_bytes_iip).get_as_int());
53,610✔
927
    auto previous_upload_client_version =
53,610✔
928
        version_type(root.get_as_ref_or_tagged(s_progress_upload_client_version_iip).get_as_int());
53,610✔
929
    uploaded_bytes += sum_of_history_entry_sizes(previous_upload_client_version, progress.upload.client_version);
53,610✔
930

931
    root.set(s_progress_download_server_version_iip,
53,610✔
932
             RefOrTagged::make_tagged(progress.download.server_version)); // Throws
53,610✔
933
    root.set(s_progress_download_client_version_iip,
53,610✔
934
             RefOrTagged::make_tagged(progress.download.last_integrated_client_version)); // Throws
53,610✔
935
    root.set(s_progress_latest_server_version_iip,
53,610✔
936
             RefOrTagged::make_tagged(progress.latest_server_version.version)); // Throws
53,610✔
937
    root.set(s_progress_latest_server_version_salt_iip,
53,610✔
938
             RefOrTagged::make_tagged(progress.latest_server_version.salt)); // Throws
53,610✔
939
    root.set(s_progress_upload_client_version_iip,
53,610✔
940
             RefOrTagged::make_tagged(progress.upload.client_version)); // Throws
53,610✔
941
    if (progress.upload.last_integrated_server_version > 0) {
53,610✔
942
        root.set(s_progress_upload_server_version_iip,
48,234✔
943
                 RefOrTagged::make_tagged(progress.upload.last_integrated_server_version)); // Throws
48,234✔
944
    }
48,234✔
945

946
    root.set(s_progress_downloadable_bytes_iip,
53,610✔
947
             RefOrTagged::make_tagged(downloadable_bytes.as_bytes())); // Throws
53,610✔
948
    root.set(s_progress_uploaded_bytes_iip,
53,610✔
949
             RefOrTagged::make_tagged(uploaded_bytes)); // Throws
53,610✔
950

951
    m_progress_download = progress.download;
53,610✔
952

953
    trim_sync_history(); // Throws
53,610✔
954
}
53,610✔
955

956

957
void ClientHistory::trim_ct_history()
958
{
153,990✔
959
    version_type begin = m_ct_history_base_version;
153,990✔
960
    version_type end = m_version_of_oldest_bound_snapshot;
153,990✔
961
    REALM_ASSERT(end >= begin);
153,990✔
962

963
    std::size_t n = std::size_t(end - begin);
153,990✔
964
    if (n == 0)
153,990✔
965
        return;
18,462✔
966

967
    // The new changeset is always added before set_oldest_bound_version()
968
    // is called. Therefore, the trimming operation can never leave the
969
    // history empty.
970
    REALM_ASSERT(n < ct_history_size());
135,528✔
971

972
    for (std::size_t i = 0; i < n; ++i) {
276,232✔
973
        std::size_t j = (n - 1) - i;
140,704✔
974
        m_arrays->ct_history.erase(j);
140,704✔
975
    }
140,704✔
976

977
    m_ct_history_base_version += n;
135,528✔
978

979
    REALM_ASSERT(m_ct_history_base_version + ct_history_size() == m_sync_history_base_version + sync_history_size());
135,528✔
980
}
135,528✔
981

982

983
// Trimming rules for synchronization history:
984
//
985
// Let C be the latest client version that was integrated on the server prior to
986
// the latest server version currently integrated by the client
987
// (`m_progress_download.last_integrated_client_version`).
988
//
989
// Definition: An *upload skippable history entry* is one whose changeset is
990
// either empty, or of remote origin.
991
//
992
// Then, a history entry, E, can be trimmed away if it precedes C, or E is
993
// upload skippable, and there are no upload nonskippable entries between C and
994
// E.
995
//
996
// Since the history representation is contiguous, it is necessary that the
997
// trimming rule upholds the following invariant:
998
//
999
// > If a changeset can be trimmed, then any earlier changeset can also be
1000
// > trimmed.
1001
//
1002
// Note that C corresponds to the earliest possible beginning of the merge
1003
// window for the next incoming changeset from the server.
1004
void ClientHistory::trim_sync_history()
1005
{
53,610✔
1006
    version_type begin = m_sync_history_base_version;
53,610✔
1007
    version_type end = std::max(m_progress_download.last_integrated_client_version, s_initial_version + 0);
53,610✔
1008
    // Note: At this point, `end` corresponds to C in the description above.
1009

1010
    // `end` (`m_progress_download.last_integrated_client_version`) will precede
1011
    // the beginning of the history, if we trimmed beyond
1012
    // `m_progress_download.last_integrated_client_version` during the previous
1013
    // trimming session. Since new entries, that have now become eligible for
1014
    // scanning, may also be upload skippable, we need to continue the scan from
1015
    // the beginning of the history in that case.
1016
    if (end < begin)
53,610✔
1017
        end = begin;
18,050✔
1018

1019
    // FIXME: It seems like in some cases, a particular history entry that
1020
    // terminates the scan may get examined over and over every time
1021
    // trim_history() is called. For this reason, it seems like it would be
1022
    // worth considering to cache the outcome.
1023

1024
    // FIXME: It seems like there is a significant overlap between what is going
1025
    // on here and in a place like find_uploadable_changesets(). Maybe there is
1026
    // grounds for some refactoring to take that into account, especially, to
1027
    // avoid scanning the same parts of the history for the same information
1028
    // multiple times.
1029

1030
    {
53,610✔
1031
        std::size_t offset = std::size_t(end - begin);
53,610✔
1032
        std::size_t n = std::size_t(sync_history_size() - offset);
53,610✔
1033
        std::size_t i = 0;
53,610✔
1034
        while (i < n) {
95,798✔
1035
            std::int_fast64_t origin_file_ident = m_arrays->origin_file_idents.get(offset + i);
74,762✔
1036
            bool of_local_origin = (origin_file_ident == 0);
74,762✔
1037
            if (of_local_origin) {
74,762✔
1038
                std::size_t pos = 0;
55,762✔
1039
                BinaryData chunk = m_arrays->changesets.get_at(offset + i, pos);
55,762✔
1040
                bool nonempty = (chunk.size() > 0);
55,762✔
1041
                if (nonempty)
55,762✔
1042
                    break; // Not upload skippable
32,574✔
1043
            }
55,762✔
1044
            ++i;
42,188✔
1045
        }
42,188✔
1046
        end += i;
53,610✔
1047
    }
53,610✔
1048

1049
    std::size_t n = std::size_t(end - begin);
53,610✔
1050
    do_trim_sync_history(n); // Throws
53,610✔
1051
}
53,610✔
1052

1053
bool ClientHistory::no_pending_local_changes(version_type version) const
{
    // Returns false as soon as the sync history (after ensure_updated() for
    // `version`) contains an entry of local origin with a non-empty
    // changeset; returns true when no such entry exists.
    ensure_updated(version);

    for (size_t index = 0; index < sync_history_size(); ++index) {
        // Non-zero origin file ident: not a local entry.
        if (m_arrays->origin_file_idents.get(index) != 0)
            continue;

        std::size_t pos = 0;
        const BinaryData first_chunk = m_arrays->changesets.get_at(index, pos);
        if (first_chunk.size() > 0)
            return false;
    }
    return true;
}
172✔
1066

1067
void ClientHistory::do_trim_sync_history(std::size_t n)
1068
{
61,348✔
1069
    REALM_ASSERT(m_arrays->changesets.size() == sync_history_size());
61,348✔
1070
    REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size());
61,348✔
1071
    REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size());
61,348✔
1072
    REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size());
61,348✔
1073
    REALM_ASSERT(m_arrays->origin_timestamps.size() == sync_history_size());
61,348✔
1074
    REALM_ASSERT(n <= sync_history_size());
61,348✔
1075

1076
    if (n == sync_history_size()) {
61,348✔
1077
        m_arrays->changesets.clear();
24,922✔
1078
        m_arrays->reciprocal_transforms.clear();
24,922✔
1079
        m_arrays->remote_versions.clear();
24,922✔
1080
        m_arrays->origin_file_idents.clear();
24,922✔
1081
        m_arrays->origin_timestamps.clear();
24,922✔
1082
    }
24,922✔
1083
    else if (n > 0) {
36,426✔
1084
        for (std::size_t i = 0; i < n; ++i) {
72,168✔
1085
            std::size_t j = (n - 1) - i;
53,260✔
1086
            m_arrays->changesets.erase(j); // Throws
53,260✔
1087
        }
53,260✔
1088
        for (std::size_t i = 0; i < n; ++i) {
72,170✔
1089
            std::size_t j = (n - 1) - i;
53,262✔
1090
            m_arrays->reciprocal_transforms.erase(j); // Throws
53,262✔
1091
        }
53,262✔
1092
        for (std::size_t i = 0; i < n; ++i) {
72,170✔
1093
            std::size_t j = (n - 1) - i;
53,262✔
1094
            m_arrays->remote_versions.erase(j); // Throws
53,262✔
1095
        }
53,262✔
1096
        for (std::size_t i = 0; i < n; ++i) {
72,170✔
1097
            std::size_t j = (n - 1) - i;
53,262✔
1098
            m_arrays->origin_file_idents.erase(j); // Throws
53,262✔
1099
        }
53,262✔
1100
        for (std::size_t i = 0; i < n; ++i) {
72,170✔
1101
            std::size_t j = (n - 1) - i;
53,262✔
1102
            m_arrays->origin_timestamps.erase(j); // Throws
53,262✔
1103
        }
53,262✔
1104
    }
18,908✔
1105

1106
    m_sync_history_base_version += n;
61,348✔
1107
}
61,348✔
1108

1109
void ClientHistory::fix_up_client_file_ident_in_stored_changesets(Transaction& group,
1110
                                                                  file_ident_type client_file_ident)
1111
{
874✔
1112
    // Must be in write transaction!
1113

1114
    REALM_ASSERT(client_file_ident != 0);
874✔
1115
    auto promote_global_key = [client_file_ident](GlobalKey* oid) {
874✔
1116
        if (oid->hi() == 0) {
20✔
1117
            // client_file_ident == 0
1118
            *oid = GlobalKey{uint64_t(client_file_ident), oid->lo()};
20✔
1119
            return true;
20✔
1120
        }
20✔
1121
        return false;
×
1122
    };
20✔
1123

1124
    Group::TableNameBuffer buffer;
874✔
1125
    auto get_table_for_class = [&](StringData class_name) -> ConstTableRef {
2,998✔
1126
        return group.get_table(Group::class_name_to_table_name(class_name, buffer));
2,998✔
1127
    };
2,998✔
1128

1129
    util::compression::CompressMemoryArena arena;
874✔
1130
    util::AppendBuffer<char> compressed;
874✔
1131

1132
    // Fix up changesets.
1133
    Array& root = m_arrays->root;
874✔
1134
    uint64_t uploadable_bytes = root.get_as_ref_or_tagged(s_progress_uploadable_bytes_iip).get_as_int();
874✔
1135
    for (size_t i = 0; i < sync_history_size(); ++i) {
4,018✔
1136
        // We could have opened a pre-provisioned realm file. In this case we can skip the entries downloaded
1137
        // from the server.
1138
        if (m_arrays->origin_file_idents.get(i) != 0)
3,144✔
1139
            continue;
×
1140

1141
        ChunkedBinaryData changeset{m_arrays->changesets, i};
3,144✔
1142
        ChunkedBinaryInputStream is{changeset};
3,144✔
1143
        size_t decompressed_size;
3,144✔
1144
        auto decompressed = util::compression::decompress_nonportable_input_stream(is, decompressed_size);
3,144✔
1145
        if (!decompressed)
3,144✔
1146
            continue;
×
1147
        Changeset log;
3,144✔
1148
        parse_changeset(*decompressed, log);
3,144✔
1149

1150
        bool did_modify = false;
3,144✔
1151
        auto last_class_name = InternString::npos;
3,144✔
1152
        ConstTableRef selected_table;
3,144✔
1153
        for (auto instr : log) {
189,470✔
1154
            if (!instr)
189,470✔
1155
                continue;
×
1156

1157
            if (auto obj_instr = instr->get_if<Instruction::ObjectInstruction>()) {
189,470✔
1158
                // Cache the TableRef
1159
                if (obj_instr->table != last_class_name) {
188,074✔
1160
                    StringData class_name = log.get_string(obj_instr->table);
2,998✔
1161
                    last_class_name = obj_instr->table;
2,998✔
1162
                    selected_table = get_table_for_class(class_name);
2,998✔
1163
                }
2,998✔
1164

1165
                // Fix up instructions using GlobalKey to identify objects.
1166
                if (auto global_key = mpark::get_if<GlobalKey>(&obj_instr->object)) {
188,074✔
1167
                    did_modify = promote_global_key(global_key);
16✔
1168
                }
16✔
1169

1170
                // Fix up the payload for Set and ArrayInsert.
1171
                Instruction::Payload* payload = nullptr;
188,074✔
1172
                if (auto set_instr = instr->get_if<Instruction::Update>()) {
188,074✔
1173
                    payload = &set_instr->value;
174,392✔
1174
                }
174,392✔
1175
                else if (auto list_insert_instr = instr->get_if<Instruction::ArrayInsert>()) {
13,682✔
1176
                    payload = &list_insert_instr->value;
228✔
1177
                }
228✔
1178

1179
                if (payload && payload->type == Instruction::Payload::Type::Link) {
188,074✔
1180
                    if (auto global_key = mpark::get_if<GlobalKey>(&payload->data.link.target)) {
44✔
1181
                        did_modify = promote_global_key(global_key);
4✔
1182
                    }
4✔
1183
                }
44✔
1184
            }
188,074✔
1185
        }
189,470✔
1186

1187
        if (did_modify) {
3,144✔
1188
            ChangesetEncoder::Buffer modified;
4✔
1189
            encode_changeset(log, modified);
4✔
1190
            util::compression::allocate_and_compress_nonportable(arena, modified, compressed);
4✔
1191
            m_arrays->changesets.set(i, BinaryData{compressed.data(), compressed.size()}); // Throws
4✔
1192

1193
            uploadable_bytes += modified.size() - decompressed_size;
4✔
1194
        }
4✔
1195
    }
3,144✔
1196

1197
    root.set(s_progress_uploadable_bytes_iip, RefOrTagged::make_tagged(uploadable_bytes));
874✔
1198
}
874✔
1199

1200
void ClientHistory::set_group(Group* group, bool updated)
{
    // Delegates to the base class, then re-registers the history root array
    // as the group's history parent if the array accessors exist.
    _impl::History::set_group(group, updated);
    if (!m_arrays)
        return;

    using gf = _impl::GroupFriend;
    gf::set_history_parent(*m_group, m_arrays->root);
}
207,956✔
1206

1207
void ClientHistory::record_current_schema_version()
1208
{
4✔
1209
    using gf = _impl::GroupFriend;
4✔
1210
    Allocator& alloc = gf::get_alloc(*m_group);
4✔
1211
    auto ref = gf::get_history_ref(*m_group);
4✔
1212
    REALM_ASSERT(ref != 0);
4✔
1213
    Array root{alloc};
4✔
1214
    gf::set_history_parent(*m_group, root);
4✔
1215
    root.init_from_ref(ref);
4✔
1216
    Array schema_versions{alloc};
4✔
1217
    schema_versions.set_parent(&root, s_schema_versions_iip);
4✔
1218
    schema_versions.init_from_parent();
4✔
1219
    version_type snapshot_version = m_db->get_version_of_latest_snapshot();
4✔
1220
    record_current_schema_version(schema_versions, snapshot_version); // Throws
4✔
1221
}
4✔
1222

1223

1224
// Appends one entry to each of the four child arrays of `schema_versions`:
//
//   - the client history schema version,
//   - the library version string (REALM_VERSION_STRING), stored as a ref to a
//     newly created array holding one element per character,
//   - `snapshot_version`,
//   - the current time as returned by std::time().
//
// `schema_versions` must have exactly `s_schema_versions_size` slots.
void ClientHistory::record_current_schema_version(Array& schema_versions, version_type snapshot_version)
{
    static_assert(s_schema_versions_size == 4, "");
    REALM_ASSERT(schema_versions.size() == s_schema_versions_size);

    Allocator& alloc = schema_versions.get_alloc();

    { // History schema version
        Array column{alloc};
        column.set_parent(&schema_versions, s_sv_schema_versions_iip);
        column.init_from_parent();
        const int current_schema_version = get_client_history_schema_version();
        column.add(current_schema_version); // Throws
    }

    { // Library version string
        Array column{alloc};
        column.set_parent(&schema_versions, s_sv_library_versions_iip);
        column.init_from_parent();

        const char* const version_string = REALM_VERSION_STRING;
        const std::size_t length = std::strlen(version_string);

        Array chars{alloc};
        chars.create(Array::type_Normal, false, length); // Throws
        _impl::ShallowArrayDestroyGuard chars_guard{&chars};
        for (std::size_t pos = 0; pos != length; ++pos) {
            // Go through unsigned char so that the stored value is never negative
            const auto byte = static_cast<unsigned char>(version_string[pos]);
            chars.set(pos, static_cast<std::int_fast64_t>(byte)); // Throws
        }
        column.add(static_cast<std::int_fast64_t>(chars.get_ref())); // Throws
        chars_guard.release(); // Ownership transferred to parent array
    }

    { // Snapshot version
        Array column{alloc};
        column.set_parent(&schema_versions, s_sv_snapshot_versions_iip);
        column.init_from_parent();
        column.add(static_cast<std::int_fast64_t>(snapshot_version)); // Throws
    }

    { // Timestamp
        Array column{alloc};
        column.set_parent(&schema_versions, s_sv_timestamps_iip);
        column.init_from_parent();
        const std::time_t now = std::time(nullptr);
        column.add(static_cast<std::int_fast64_t>(now)); // Throws
    }
}
18,466✔
1267

1268
// Overriding member function in realm::_impl::History
1269
void ClientHistory::update_from_ref_and_version(ref_type ref, version_type version)
1270
{
203,616✔
1271
    if (ref == 0) {
203,616✔
1272
        // No history
1273
        m_ct_history_base_version = version;
18,462✔
1274
        m_sync_history_base_version = version;
18,462✔
1275
        m_arrays.reset();
18,462✔
1276
        m_progress_download = {0, 0};
18,462✔
1277
        return;
18,462✔
1278
    }
18,462✔
1279
    if (REALM_LIKELY(m_arrays)) {
185,154✔
1280
        m_arrays->init_from_ref(ref);
158,862✔
1281
    }
158,862✔
1282
    else {
26,292✔
1283
        m_arrays.emplace(m_db->get_alloc(), m_group, ref);
26,292✔
1284
    }
26,292✔
1285

1286
    m_ct_history_base_version = version - ct_history_size();
185,154✔
1287
    m_sync_history_base_version = version - sync_history_size();
185,154✔
1288
    REALM_ASSERT(m_arrays->changesets.size() == sync_history_size());
185,154✔
1289
    REALM_ASSERT(m_arrays->reciprocal_transforms.size() == sync_history_size());
185,154✔
1290
    REALM_ASSERT(m_arrays->remote_versions.size() == sync_history_size());
185,154✔
1291
    REALM_ASSERT(m_arrays->origin_file_idents.size() == sync_history_size());
185,154✔
1292
    REALM_ASSERT(m_arrays->origin_timestamps.size() == sync_history_size());
185,154✔
1293

1294
    const Array& root = m_arrays->root;
185,154✔
1295
    m_progress_download.server_version =
185,154✔
1296
        version_type(root.get_as_ref_or_tagged(s_progress_download_server_version_iip).get_as_int());
185,154✔
1297
    m_progress_download.last_integrated_client_version =
185,154✔
1298
        version_type(root.get_as_ref_or_tagged(s_progress_download_client_version_iip).get_as_int());
185,154✔
1299
}
185,154✔
1300

1301

1302
// Overriding member function in realm::_impl::History
1303
void ClientHistory::update_from_parent(version_type current_version)
1304
{
164,914✔
1305
    using gf = _impl::GroupFriend;
164,914✔
1306
    ref_type ref = gf::get_history_ref(*m_group);
164,914✔
1307
    update_from_ref_and_version(ref, current_version); // Throws
164,914✔
1308
}
164,914✔
1309

1310

1311
// Overriding member function in realm::_impl::History
1312
void ClientHistory::get_changesets(version_type begin_version, version_type end_version,
1313
                                   BinaryIterator* iterators) const noexcept
1314
{
63,756✔
1315
    REALM_ASSERT(begin_version <= end_version);
63,756✔
1316
    REALM_ASSERT(begin_version >= m_ct_history_base_version);
63,756✔
1317
    REALM_ASSERT(end_version <= m_ct_history_base_version + ct_history_size());
63,756✔
1318
    std::size_t n = to_size_t(end_version - begin_version);
63,756✔
1319
    REALM_ASSERT(n == 0 || m_arrays);
63,756✔
1320
    std::size_t offset = to_size_t(begin_version - m_ct_history_base_version);
63,756✔
1321
    for (std::size_t i = 0; i < n; ++i)
215,032✔
1322
        iterators[i] = BinaryIterator(&m_arrays->ct_history, offset + i);
151,276✔
1323
}
63,756✔
1324

1325

1326
// Overriding member function in realm::_impl::History
1327
void ClientHistory::set_oldest_bound_version(version_type version)
1328
{
177,110✔
1329
    REALM_ASSERT(version >= m_version_of_oldest_bound_snapshot);
177,110✔
1330
    if (version > m_version_of_oldest_bound_snapshot) {
177,110✔
1331
        m_version_of_oldest_bound_snapshot = version;
153,992✔
1332
        trim_ct_history(); // Throws
153,992✔
1333
    }
153,992✔
1334
}
177,110✔
1335

1336
// Overriding member function in realm::_impl::History
1337
void ClientHistory::verify() const
1338
{
232✔
1339
#ifdef REALM_DEBUG
232✔
1340
    // The size of the continuous transactions history can only be zero when the
1341
    // Realm is in the initial empty state where top-ref is null.
1342
    REALM_ASSERT(ct_history_size() != 0 || m_ct_history_base_version == s_initial_version + 0);
232!
1343

1344
    if (!m_arrays) {
232✔
1345
        REALM_ASSERT(m_progress_download.server_version == 0);
×
1346
        REALM_ASSERT(m_progress_download.last_integrated_client_version == 0);
×
1347
        return;
×
1348
    }
×
1349
    m_arrays->verify();
232✔
1350

1351
    auto& root = m_arrays->root;
232✔
1352
    version_type progress_download_server_version =
232✔
1353
        version_type(root.get_as_ref_or_tagged(s_progress_download_server_version_iip).get_as_int());
232✔
1354
    version_type progress_download_client_version =
232✔
1355
        version_type(root.get_as_ref_or_tagged(s_progress_download_client_version_iip).get_as_int());
232✔
1356
    REALM_ASSERT(progress_download_server_version == m_progress_download.server_version);
232✔
1357
    REALM_ASSERT(progress_download_client_version == m_progress_download.last_integrated_client_version);
232✔
1358
    REALM_ASSERT(progress_download_client_version <= m_sync_history_base_version + sync_history_size());
232✔
1359
    version_type remote_version_of_last_entry = 0;
232✔
1360
    if (auto size = sync_history_size())
232✔
1361
        remote_version_of_last_entry = m_arrays->remote_versions.get(size - 1);
232✔
1362
    REALM_ASSERT(progress_download_server_version >= remote_version_of_last_entry);
232✔
1363

1364
    // Verify that there is no cooked history.
1365
    Array cooked_history{m_db->get_alloc()};
232✔
1366
    cooked_history.set_parent(&root, s_cooked_history_iip);
232✔
1367
    REALM_ASSERT(cooked_history.get_ref_from_parent() == 0);
232✔
1368
#endif // REALM_DEBUG
232✔
1369
}
232✔
1370

1371
// Constructs every array accessor of the history on top of `alloc`. Nothing
// else is done here; the other constructors delegate to this one and then
// create or attach the underlying arrays.
ClientHistory::Arrays::Arrays(Allocator& alloc) noexcept
    : root(alloc),
      ct_history(alloc),
      changesets(alloc),
      reciprocal_transforms(alloc),
      remote_versions(alloc),
      origin_file_idents(alloc),
      origin_timestamps(alloc)
{
}
104,574✔
1381

1382
// Creates a new, empty history and installs it in `group`.
//
// Steps, in order:
//   1. create the root array with `s_root_size` slots,
//   2. create the six history columns as children of the root,
//   3. build the `schema_versions` table (four empty child arrays), record the
//      current schema version in it, and store it in the root,
//   4. hand the root to the group via GroupFriend::prepare_history_parent()
//      and update the parent.
//
// Destroy guards protect the root and the intermediate arrays until ownership
// has been passed on; they are released explicitly at those points.
ClientHistory::Arrays::Arrays(DB& db, Group& group)
    : Arrays(db.get_alloc())
{
    auto& alloc = db.get_alloc();

    {
        const std::size_t root_size = s_root_size;
        root.create(Array::type_HasRefs, false, root_size); // Throws
    }
    _impl::DeepArrayDestroyGuard root_guard{&root};

    ct_history.set_parent(&root, s_ct_history_iip);
    ct_history.create(); // Throws

    changesets.set_parent(&root, s_changesets_iip);
    changesets.create(); // Throws

    reciprocal_transforms.set_parent(&root, s_reciprocal_transforms_iip);
    reciprocal_transforms.create(); // Throws

    remote_versions.set_parent(&root, s_remote_versions_iip);
    remote_versions.create(); // Throws

    origin_file_idents.set_parent(&root, s_origin_file_idents_iip);
    origin_file_idents.create(); // Throws

    origin_timestamps.set_parent(&root, s_origin_timestamps_iip);
    origin_timestamps.create(); // Throws

    { // `schema_versions` table
        Array schema_versions{alloc};
        const std::size_t table_size = s_schema_versions_size;
        schema_versions.create(Array::type_HasRefs, false, table_size); // Throws
        _impl::DeepArrayDestroyGuard table_guard{&schema_versions};

        // Creates an empty array of the given type and stores its ref in the
        // given slot of `schema_versions`.
        auto add_empty_child = [&](NodeHeader::Type type, int ndx_in_parent) {
            MemRef mem = Array::create_empty_array(type, false, alloc);
            const ref_type child_ref = mem.get_ref();
            _impl::DeepArrayRefDestroyGuard child_guard{child_ref, alloc};
            schema_versions.set_as_ref(ndx_in_parent, child_ref); // Throws
            child_guard.release();                                // Ownership transferred to parent array
        };
        add_empty_child(Array::type_Normal, s_sv_schema_versions_iip);
        add_empty_child(Array::type_HasRefs, s_sv_library_versions_iip);
        add_empty_child(Array::type_Normal, s_sv_snapshot_versions_iip);
        add_empty_child(Array::type_Normal, s_sv_timestamps_iip);

        const version_type latest_snapshot = db.get_version_of_latest_snapshot();
        record_current_schema_version(schema_versions, latest_snapshot);   // Throws
        root.set_as_ref(s_schema_versions_iip, schema_versions.get_ref()); // Throws
        table_guard.release();                                             // Ownership transferred to parent array
    }

    _impl::GroupFriend::prepare_history_parent(group, root, Replication::hist_SyncClient,
                                               get_client_history_schema_version(), 0); // Throws
    // Note: gf::prepare_history_parent() also ensures that the root array has
    // a slot for the history ref.
    root.update_parent(); // Throws
    root_guard.release();
}
18,462✔
1437

1438
// Attaches the accessors to an existing history rooted at `ref`.
//
// When `parent` is non-null the root array is registered as the history
// parent of that group. All history columns are parented to the root and then
// loaded through init_from_ref(). Finally it is asserted that the cooked
// history slot of the root is empty.
ClientHistory::Arrays::Arrays(Allocator& alloc, Group* parent, ref_type ref)
    : Arrays(alloc)
{
    root.init_from_ref(ref);
    if (parent != nullptr) {
        _impl::GroupFriend::set_history_parent(*parent, root);
    }

    // Parent links must be in place before init_from_ref() is called below
    ct_history.set_parent(&root, s_ct_history_iip);
    changesets.set_parent(&root, s_changesets_iip);
    reciprocal_transforms.set_parent(&root, s_reciprocal_transforms_iip);
    remote_versions.set_parent(&root, s_remote_versions_iip);
    origin_file_idents.set_parent(&root, s_origin_file_idents_iip);
    origin_timestamps.set_parent(&root, s_origin_timestamps_iip);

    init_from_ref(ref); // Throws

    // We should have no cooked history in existing Realms.
    Array cooked_history{alloc};
    cooked_history.set_parent(&root, s_cooked_history_iip);
    REALM_ASSERT(cooked_history.get_ref_from_parent() == 0);
}
86,112✔
1460

1461
// (Re)initializes all accessors from the history rooted at `ref`.
//
// The root array is loaded from `ref` and asserted to have `s_root_size`
// slots. `ct_history`, `changesets` and `reciprocal_transforms` are loaded
// from the refs stored in their respective root slots, while
// `remote_versions`, `origin_file_idents` and `origin_timestamps` are loaded
// via init_from_parent() and therefore rely on their parent having been set
// beforehand (both constructors that attach children do this).
//
// Note: the definition is qualified as `ClientHistory::Arrays::` rather than
// through the injected class name (`Arrays::Arrays::`); both spellings name
// the same member function.
void ClientHistory::Arrays::init_from_ref(ref_type ref)
{
    root.init_from_ref(ref);
    REALM_ASSERT(root.size() == s_root_size);

    ct_history.init_from_ref(root.get_as_ref(s_ct_history_iip));                       // Throws
    changesets.init_from_ref(root.get_as_ref(s_changesets_iip));                       // Throws
    reciprocal_transforms.init_from_ref(root.get_as_ref(s_reciprocal_transforms_iip)); // Throws

    remote_versions.init_from_parent();    // Throws
    origin_file_idents.init_from_parent(); // Throws
    origin_timestamps.init_from_parent();  // Throws
}
244,974✔
1481

1482
void ClientHistory::Arrays::verify() const
1483
{
232✔
1484
#ifdef REALM_DEBUG
232✔
1485
    root.verify();
232✔
1486
    ct_history.verify();
232✔
1487
    changesets.verify();
232✔
1488
    reciprocal_transforms.verify();
232✔
1489
    remote_versions.verify();
232✔
1490
    origin_file_idents.verify();
232✔
1491
    origin_timestamps.verify();
232✔
1492
    REALM_ASSERT(root.size() == s_root_size);
232✔
1493
    REALM_ASSERT(reciprocal_transforms.size() == changesets.size());
232✔
1494
    REALM_ASSERT(remote_versions.size() == changesets.size());
232✔
1495
    REALM_ASSERT(origin_file_idents.size() == changesets.size());
232✔
1496
    REALM_ASSERT(origin_timestamps.size() == changesets.size());
232✔
1497
#endif // REALM_DEBUG
232✔
1498
}
232✔
1499

1500
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc