• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / github_pull_request_301264

30 Jul 2024 07:11PM UTC coverage: 91.111% (+0.009%) from 91.102%
github_pull_request_301264

Pull #7936

Evergreen

web-flow
Add support for multi-process subscription state change notifications (#7862)

As with the other multi-process notifications, the core idea here is to
eliminate the in-memory state and produce notifications based entirely on the
current state of the Realm file.

SubscriptionStore::update_state() has been replaced with separate functions for
the specific legal state transitions, which also take a write transaction as a
parameter. These functions are called by PendingBootstrapStore inside the same
write transaction as the bootstrap updates which changed the subscription
state. This is both a minor performance optimization (due to fewer writes) and
eliminates a brief window between the two writes where the Realm file was in an
inconsistent state.

There's a minor functional change here: previously old subscription sets were
superseded when the new one reached the Completed state, and now they are
superseded on AwaitingMark. This aligns it with when the new subscription set
becomes the one which is returned by get_active().
Pull Request #7936: Fix connection callback crashes when reloading with React Native

102800 of 181570 branches covered (56.62%)

216840 of 237996 relevant lines covered (91.11%)

5918493.47 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.46
/src/realm/sync/noinst/sync_metadata_schema.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2022 Realm, Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include <realm/sync/noinst/sync_metadata_schema.hpp>
20

21
#include <realm/data_type.hpp>
22
#include <realm/transaction.hpp>
23
#include <realm/util/flat_map.hpp>
24
#include <stdexcept>
25

26
namespace realm::sync {
27
namespace {
28

29
constexpr static std::string_view c_flx_metadata_table("flx_metadata");
30
constexpr static std::string_view c_sync_internal_schemas_table("sync_internal_schemas");
31
constexpr static std::string_view c_meta_schema_version_field("schema_version");
32
constexpr static std::string_view c_meta_schema_schema_group_field("schema_group_name");
33

34
} // namespace
35

36
void create_sync_metadata_schema(const TransactionRef& tr, std::vector<SyncMetadataTable>* tables)
37
{
19,544✔
38
    util::FlatMap<std::string_view, TableRef> found_tables;
19,544✔
39
    for (auto& table : *tables) {
23,284✔
40
        if (tr->has_table(table.name)) {
23,284✔
41
            throw RuntimeError(
×
42
                ErrorCodes::RuntimeError,
×
43
                util::format("table %1 already existed when creating internal tables for sync", table.name));
×
44
        }
×
45
        TableRef table_ref;
23,284✔
46
        if (table.is_embedded) {
23,284✔
47
            table_ref = tr->add_table(table.name, Table::Type::Embedded);
3,740✔
48
        }
3,740✔
49
        else if (table.pk_info) {
19,544✔
50
            table_ref = tr->add_table_with_primary_key(table.name, table.pk_info->data_type, table.pk_info->name,
19,232✔
51
                                                       table.pk_info->is_optional);
19,232✔
52
            *table.pk_info->key_out = table_ref->get_primary_key_column();
19,232✔
53
        }
19,232✔
54
        else {
312✔
55
            table_ref = tr->add_table(table.name);
312✔
56
        }
312✔
57

58
        found_tables.insert({table.name, table_ref});
23,284✔
59
        *table.key_out = table_ref->get_key();
23,284✔
60
    }
23,284✔
61

62
    for (auto& table : *tables) {
23,284✔
63
        auto& table_ref = found_tables.at(table.name);
23,284✔
64
        for (auto& column : table.columns) {
79,744✔
65
            if (column.data_type == type_Link) {
79,744✔
66
                auto target_table_it = found_tables.find(column.target_table);
3,740✔
67
                if (target_table_it == found_tables.end()) {
3,740✔
68
                    throw LogicError(ErrorCodes::InvalidArgument,
×
69
                                     util::format("cannot link to non-existant table %1 from internal sync table %2",
×
70
                                                  column.target_table, table.name));
×
71
                }
×
72
                if (column.is_list) {
3,740✔
73
                    *column.key_out = table_ref->add_column_list(*target_table_it->second, column.name);
2,520✔
74
                }
2,520✔
75
                else {
1,220✔
76
                    *column.key_out = table_ref->add_column(*target_table_it->second, column.name);
1,220✔
77
                }
1,220✔
78
            }
3,740✔
79
            else {
76,004✔
80
                *column.key_out = table_ref->add_column(column.data_type, column.name, column.is_optional);
76,004✔
81
            }
76,004✔
82
        }
79,744✔
83
    }
23,284✔
84
}
19,544✔
85

86
void load_sync_metadata_schema(const TransactionRef& tr, std::vector<SyncMetadataTable>* tables)
87
{
10,288✔
88
    for (auto& table : *tables) {
11,802✔
89
        auto table_ref = tr->get_table(table.name);
11,802✔
90
        if (!table_ref) {
11,802✔
91
            throw RuntimeError(ErrorCodes::RuntimeError,
×
92
                               util::format("could not find internal sync table %1", table.name));
×
93
        }
×
94

95
        *table.key_out = table_ref->get_key();
11,802✔
96
        if (table.pk_info) {
11,802✔
97
            auto pk_col = table_ref->get_primary_key_column();
10,248✔
98
            if (auto pk_name = table_ref->get_column_name(pk_col); pk_name != table.pk_info->name) {
10,248✔
99
                throw RuntimeError(
×
100
                    ErrorCodes::RuntimeError,
×
101
                    util::format(
×
102
                        "primary key name of sync internal table %1 does not match (stored: %2, defined: %3)",
×
103
                        table.name, pk_name, table.pk_info->name));
×
104
            }
×
105
            if (auto pk_type = table_ref->get_column_type(pk_col); pk_type != table.pk_info->data_type) {
10,248✔
106
                throw RuntimeError(
×
107
                    ErrorCodes::RuntimeError,
×
108
                    util::format(
×
109
                        "primary key type of sync internal table %1 does not match (stored: %2, defined: %3)",
×
110
                        table.name, pk_type, table.pk_info->data_type));
×
111
            }
×
112
            if (auto is_nullable = table_ref->is_nullable(pk_col); is_nullable != table.pk_info->is_optional) {
10,248✔
113
                throw RuntimeError(
×
114
                    ErrorCodes::RuntimeError,
×
115
                    util::format(
×
116
                        "primary key nullabilty of sync internal table %1 does not match (stored: %2, defined: %3)",
×
117
                        table.name, is_nullable, table.pk_info->is_optional));
×
118
            }
×
119
            *table.pk_info->key_out = pk_col;
10,248✔
120
        }
10,248✔
121
        else if (table.is_embedded && !table_ref->is_embedded()) {
1,554✔
122
            throw RuntimeError(ErrorCodes::RuntimeError,
×
123
                               util::format("internal sync table %1 should be embedded, but is not", table.name));
×
124
        }
×
125

126
        if (table.columns.size() + size_t(table.pk_info ? 1 : 0) != table_ref->get_column_count()) {
11,802✔
127
            throw RuntimeError(
×
128
                ErrorCodes::RuntimeError,
×
129
                util::format("sync internal table %1 has a different number of columns than its schema", table.name));
×
130
        }
×
131

132
        for (auto& col : table.columns) {
26,748✔
133
            auto col_key = table_ref->get_column_key(col.name);
26,748✔
134
            if (!col_key) {
26,748✔
135
                throw RuntimeError(
×
136
                    ErrorCodes::RuntimeError,
×
137
                    util::format("column %1 is missing in sync internal table %2", col.name, table.name));
×
138
            }
×
139

140
            auto found_col_type = table_ref->get_column_type(col_key);
26,748✔
141
            if (found_col_type != col.data_type) {
26,748✔
142
                throw RuntimeError(
×
143
                    ErrorCodes::RuntimeError,
×
144
                    util::format("column %1 in sync internal table %2 is the wrong type", col.name, table.name));
×
145
            }
×
146

147
            if (col.is_optional != table_ref->is_nullable(col_key)) {
26,748✔
148
                throw RuntimeError(
×
149
                    ErrorCodes::RuntimeError,
×
150
                    util::format("column %1 in sync internal table %2 has different nullabilty than in its schema",
×
151
                                 col.name, table.name));
×
152
            }
×
153

154
            if (col.data_type == type_Link) {
26,748✔
155
                if (table_ref->get_link_target(col_key)->get_name() != col.target_table) {
1,514✔
156
                    RuntimeError(ErrorCodes::RuntimeError,
×
157
                                 util::format("column %1 in sync internal table %2 links to the wrong table %3",
×
158
                                              col.name, table.name, table_ref->get_link_target(col_key)->get_name()));
×
159
                }
×
160
            }
1,514✔
161
            *col.key_out = col_key;
26,748✔
162
        }
26,748✔
163
    }
11,802✔
164
}
10,288✔
165

166
SyncMetadataSchemaVersionsReader::SyncMetadataSchemaVersionsReader(const TransactionRef& tr)
167
{
45,046✔
168
    std::vector<SyncMetadataTable> unified_schema_version_table_def{
45,046✔
169
        {&m_table,
45,046✔
170
         c_sync_internal_schemas_table,
45,046✔
171
         {&m_schema_group_field, c_meta_schema_schema_group_field, type_String},
45,046✔
172
         {{&m_version_field, c_meta_schema_version_field, type_Int}}}};
45,046✔
173

174
    // Any type of transaction is allowed, including frozen and write, as long as it supports reading
175
    REALM_ASSERT_EX(tr->get_transact_stage() != DB::transact_Ready, tr->get_transact_stage());
45,046✔
176
    // If the legacy_meta_table exists, then this table hasn't been converted and
177
    // the metadata schema versions information has not been upgraded/not accurate
178
    if (tr->has_table(c_flx_metadata_table)) {
45,046✔
179
        return;
20✔
180
    }
20✔
181

182
    if (tr->has_table(c_sync_internal_schemas_table)) {
45,026✔
183
        // Load m_table with the table/schema information
184
        load_sync_metadata_schema(tr, &unified_schema_version_table_def);
7,898✔
185
    }
7,898✔
186
}
45,026✔
187

188
std::optional<int64_t> SyncMetadataSchemaVersionsReader::get_legacy_version(const TransactionRef& tr)
189
{
11,908✔
190
    if (!tr->has_table(c_flx_metadata_table)) {
11,908✔
191
        return std::nullopt;
11,892✔
192
    }
11,892✔
193

194
    TableKey legacy_table_key;
16✔
195
    ColKey legacy_version_key;
16✔
196
    std::vector<SyncMetadataTable> legacy_table_def{
16✔
197
        {&legacy_table_key, c_flx_metadata_table, {{&legacy_version_key, c_meta_schema_version_field, type_Int}}}};
16✔
198

199
    // Convert the legacy table to the regular schema versions table if it exists
200
    load_sync_metadata_schema(tr, &legacy_table_def);
16✔
201

202
    if (auto legacy_meta_table = tr->get_table(legacy_table_key);
16✔
203
        legacy_meta_table && legacy_meta_table->size() > 0) {
16✔
204
        auto legacy_obj = legacy_meta_table->get_object(0);
16✔
205
        return legacy_obj.get<int64_t>(legacy_version_key);
16✔
206
    }
16✔
207

208
    return std::nullopt;
×
209
}
16✔
210

211
std::optional<int64_t> SyncMetadataSchemaVersionsReader::get_version_for(const TransactionRef& tr,
212
                                                                         std::string_view schema_group_name)
213
{
34,476✔
214
    if (!m_table) {
34,476✔
215
        // The legacy version only applies to the subscription store, don't query otherwise
216
        if (schema_group_name == internal_schema_groups::c_flx_subscription_store) {
28,208✔
217
            if (auto legacy_version = get_legacy_version(tr)) {
1,276✔
218
                return legacy_version;
4✔
219
            }
4✔
220
        }
1,276✔
221
        return util::none;
28,204✔
222
    }
28,208✔
223

224
    auto schema_versions = tr->get_table(m_table);
6,268✔
225
    auto obj_key = schema_versions->find_primary_key(Mixed{StringData(schema_group_name)});
6,268✔
226
    if (!obj_key) {
6,268✔
227
        return util::none;
3,832✔
228
    }
3,832✔
229
    auto metadata_obj = schema_versions->get_object(obj_key);
2,436✔
230
    if (!metadata_obj) {
2,436✔
231
        return util::none;
×
232
    }
×
233

234
    return metadata_obj.get<int64_t>(m_version_field);
2,436✔
235
}
2,436✔
236

237
SyncMetadataSchemaVersions::SyncMetadataSchemaVersions(const TransactionRef& tr)
238
    : SyncMetadataSchemaVersionsReader(tr)
5,312✔
239
{
10,632✔
240
    std::vector<SyncMetadataTable> unified_schema_version_table_def{
10,632✔
241
        {&m_table,
10,632✔
242
         c_sync_internal_schemas_table,
10,632✔
243
         {&m_schema_group_field, c_meta_schema_schema_group_field, type_String},
10,632✔
244
         {{&m_version_field, c_meta_schema_version_field, type_Int}}}};
10,632✔
245

246
    DB::TransactStage orig = tr->get_transact_stage();
10,632✔
247
    bool modified = false;
10,632✔
248

249
    // Read and write transactions are allowed, but not frozen
250
    REALM_ASSERT_EX((orig == DB::transact_Reading || orig == DB::transact_Writing), orig);
10,632✔
251
    // If the versions table exists, then m_table would have been initialized by the reader constructor
252
    // If the versions table doesn't exist, then initialize it now
253
    if (!m_table) {
10,632✔
254
        // table should have already been initialized or needs to be created,
255
        // but re-initialize in case it isn't (e.g. both unified and legacy tables exist in DB)
256
        if (REALM_UNLIKELY(tr->has_table(c_sync_internal_schemas_table))) {
8,936✔
257
            load_sync_metadata_schema(tr, &unified_schema_version_table_def);
4✔
258
        }
4✔
259
        else {
8,932✔
260
            // Only write the versions table if it doesn't exist
261
            if (tr->get_transact_stage() != DB::transact_Writing) {
8,932✔
262
                tr->promote_to_write();
16✔
263
            }
16✔
264
            create_sync_metadata_schema(tr, &unified_schema_version_table_def);
8,932✔
265
            modified = true;
8,932✔
266
        }
8,932✔
267
    }
8,936✔
268

269
    if (auto legacy_version = get_legacy_version(tr)) {
10,632✔
270
        // Migrate from just having a subscription store metadata table to having multiple table groups with multiple
271
        // versions.
272
        if (tr->get_transact_stage() != DB::transact_Writing) {
12✔
273
            tr->promote_to_write();
4✔
274
        }
4✔
275
        // Only the flx subscription store can potentially have the legacy metadata table
276
        set_version_for(tr, internal_schema_groups::c_flx_subscription_store, *legacy_version);
12✔
277
        tr->remove_table(c_flx_metadata_table);
12✔
278
        modified = true;
12✔
279
    }
12✔
280
    if (!modified)
10,632✔
281
        return; // nothing to commit
1,696✔
282

283
    // Commit and revert to the original transact stage
284
    if (orig == DB::transact_Reading)
8,936✔
285
        tr->commit_and_continue_as_read();
20✔
286
    else
8,916✔
287
        tr->commit_and_continue_writing();
8,916✔
288
}
8,936✔
289

290
void SyncMetadataSchemaVersions::set_version_for(const TransactionRef& tr, std::string_view schema_group_name,
291
                                                 int64_t version)
292
{
10,644✔
293
    REALM_ASSERT(m_table);
10,644✔
294

295
    auto schema_versions = tr->get_table(m_table);
10,644✔
296
    auto metadata_obj = schema_versions->create_object_with_primary_key(Mixed{StringData(schema_group_name)});
10,644✔
297
    metadata_obj.set(m_version_field, version);
10,644✔
298
}
10,644✔
299

300
} // namespace realm::sync
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc