• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2386

04 Jun 2024 11:40PM UTC coverage: 90.844% (-0.003%) from 90.847%
2386

push

Evergreen

web-flow
Merge pull request #7734 from realm/tg/create-object-repl

RCORE-2152 Don't emit transaction log instructions for mutations on newly-created objects

101840 of 180236 branches covered (56.5%)

358 of 362 new or added lines in 6 files covered. (98.9%)

98 existing lines in 19 files now uncovered.

214786 of 236434 relevant lines covered (90.84%)

5684022.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.49
/src/realm/sync/noinst/pending_bootstrap_store.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2022 Realm, Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "realm/sync/noinst/pending_bootstrap_store.hpp"
20

21
#include "realm/binary_data.hpp"
22
#include "realm/chunked_binary.hpp"
23
#include "realm/data_type.hpp"
24
#include "realm/db.hpp"
25
#include "realm/list.hpp"
26
#include "realm/query.hpp"
27
#include "realm/sync/changeset_parser.hpp"
28
#include "realm/sync/noinst/protocol_codec.hpp"
29
#include "realm/sync/noinst/sync_metadata_schema.hpp"
30
#include "realm/sync/protocol.hpp"
31
#include "realm/sync/transform.hpp"
32
#include "realm/util/assert.hpp"
33
#include "realm/util/buffer.hpp"
34
#include "realm/util/compression.hpp"
35
#include "realm/util/logger.hpp"
36
#include <stdexcept>
37

38
namespace realm::sync {
39
namespace {
40
constexpr static int c_schema_version = 1;
41
constexpr static std::string_view c_progress_table("flx_pending_bootstrap_progress");
42
constexpr static std::string_view c_pending_bootstrap_table("flx_pending_bootstrap");
43
constexpr static std::string_view c_pending_changesets_table("flx_pending_bootstrap_changesets");
44
constexpr static std::string_view c_pending_bootstrap_query_version("query_version");
45
constexpr static std::string_view c_pending_bootstrap_changesets("changesets");
46
constexpr static std::string_view c_pending_bootstrap_progress("progress");
47
constexpr static std::string_view c_pending_changesets_remote_version("remote_version");
48
constexpr static std::string_view
49
    c_pending_changesets_last_integrated_client_version("last_integrated_client_version");
50
constexpr static std::string_view c_pending_changesets_origin_file_ident("origin_file_ident");
51
constexpr static std::string_view c_pending_changesets_origin_timestamp("origin_timestamp");
52
constexpr static std::string_view c_pending_changesets_original_size("original_size");
53
constexpr static std::string_view c_pending_changesets_data("data");
54
constexpr static std::string_view c_progress_download_server_version("download_server_version");
55
constexpr static std::string_view c_progress_download_client_version("download_client_version");
56
constexpr static std::string_view c_progress_upload_server_version("upload_server_version");
57
constexpr static std::string_view c_progress_upload_client_version("upload_client_version");
58
constexpr static std::string_view c_progress_latest_server_version("latest_server_version");
59
constexpr static std::string_view c_progress_latest_server_version_salt("latest_server_version_salt");
60

61
} // namespace
62

63
PendingBootstrapStore::PendingBootstrapStore(DBRef db, util::Logger& logger)
64
    : m_db(std::move(db))
762✔
65
    , m_logger(logger)
762✔
66
{
1,522✔
67
    std::vector<SyncMetadataTable> internal_tables{
1,522✔
68
        {&m_table,
1,522✔
69
         c_pending_bootstrap_table,
1,522✔
70
         {&m_query_version, c_pending_bootstrap_query_version, type_Int},
1,522✔
71
         {
1,522✔
72
             {&m_changesets, c_pending_bootstrap_changesets, c_pending_changesets_table, true},
1,522✔
73
             {&m_progress, c_pending_bootstrap_progress, c_progress_table, false},
1,522✔
74
         }},
1,522✔
75
        {&m_progress_table,
1,522✔
76
         c_progress_table,
1,522✔
77
         SyncMetadataTable::IsEmbeddedTag{},
1,522✔
78
         {
1,522✔
79
             {&m_progress_upload_server_version, c_progress_upload_server_version, type_Int},
1,522✔
80
             {&m_progress_upload_client_version, c_progress_upload_client_version, type_Int},
1,522✔
81
             {&m_progress_download_server_version, c_progress_download_server_version, type_Int},
1,522✔
82
             {&m_progress_download_client_version, c_progress_download_client_version, type_Int},
1,522✔
83
             {&m_progress_latest_server_version, c_progress_latest_server_version, type_Int},
1,522✔
84
             {&m_progress_latest_server_version_salt, c_progress_latest_server_version_salt, type_Int},
1,522✔
85
         }},
1,522✔
86
        {&m_changeset_table,
1,522✔
87
         c_pending_changesets_table,
1,522✔
88
         SyncMetadataTable::IsEmbeddedTag{},
1,522✔
89
         {
1,522✔
90
             {&m_changeset_remote_version, c_pending_changesets_remote_version, type_Int},
1,522✔
91
             {&m_changeset_last_integrated_client_version, c_pending_changesets_last_integrated_client_version,
1,522✔
92
              type_Int},
1,522✔
93
             {&m_changeset_origin_file_ident, c_pending_changesets_origin_file_ident, type_Int},
1,522✔
94
             {&m_changeset_origin_timestamp, c_pending_changesets_origin_timestamp, type_Int},
1,522✔
95
             {&m_changeset_original_changeset_size, c_pending_changesets_original_size, type_Int},
1,522✔
96
             {&m_changeset_data, c_pending_changesets_data, type_Binary, true},
1,522✔
97
         }}};
1,522✔
98

99
    auto tr = m_db->start_read();
1,522✔
100
    // Start with a reader so it doesn't try to write until we are ready
101
    SyncMetadataSchemaVersionsReader schema_versions_reader(tr);
1,522✔
102
    if (auto schema_version =
1,522✔
103
            schema_versions_reader.get_version_for(tr, internal_schema_groups::c_pending_bootstraps)) {
1,522✔
104
        if (*schema_version != c_schema_version) {
502✔
105
            throw RuntimeError(ErrorCodes::SchemaVersionMismatch,
×
106
                               "Invalid schema version for FLX sync pending bootstrap table group");
×
107
        }
×
108
        load_sync_metadata_schema(tr, &internal_tables);
502✔
109
    }
502✔
110
    else {
1,020✔
111
        tr->promote_to_write();
1,020✔
112
        // Ensure the schema versions table is initialized (may add its own commit)
113
        SyncMetadataSchemaVersions schema_versions(tr);
1,020✔
114
        // Create the metadata schema and set the version (in the same commit)
115
        schema_versions.set_version_for(tr, internal_schema_groups::c_pending_bootstraps, c_schema_version);
1,020✔
116
        create_sync_metadata_schema(tr, &internal_tables);
1,020✔
117
        tr->commit_and_continue_as_read();
1,020✔
118
    }
1,020✔
119
    REALM_ASSERT(m_table);
1,522✔
120

121
    if (auto bootstrap_table = tr->get_table(m_table); !bootstrap_table->is_empty()) {
1,522✔
122
        m_has_pending = true;
40✔
123
    }
40✔
124
    else {
1,482✔
125
        m_has_pending = false;
1,482✔
126
    }
1,482✔
127
}
1,522✔
128

129
void PendingBootstrapStore::add_batch(int64_t query_version, util::Optional<SyncProgress> progress,
130
                                      const _impl::ClientProtocol::ReceivedChangesets& changesets,
131
                                      bool* created_new_batch_out)
132
{
2,148✔
133
    std::vector<util::AppendBuffer<char>> compressed_changesets;
2,148✔
134
    compressed_changesets.reserve(changesets.size());
2,148✔
135

136
    util::compression::CompressMemoryArena arena;
2,148✔
137
    for (auto& changeset : changesets) {
2,164✔
138
        compressed_changesets.emplace_back();
2,164✔
139
        util::compression::allocate_and_compress_nonportable(arena, {changeset.data.get_first_chunk()},
2,164✔
140
                                                             compressed_changesets.back());
2,164✔
141
    }
2,164✔
142

143
    auto tr = m_db->start_write();
2,148✔
144
    bool did_create = false;
2,148✔
145

146
    {
2,148✔
147
        DisableReplication disable_replication(*tr);
2,148✔
148
        auto bootstrap_table = tr->get_table(m_table);
2,148✔
149
        auto incomplete_bootstraps = Query(bootstrap_table).not_equal(m_query_version, query_version).find_all();
2,148✔
150
        incomplete_bootstraps.for_each([&](Obj obj) {
2,148✔
NEW
151
            m_logger.debug(util::LogCategory::changeset, "Clearing incomplete bootstrap for query version %1",
×
NEW
152
                           obj.get<int64_t>(m_query_version));
×
NEW
153
            return IteratorControl::AdvanceToNext;
×
NEW
154
        });
×
155
        incomplete_bootstraps.clear();
2,148✔
156

157
        auto bootstrap_obj = bootstrap_table->create_object_with_primary_key(Mixed{query_version}, &did_create);
2,148✔
158
        if (progress) {
2,148✔
159
            auto progress_obj = bootstrap_obj.create_and_set_linked_object(m_progress);
1,944✔
160
            progress_obj.set(m_progress_latest_server_version, int64_t(progress->latest_server_version.version));
1,944✔
161
            progress_obj.set(m_progress_latest_server_version_salt, int64_t(progress->latest_server_version.salt));
1,944✔
162
            progress_obj.set(m_progress_download_server_version, int64_t(progress->download.server_version));
1,944✔
163
            progress_obj.set(m_progress_download_client_version,
1,944✔
164
                             int64_t(progress->download.last_integrated_client_version));
1,944✔
165
            progress_obj.set(m_progress_upload_server_version,
1,944✔
166
                             int64_t(progress->upload.last_integrated_server_version));
1,944✔
167
            progress_obj.set(m_progress_upload_client_version, int64_t(progress->upload.client_version));
1,944✔
168
        }
1,944✔
169

170
        auto changesets_list = bootstrap_obj.get_linklist(m_changesets);
2,148✔
171
        for (size_t idx = 0; idx < changesets.size(); ++idx) {
4,312✔
172
            auto cur_changeset = changesets_list.create_and_insert_linked_object(changesets_list.size());
2,164✔
173
            cur_changeset.set(m_changeset_remote_version, int64_t(changesets[idx].remote_version));
2,164✔
174
            cur_changeset.set(m_changeset_last_integrated_client_version,
2,164✔
175
                              int64_t(changesets[idx].last_integrated_local_version));
2,164✔
176
            cur_changeset.set(m_changeset_origin_file_ident, int64_t(changesets[idx].origin_file_ident));
2,164✔
177
            cur_changeset.set(m_changeset_origin_timestamp, int64_t(changesets[idx].origin_timestamp));
2,164✔
178
            cur_changeset.set(m_changeset_original_changeset_size, int64_t(changesets[idx].original_changeset_size));
2,164✔
179
            BinaryData compressed_data(compressed_changesets[idx].data(), compressed_changesets[idx].size());
2,164✔
180
            cur_changeset.set(m_changeset_data, compressed_data);
2,164✔
181
        }
2,164✔
182
    }
2,148✔
183

184
    tr->commit();
2,148✔
185

186
    if (created_new_batch_out) {
2,148✔
187
        *created_new_batch_out = did_create;
2,148✔
188
    }
2,148✔
189

190
    if (did_create) {
2,148✔
191
        m_logger.debug(util::LogCategory::changeset, "Created new pending bootstrap object for query version %1",
1,952✔
192
                       query_version);
1,952✔
193
    }
1,952✔
194
    else {
196✔
195
        m_logger.debug(util::LogCategory::changeset, "Added batch to pending bootstrap object for query version %1",
196✔
196
                       query_version);
196✔
197
    }
196✔
198
    if (progress) {
2,148✔
199
        m_logger.debug(util::LogCategory::changeset, "Finalized pending bootstrap object for query version %1",
1,944✔
200
                       query_version);
1,944✔
201
    }
1,944✔
202
    m_has_pending = true;
2,148✔
203
}
2,148✔
204

205
bool PendingBootstrapStore::has_pending()
206
{
7,514✔
207
    return m_has_pending;
7,514✔
208
}
7,514✔
209

210
void PendingBootstrapStore::clear()
211
{
12✔
212
    auto tr = m_db->start_write();
12✔
213
    clear(*tr);
12✔
214
    tr->commit();
12✔
215
}
12✔
216

217
void PendingBootstrapStore::clear(Transaction& wt)
218
{
12✔
219
    auto bootstrap_table = wt.get_table(m_table);
12✔
220
    bootstrap_table->clear();
12✔
221
    m_has_pending = false;
12✔
222
}
12✔
223

224
PendingBootstrapStore::PendingBatch PendingBootstrapStore::peek_pending(size_t limit_in_bytes)
225
{
2,152✔
226
    auto tr = m_db->start_read();
2,152✔
227
    auto bootstrap_table = tr->get_table(m_table);
2,152✔
228
    if (bootstrap_table->is_empty()) {
2,152✔
229
        return {};
4✔
230
    }
4✔
231

232
    // We should only have one pending bootstrap at a time.
233
    REALM_ASSERT(bootstrap_table->size() == 1);
2,148✔
234

235
    auto bootstrap_obj = bootstrap_table->get_object(0);
2,148✔
236
    PendingBatch ret;
2,148✔
237
    ret.query_version = bootstrap_obj.get<int64_t>(m_query_version);
2,148✔
238

239
    if (!bootstrap_obj.is_null(m_progress)) {
2,148✔
240
        auto progress_obj = bootstrap_obj.get_linked_object(m_progress);
2,136✔
241
        SyncProgress progress;
2,136✔
242
        progress.latest_server_version.version = progress_obj.get<int64_t>(m_progress_latest_server_version);
2,136✔
243
        progress.latest_server_version.salt = progress_obj.get<int64_t>(m_progress_latest_server_version_salt);
2,136✔
244
        progress.download.server_version = progress_obj.get<int64_t>(m_progress_download_server_version);
2,136✔
245
        progress.download.last_integrated_client_version =
2,136✔
246
            progress_obj.get<int64_t>(m_progress_download_client_version);
2,136✔
247
        progress.upload.last_integrated_server_version = progress_obj.get<int64_t>(m_progress_upload_server_version);
2,136✔
248
        progress.upload.client_version = progress_obj.get<int64_t>(m_progress_upload_client_version);
2,136✔
249
        ret.progress = std::move(progress);
2,136✔
250
    }
2,136✔
251

252
    auto changeset_list = bootstrap_obj.get_linklist(m_changesets);
2,148✔
253
    size_t bytes_so_far = 0;
2,148✔
254
    for (size_t idx = 0; idx < changeset_list.size() && bytes_so_far < limit_in_bytes; ++idx) {
4,372✔
255
        auto cur_changeset = changeset_list.get_object(idx);
2,224✔
256
        ret.changeset_data.push_back(util::AppendBuffer<char>());
2,224✔
257
        auto& uncompressed_buffer = ret.changeset_data.back();
2,224✔
258

259
        auto compressed_changeset_data = cur_changeset.get<BinaryData>(m_changeset_data);
2,224✔
260
        ChunkedBinaryInputStream changeset_is(compressed_changeset_data);
2,224✔
261
        auto ec = util::compression::decompress_nonportable(changeset_is, uncompressed_buffer);
2,224✔
262
        if (ec == util::compression::error::decompress_unsupported) {
2,224✔
263
            REALM_TERMINATE(
264
                "Synchronized Realm files with unprocessed pending bootstraps cannot be copied between platforms.");
×
265
        }
×
266
        REALM_ASSERT_3(ec, ==, std::error_code{});
2,224✔
267

268
        RemoteChangeset parsed_changeset;
2,224✔
269
        parsed_changeset.original_changeset_size =
2,224✔
270
            static_cast<size_t>(cur_changeset.get<int64_t>(m_changeset_original_changeset_size));
2,224✔
271
        parsed_changeset.origin_timestamp = cur_changeset.get<int64_t>(m_changeset_origin_timestamp);
2,224✔
272
        parsed_changeset.origin_file_ident = cur_changeset.get<int64_t>(m_changeset_origin_file_ident);
2,224✔
273
        parsed_changeset.remote_version = cur_changeset.get<int64_t>(m_changeset_remote_version);
2,224✔
274
        parsed_changeset.last_integrated_local_version =
2,224✔
275
            cur_changeset.get<int64_t>(m_changeset_last_integrated_client_version);
2,224✔
276
        parsed_changeset.data = BinaryData(uncompressed_buffer.data(), uncompressed_buffer.size());
2,224✔
277
        bytes_so_far += parsed_changeset.data.size();
2,224✔
278
        ret.changesets.push_back(std::move(parsed_changeset));
2,224✔
279
    }
2,224✔
280
    ret.remaining_changesets = changeset_list.size() - ret.changesets.size();
2,148✔
281

282
    return ret;
2,148✔
283
}
2,152✔
284

285
PendingBootstrapStore::PendingBatchStats PendingBootstrapStore::pending_stats()
286
{
2,148✔
287
    auto tr = m_db->start_read();
2,148✔
288
    auto bootstrap_table = tr->get_table(m_table);
2,148✔
289
    if (bootstrap_table->is_empty()) {
2,148✔
290
        return {};
×
291
    }
×
292

293
    REALM_ASSERT(bootstrap_table->size() == 1);
2,148✔
294

295
    auto bootstrap_obj = bootstrap_table->get_object(0);
2,148✔
296
    auto changeset_list = bootstrap_obj.get_linklist(m_changesets);
2,148✔
297

298
    PendingBatchStats stats;
2,148✔
299
    stats.query_version = bootstrap_obj.get<int64_t>(m_query_version);
2,148✔
300
    stats.pending_changesets = changeset_list.size();
2,148✔
301
    changeset_list.for_each([&](Obj& cur_changeset) {
2,748✔
302
        stats.pending_changeset_bytes +=
2,748✔
303
            static_cast<size_t>(cur_changeset.get<int64_t>(m_changeset_original_changeset_size));
2,748✔
304
        return IteratorControl::AdvanceToNext;
2,748✔
305
    });
2,748✔
306

307
    return stats;
2,148✔
308
}
2,148✔
309

310
void PendingBootstrapStore::pop_front_pending(const TransactionRef& tr, size_t count)
311
{
2,104✔
312
    REALM_ASSERT_3(tr->get_transact_stage(), ==, DB::transact_Writing);
2,104✔
313
    auto bootstrap_table = tr->get_table(m_table);
2,104✔
314
    if (bootstrap_table->is_empty()) {
2,104✔
315
        return;
×
316
    }
×
317
    DisableReplication disable_replication(*tr);
2,104✔
318

319
    // We should only have one pending bootstrap at a time.
320
    REALM_ASSERT(bootstrap_table->size() == 1);
2,104✔
321

322
    auto bootstrap_obj = bootstrap_table->get_object(0);
2,104✔
323
    auto changeset_list = bootstrap_obj.get_linklist(m_changesets);
2,104✔
324
    REALM_ASSERT_3(changeset_list.size(), >=, count);
2,104✔
325
    if (count == changeset_list.size()) {
2,104✔
326
        changeset_list.clear();
1,928✔
327
    }
1,928✔
328
    else {
176✔
329
        for (size_t idx = 0; idx < count; ++idx) {
360✔
330
            changeset_list.remove(0);
184✔
331
        }
184✔
332
    }
176✔
333

334
    if (changeset_list.is_empty()) {
2,104✔
335
        m_logger.debug(util::LogCategory::changeset, "Removing pending bootstrap obj for query version %1",
1,928✔
336
                       bootstrap_obj.get<int64_t>(m_query_version));
1,928✔
337
        bootstrap_obj.remove();
1,928✔
338
    }
1,928✔
339
    else {
176✔
340
        m_logger.debug(util::LogCategory::changeset,
176✔
341
                       "Removing pending bootstrap batch for query version %1. %2 changeset remaining",
176✔
342
                       bootstrap_obj.get<int64_t>(m_query_version), changeset_list.size());
176✔
343
    }
176✔
344

345
    m_has_pending = !bootstrap_table->is_empty();
2,104✔
346
}
2,104✔
347

348
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc