• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1786

28 Oct 2023 12:35PM UTC coverage: 91.562% (-0.02%) from 91.582%
1786

push

Evergreen

web-flow
Improve configurations for sanitized builds (#6911)

* Refactor sanitizer flags for different build types:

** Enable address sanitizer for msvc
** Allow to build with sanitizer for diffent optimized build (also Debug)
** Make RelASAN, RelTSAN, RelUSAN, RelUSAN just shortcuts for half-optimized builds

* Fix usage of moved object for fuzz tester
* Check asan/tsan on macos x64/arm64
* Check asan with msvc 2019
* Remove Jenkins sanitized builders replaced by evergreen configs
* Work-around stack-use-after-scope with msvc2019 and mpark
* Fix crash on check with staled ColKeys
* fix a buffer overrun in a test
* fix a race in async_open_realm test util
* Add some logger related test fixes
* Work around catch2 limmitation with not thread safe asserts and TSAN races
* Run multiprocesses tests under sanitizers
* add assert for an error reported by undefined sanitizer
* Workaround uv scheduler main thread only constraint for callbacks called from non main thread and requesting a realm

---------

Co-authored-by: James Stone <james.stone@mongodb.com>

94310 of 173648 branches covered (0.0%)

54 of 63 new or added lines in 15 files covered. (85.71%)

2212 existing lines in 52 files now uncovered.

230602 of 251853 relevant lines covered (91.56%)

6943670.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.69
/src/realm/sync/noinst/pending_bootstrap_store.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2022 Realm, Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include "realm/sync/noinst/pending_bootstrap_store.hpp"
20

21
#include "realm/binary_data.hpp"
22
#include "realm/chunked_binary.hpp"
23
#include "realm/data_type.hpp"
24
#include "realm/db.hpp"
25
#include "realm/list.hpp"
26
#include "realm/query.hpp"
27
#include "realm/sync/changeset_parser.hpp"
28
#include "realm/sync/noinst/protocol_codec.hpp"
29
#include "realm/sync/noinst/sync_metadata_schema.hpp"
30
#include "realm/sync/protocol.hpp"
31
#include "realm/sync/transform.hpp"
32
#include "realm/util/assert.hpp"
33
#include "realm/util/buffer.hpp"
34
#include "realm/util/compression.hpp"
35
#include "realm/util/logger.hpp"
36
#include <stdexcept>
37

38
namespace realm::sync {
39
namespace {
40
constexpr static int c_schema_version = 1;
41
constexpr static std::string_view c_progress_table("flx_pending_bootstrap_progress");
42
constexpr static std::string_view c_pending_bootstrap_table("flx_pending_bootstrap");
43
constexpr static std::string_view c_pending_changesets_table("flx_pending_bootstrap_changesets");
44
constexpr static std::string_view c_pending_bootstrap_query_version("query_version");
45
constexpr static std::string_view c_pending_bootstrap_changesets("changesets");
46
constexpr static std::string_view c_pending_bootstrap_progress("progress");
47
constexpr static std::string_view c_pending_changesets_remote_version("remote_version");
48
constexpr static std::string_view
49
    c_pending_changesets_last_integrated_client_version("last_integrated_client_version");
50
constexpr static std::string_view c_pending_changesets_origin_file_ident("origin_file_ident");
51
constexpr static std::string_view c_pending_changesets_origin_timestamp("origin_timestamp");
52
constexpr static std::string_view c_pending_changesets_original_size("original_size");
53
constexpr static std::string_view c_pending_changesets_data("data");
54
constexpr static std::string_view c_progress_download_server_version("download_server_version");
55
constexpr static std::string_view c_progress_download_client_version("download_client_version");
56
constexpr static std::string_view c_progress_upload_server_version("upload_server_version");
57
constexpr static std::string_view c_progress_upload_client_version("upload_client_version");
58
constexpr static std::string_view c_progress_latest_server_version("latest_server_version");
59
constexpr static std::string_view c_progress_latest_server_version_salt("latest_server_version_salt");
60

61
} // namespace
62

63
PendingBootstrapStore::PendingBootstrapStore(DBRef db, util::Logger& logger)
64
    : m_db(std::move(db))
65
    , m_logger(logger)
66
{
1,018✔
67
    std::vector<SyncMetadataTable> internal_tables{
1,018✔
68
        {&m_table,
1,018✔
69
         c_pending_bootstrap_table,
1,018✔
70
         {&m_query_version, c_pending_bootstrap_query_version, type_Int},
1,018✔
71
         {
1,018✔
72
             {&m_changesets, c_pending_bootstrap_changesets, c_pending_changesets_table, true},
1,018✔
73
             {&m_progress, c_pending_bootstrap_progress, c_progress_table, false},
1,018✔
74
         }},
1,018✔
75
        {&m_progress_table,
1,018✔
76
         c_progress_table,
1,018✔
77
         SyncMetadataTable::IsEmbeddedTag{},
1,018✔
78
         {
1,018✔
79
             {&m_progress_upload_server_version, c_progress_upload_server_version, type_Int},
1,018✔
80
             {&m_progress_upload_client_version, c_progress_upload_client_version, type_Int},
1,018✔
81
             {&m_progress_download_server_version, c_progress_download_server_version, type_Int},
1,018✔
82
             {&m_progress_download_client_version, c_progress_download_client_version, type_Int},
1,018✔
83
             {&m_progress_latest_server_version, c_progress_latest_server_version, type_Int},
1,018✔
84
             {&m_progress_latest_server_version_salt, c_progress_latest_server_version_salt, type_Int},
1,018✔
85
         }},
1,018✔
86
        {&m_changeset_table,
1,018✔
87
         c_pending_changesets_table,
1,018✔
88
         SyncMetadataTable::IsEmbeddedTag{},
1,018✔
89
         {
1,018✔
90
             {&m_changeset_remote_version, c_pending_changesets_remote_version, type_Int},
1,018✔
91
             {&m_changeset_last_integrated_client_version, c_pending_changesets_last_integrated_client_version,
1,018✔
92
              type_Int},
1,018✔
93
             {&m_changeset_origin_file_ident, c_pending_changesets_origin_file_ident, type_Int},
1,018✔
94
             {&m_changeset_origin_timestamp, c_pending_changesets_origin_timestamp, type_Int},
1,018✔
95
             {&m_changeset_original_changeset_size, c_pending_changesets_original_size, type_Int},
1,018✔
96
             {&m_changeset_data, c_pending_changesets_data, type_Binary, true},
1,018✔
97
         }}};
1,018✔
98

510✔
99
    auto tr = m_db->start_read();
1,018✔
100
    SyncMetadataSchemaVersions schema_versions(tr);
1,018✔
101
    if (auto schema_version = schema_versions.get_version_for(tr, internal_schema_groups::c_pending_bootstraps)) {
1,018✔
102
        if (*schema_version != c_schema_version) {
306✔
UNCOV
103
            throw RuntimeError(ErrorCodes::SchemaVersionMismatch,
×
UNCOV
104
                               "Invalid schema version for FLX sync pending bootstrap table group");
×
UNCOV
105
        }
×
106
        load_sync_metadata_schema(tr, &internal_tables);
306✔
107
    }
306✔
108
    else {
712✔
109
        tr->promote_to_write();
712✔
110
        create_sync_metadata_schema(tr, &internal_tables);
712✔
111
        schema_versions.set_version_for(tr, internal_schema_groups::c_pending_bootstraps, c_schema_version);
712✔
112
        tr->commit_and_continue_as_read();
712✔
113
    }
712✔
114

510✔
115
    if (auto bootstrap_table = tr->get_table(m_table); !bootstrap_table->is_empty()) {
1,018✔
116
        m_has_pending = true;
32✔
117
    }
32✔
118
    else {
986✔
119
        m_has_pending = false;
986✔
120
    }
986✔
121
}
1,018✔
122

123
void PendingBootstrapStore::add_batch(int64_t query_version, util::Optional<SyncProgress> progress,
124
                                      const _impl::ClientProtocol::ReceivedChangesets& changesets,
125
                                      bool* created_new_batch_out)
126
{
1,578✔
127
    std::vector<util::AppendBuffer<char>> compressed_changesets;
1,578✔
128
    compressed_changesets.reserve(changesets.size());
1,578✔
129

790✔
130
    util::compression::CompressMemoryArena arena;
1,578✔
131
    for (auto& changeset : changesets) {
1,594✔
132
        compressed_changesets.emplace_back();
1,594✔
133
        util::compression::allocate_and_compress_nonportable(arena, {changeset.data.get_first_chunk()},
1,594✔
134
                                                             compressed_changesets.back());
1,594✔
135
    }
1,594✔
136

790✔
137
    auto tr = m_db->start_write();
1,578✔
138
    auto bootstrap_table = tr->get_table(m_table);
1,578✔
139
    auto incomplete_bootstraps = Query(bootstrap_table).not_equal(m_query_version, query_version).find_all();
1,578✔
140
    incomplete_bootstraps.for_each([&](Obj obj) {
790✔
UNCOV
141
        m_logger.debug("Clearing incomplete bootstrap for query version %1", obj.get<int64_t>(m_query_version));
×
UNCOV
142
        return IteratorControl::AdvanceToNext;
×
UNCOV
143
    });
×
144
    incomplete_bootstraps.clear();
1,578✔
145

790✔
146
    bool did_create = false;
1,578✔
147
    auto bootstrap_obj = bootstrap_table->create_object_with_primary_key(Mixed{query_version}, &did_create);
1,578✔
148
    if (progress) {
1,578✔
149
        auto progress_obj = bootstrap_obj.create_and_set_linked_object(m_progress);
1,414✔
150
        progress_obj.set(m_progress_latest_server_version, int64_t(progress->latest_server_version.version));
1,414✔
151
        progress_obj.set(m_progress_latest_server_version_salt, int64_t(progress->latest_server_version.salt));
1,414✔
152
        progress_obj.set(m_progress_download_server_version, int64_t(progress->download.server_version));
1,414✔
153
        progress_obj.set(m_progress_download_client_version,
1,414✔
154
                         int64_t(progress->download.last_integrated_client_version));
1,414✔
155
        progress_obj.set(m_progress_upload_server_version, int64_t(progress->upload.last_integrated_server_version));
1,414✔
156
        progress_obj.set(m_progress_upload_client_version, int64_t(progress->upload.client_version));
1,414✔
157
    }
1,414✔
158

790✔
159
    auto changesets_list = bootstrap_obj.get_linklist(m_changesets);
1,578✔
160
    for (size_t idx = 0; idx < changesets.size(); ++idx) {
3,172✔
161
        auto cur_changeset = changesets_list.create_and_insert_linked_object(changesets_list.size());
1,594✔
162
        cur_changeset.set(m_changeset_remote_version, int64_t(changesets[idx].remote_version));
1,594✔
163
        cur_changeset.set(m_changeset_last_integrated_client_version,
1,594✔
164
                          int64_t(changesets[idx].last_integrated_local_version));
1,594✔
165
        cur_changeset.set(m_changeset_origin_file_ident, int64_t(changesets[idx].origin_file_ident));
1,594✔
166
        cur_changeset.set(m_changeset_origin_timestamp, int64_t(changesets[idx].origin_timestamp));
1,594✔
167
        cur_changeset.set(m_changeset_original_changeset_size, int64_t(changesets[idx].original_changeset_size));
1,594✔
168
        BinaryData compressed_data(compressed_changesets[idx].data(), compressed_changesets[idx].size());
1,594✔
169
        cur_changeset.set(m_changeset_data, compressed_data);
1,594✔
170
    }
1,594✔
171

790✔
172
    tr->commit();
1,578✔
173

790✔
174
    if (created_new_batch_out) {
1,578✔
175
        *created_new_batch_out = did_create;
1,578✔
176
    }
1,578✔
177

790✔
178
    if (did_create) {
1,578✔
179
        m_logger.debug("Created new pending bootstrap object for query version %1", query_version);
1,422✔
180
    }
1,422✔
181
    else {
156✔
182
        m_logger.debug("Added batch to pending bootstrap object for query version %1", query_version);
156✔
183
    }
156✔
184
    if (progress) {
1,578✔
185
        m_logger.debug("Finalized pending bootstrap object for query version %1", query_version);
1,414✔
186
    }
1,414✔
187
    m_has_pending = true;
1,578✔
188
}
1,578✔
189

190
bool PendingBootstrapStore::has_pending()
191
{
5,376✔
192
    return m_has_pending;
5,376✔
193
}
5,376✔
194

195
void PendingBootstrapStore::clear()
196
{
12✔
197
    auto tr = m_db->start_write();
12✔
198
    auto bootstrap_table = tr->get_table(m_table);
12✔
199
    bootstrap_table->clear();
12✔
200
    m_has_pending = false;
12✔
201
    tr->commit();
12✔
202
}
12✔
203

204
PendingBootstrapStore::PendingBatch PendingBootstrapStore::peek_pending(size_t limit_in_bytes)
205
{
1,574✔
206
    auto tr = m_db->start_read();
1,574✔
207
    auto bootstrap_table = tr->get_table(m_table);
1,574✔
208
    if (bootstrap_table->is_empty()) {
1,574✔
209
        return {};
4✔
210
    }
4✔
211

786✔
212
    // We should only have one pending bootstrap at a time.
786✔
213
    REALM_ASSERT(bootstrap_table->size() == 1);
1,570✔
214

786✔
215
    auto bootstrap_obj = bootstrap_table->get_object(0);
1,570✔
216
    PendingBatch ret;
1,570✔
217
    ret.query_version = bootstrap_obj.get<int64_t>(m_query_version);
1,570✔
218

786✔
219
    if (!bootstrap_obj.is_null(m_progress)) {
1,570✔
220
        auto progress_obj = bootstrap_obj.get_linked_object(m_progress);
1,558✔
221
        SyncProgress progress;
1,558✔
222
        progress.latest_server_version.version = progress_obj.get<int64_t>(m_progress_latest_server_version);
1,558✔
223
        progress.latest_server_version.salt = progress_obj.get<int64_t>(m_progress_latest_server_version_salt);
1,558✔
224
        progress.download.server_version = progress_obj.get<int64_t>(m_progress_download_server_version);
1,558✔
225
        progress.download.last_integrated_client_version =
1,558✔
226
            progress_obj.get<int64_t>(m_progress_download_client_version);
1,558✔
227
        progress.upload.last_integrated_server_version = progress_obj.get<int64_t>(m_progress_upload_server_version);
1,558✔
228
        progress.upload.client_version = progress_obj.get<int64_t>(m_progress_upload_client_version);
1,558✔
229
        ret.progress = std::move(progress);
1,558✔
230
    }
1,558✔
231

786✔
232
    auto changeset_list = bootstrap_obj.get_linklist(m_changesets);
1,570✔
233
    size_t bytes_so_far = 0;
1,570✔
234
    for (size_t idx = 0; idx < changeset_list.size() && bytes_so_far < limit_in_bytes; ++idx) {
3,196✔
235
        auto cur_changeset = changeset_list.get_object(idx);
1,626✔
236
        ret.changeset_data.push_back(util::AppendBuffer<char>());
1,626✔
237
        auto& uncompressed_buffer = ret.changeset_data.back();
1,626✔
238

814✔
239
        auto compressed_changeset_data = cur_changeset.get<BinaryData>(m_changeset_data);
1,626✔
240
        ChunkedBinaryInputStream changeset_is(compressed_changeset_data);
1,626✔
241
        auto ec = util::compression::decompress_nonportable(changeset_is, uncompressed_buffer);
1,626✔
242
        if (ec == util::compression::error::decompress_unsupported) {
1,626✔
UNCOV
243
            REALM_TERMINATE(
×
UNCOV
244
                "Synchronized Realm files with unprocessed pending bootstraps cannot be copied between platforms.");
×
UNCOV
245
        }
×
246
        REALM_ASSERT_3(ec, ==, std::error_code{});
1,626✔
247

814✔
248
        Transformer::RemoteChangeset parsed_changeset;
1,626✔
249
        parsed_changeset.original_changeset_size =
1,626✔
250
            static_cast<size_t>(cur_changeset.get<int64_t>(m_changeset_original_changeset_size));
1,626✔
251
        parsed_changeset.origin_timestamp = cur_changeset.get<int64_t>(m_changeset_origin_timestamp);
1,626✔
252
        parsed_changeset.origin_file_ident = cur_changeset.get<int64_t>(m_changeset_origin_file_ident);
1,626✔
253
        parsed_changeset.remote_version = cur_changeset.get<int64_t>(m_changeset_remote_version);
1,626✔
254
        parsed_changeset.last_integrated_local_version =
1,626✔
255
            cur_changeset.get<int64_t>(m_changeset_last_integrated_client_version);
1,626✔
256
        parsed_changeset.data = BinaryData(uncompressed_buffer.data(), uncompressed_buffer.size());
1,626✔
257
        bytes_so_far += parsed_changeset.data.size();
1,626✔
258
        ret.changesets.push_back(std::move(parsed_changeset));
1,626✔
259
    }
1,626✔
260
    ret.remaining_changesets = changeset_list.size() - ret.changesets.size();
1,570✔
261

786✔
262
    return ret;
1,570✔
263
}
1,570✔
264

265
PendingBootstrapStore::PendingBatchStats PendingBootstrapStore::pending_stats()
266
{
1,418✔
267
    auto tr = m_db->start_read();
1,418✔
268
    auto bootstrap_table = tr->get_table(m_table);
1,418✔
269
    if (bootstrap_table->is_empty()) {
1,418✔
UNCOV
270
        return {};
×
UNCOV
271
    }
×
272

710✔
273
    REALM_ASSERT(bootstrap_table->size() == 1);
1,418✔
274

710✔
275
    auto bootstrap_obj = bootstrap_table->get_object(0);
1,418✔
276
    auto changeset_list = bootstrap_obj.get_linklist(m_changesets);
1,418✔
277

710✔
278
    PendingBatchStats stats;
1,418✔
279
    stats.query_version = bootstrap_obj.get<int64_t>(m_query_version);
1,418✔
280
    stats.pending_changesets = changeset_list.size();
1,418✔
281
    changeset_list.for_each([&](Obj& cur_changeset) {
1,586✔
282
        stats.pending_changeset_bytes +=
1,586✔
283
            static_cast<size_t>(cur_changeset.get<int64_t>(m_changeset_original_changeset_size));
1,586✔
284
        return IteratorControl::AdvanceToNext;
1,586✔
285
    });
1,586✔
286

710✔
287
    return stats;
1,418✔
288
}
1,418✔
289

290
void PendingBootstrapStore::pop_front_pending(const TransactionRef& tr, size_t count)
291
{
1,534✔
292
    REALM_ASSERT_3(tr->get_transact_stage(), ==, DB::transact_Writing);
1,534✔
293
    auto bootstrap_table = tr->get_table(m_table);
1,534✔
294
    if (bootstrap_table->is_empty()) {
1,534✔
UNCOV
295
        return;
×
UNCOV
296
    }
×
297

768✔
298
    // We should only have one pending bootstrap at a time.
768✔
299
    REALM_ASSERT(bootstrap_table->size() == 1);
1,534✔
300

768✔
301
    auto bootstrap_obj = bootstrap_table->get_object(0);
1,534✔
302
    auto changeset_list = bootstrap_obj.get_linklist(m_changesets);
1,534✔
303
    REALM_ASSERT_3(changeset_list.size(), >=, count);
1,534✔
304
    if (count == changeset_list.size()) {
1,534✔
305
        changeset_list.clear();
1,398✔
306
    }
1,398✔
307
    else {
136✔
308
        for (size_t idx = 0; idx < count; ++idx) {
280✔
309
            changeset_list.remove(0);
144✔
310
        }
144✔
311
    }
136✔
312

768✔
313
    if (changeset_list.is_empty()) {
1,534✔
314
        m_logger.debug("Removing pending bootstrap obj for query version %1",
1,398✔
315
                       bootstrap_obj.get<int64_t>(m_query_version));
1,398✔
316
        bootstrap_obj.remove();
1,398✔
317
    }
1,398✔
318
    else {
136✔
319
        m_logger.debug("Removing pending bootstrap batch for query version %1. %2 changeset remaining",
136✔
320
                       bootstrap_obj.get<int64_t>(m_query_version), changeset_list.size());
136✔
321
    }
136✔
322

768✔
323
    m_has_pending = (bootstrap_table->is_empty() == false);
1,534✔
324
}
1,534✔
325

326
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc