• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1817

04 Nov 2023 12:29AM UTC coverage: 91.695% (+0.04%) from 91.66%
1817

push

Evergreen

web-flow
Use a single write transaction for DiscardLocal client resets on FLX realms (#7110)

Updating the subscription store in a separate write transaction from the
recovery means that we temporarily commit an invalid state. If the application
crashes between committing the client reset diff and updating the subscription
store, the next launch of the application would try to use the now-invalid
pending subscriptions that should have been discarded.

92122 of 168844 branches covered (0.0%)

141 of 146 new or added lines in 7 files covered. (96.58%)

59 existing lines in 12 files now uncovered.

230819 of 251726 relevant lines covered (91.69%)

6481779.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.69
/src/realm/sync/noinst/client_reset.cpp
1
///////////////////////////////////////////////////////////////////////////
2
//
3
// Copyright 2021 Realm Inc.
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License");
6
// you may not use this file except in compliance with the License.
7
// You may obtain a copy of the License at
8
//
9
// http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS,
13
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
// See the License for the specific language governing permissions and
15
// limitations under the License.
16
//
17
////////////////////////////////////////////////////////////////////////////
18

19
#include <realm/transaction.hpp>
20
#include <realm/dictionary.hpp>
21
#include <realm/object_converter.hpp>
22
#include <realm/table_view.hpp>
23
#include <realm/set.hpp>
24

25
#include <realm/sync/history.hpp>
26
#include <realm/sync/changeset_parser.hpp>
27
#include <realm/sync/instruction_applier.hpp>
28
#include <realm/sync/noinst/client_history_impl.hpp>
29
#include <realm/sync/noinst/client_reset.hpp>
30
#include <realm/sync/noinst/client_reset_recovery.hpp>
31
#include <realm/sync/subscriptions.hpp>
32

33
#include <realm/util/compression.hpp>
34

35
#include <algorithm>
36
#include <chrono>
37
#include <vector>
38

39
using namespace realm;
40
using namespace _impl;
41
using namespace sync;
42

43
namespace realm {
44

45
std::ostream& operator<<(std::ostream& os, const ClientResyncMode& mode)
46
{
21,152✔
47
    switch (mode) {
21,152✔
48
        case ClientResyncMode::Manual:
✔
49
            os << "Manual";
×
50
            break;
×
51
        case ClientResyncMode::DiscardLocal:
10,564✔
52
            os << "DiscardLocal";
10,564✔
53
            break;
10,564✔
54
        case ClientResyncMode::Recover:
10,508✔
55
            os << "Recover";
10,508✔
56
            break;
10,508✔
57
        case ClientResyncMode::RecoverOrDiscard:
80✔
58
            os << "RecoverOrDiscard";
80✔
59
            break;
80✔
60
    }
21,152✔
61
    return os;
21,152✔
62
}
21,152✔
63

64
} // namespace realm
65

66
namespace realm::_impl::client_reset {
67

68
static inline bool should_skip_table(const Transaction& group, TableKey key)
69
{
221,368✔
70
    return !group.table_is_public(key);
221,368✔
71
}
221,368✔
72

73
void transfer_group(const Transaction& group_src, Transaction& group_dst, util::Logger& logger,
74
                    bool allow_schema_additions)
75
{
6,756✔
76
    logger.debug("transfer_group, src size = %1, dst size = %2, allow_schema_additions = %3", group_src.size(),
6,756✔
77
                 group_dst.size(), allow_schema_additions);
6,756✔
78

3,378✔
79
    // Turn off the sync history tracking during state transfer since it will be thrown
3,378✔
80
    // away immediately after anyways. This reduces the memory footprint of a client reset.
3,378✔
81
    ClientReplication* client_repl = dynamic_cast<ClientReplication*>(group_dst.get_replication());
6,756✔
82
    REALM_ASSERT_RELEASE(client_repl);
6,756✔
83
    TempShortCircuitReplication sync_history_guard(*client_repl);
6,756✔
84

3,378✔
85
    // Find all tables in dst that should be removed.
3,378✔
86
    std::set<std::string> tables_to_remove;
6,756✔
87
    for (auto table_key : group_dst.get_table_keys()) {
30,272✔
88
        if (should_skip_table(group_dst, table_key))
30,272✔
89
            continue;
14,288✔
90
        StringData table_name = group_dst.get_table_name(table_key);
15,984✔
91
        logger.debug("key = %1, table_name = %2", table_key.value, table_name);
15,984✔
92
        ConstTableRef table_src = group_src.get_table(table_name);
15,984✔
93
        if (!table_src) {
15,984✔
94
            logger.debug("Table '%1' will be removed", table_name);
40✔
95
            tables_to_remove.insert(table_name);
40✔
96
            continue;
40✔
97
        }
40✔
98
        // Check whether the table type is the same.
7,972✔
99
        TableRef table_dst = group_dst.get_table(table_key);
15,944✔
100
        auto pk_col_src = table_src->get_primary_key_column();
15,944✔
101
        auto pk_col_dst = table_dst->get_primary_key_column();
15,944✔
102
        bool has_pk_src = bool(pk_col_src);
15,944✔
103
        bool has_pk_dst = bool(pk_col_dst);
15,944✔
104
        if (has_pk_src != has_pk_dst) {
15,944✔
105
            throw ClientResetFailed(util::format("Client reset requires a primary key column in %1 table '%2'",
×
106
                                                 (has_pk_src ? "dest" : "source"), table_name));
×
107
        }
×
108
        if (!has_pk_src)
15,944✔
109
            continue;
648✔
110

7,648✔
111
        // Now the tables both have primary keys. Check type.
7,648✔
112
        if (pk_col_src.get_type() != pk_col_dst.get_type()) {
15,296✔
113
            throw ClientResetFailed(
4✔
114
                util::format("Client reset found incompatible primary key types (%1 vs %2) on '%3'",
4✔
115
                             pk_col_src.get_type(), pk_col_dst.get_type(), table_name));
4✔
116
        }
4✔
117
        // Check collection type, nullability etc. but having an index doesn't matter;
7,646✔
118
        ColumnAttrMask pk_col_src_attr = pk_col_src.get_attrs();
15,292✔
119
        ColumnAttrMask pk_col_dst_attr = pk_col_dst.get_attrs();
15,292✔
120
        pk_col_src_attr.reset(ColumnAttr::col_attr_Indexed);
15,292✔
121
        pk_col_dst_attr.reset(ColumnAttr::col_attr_Indexed);
15,292✔
122
        if (pk_col_src_attr != pk_col_dst_attr) {
15,292✔
123
            throw ClientResetFailed(
×
124
                util::format("Client reset found incompatible primary key attributes (%1 vs %2) on '%3'",
×
125
                             pk_col_src.value, pk_col_dst.value, table_name));
×
126
        }
×
127
        // Check name.
7,646✔
128
        StringData pk_col_name_src = table_src->get_column_name(pk_col_src);
15,292✔
129
        StringData pk_col_name_dst = table_dst->get_column_name(pk_col_dst);
15,292✔
130
        if (pk_col_name_src != pk_col_name_dst) {
15,292✔
131
            throw ClientResetFailed(
×
132
                util::format("Client reset requires equal pk column names but '%1' != '%2' on '%3'", pk_col_name_src,
×
133
                             pk_col_name_dst, table_name));
×
134
        }
×
135
        // The table survives.
7,646✔
136
        logger.debug("Table '%1' will remain", table_name);
15,292✔
137
    }
15,292✔
138

3,378✔
139
    // If there have been any tables marked for removal stop.
3,378✔
140
    // We consider two possible options for recovery:
3,378✔
141
    // 1: Remove the tables. But this will generate destructive schema
3,378✔
142
    //    schema changes that the local Realm cannot advance through.
3,378✔
143
    //    Since this action will fail down the line anyway, give up now.
3,378✔
144
    // 2: Keep the tables locally and ignore them. But the local app schema
3,378✔
145
    //    still has these classes and trying to modify anything in them will
3,378✔
146
    //    create sync instructions on tables that sync doesn't know about.
3,378✔
147
    // As an exception in recovery mode, we assume that the corresponding
3,378✔
148
    // additive schema changes will be part of the recovery upload. If they
3,378✔
149
    // are present, then the server can choose to allow them (if in dev mode).
3,378✔
150
    // If they are not present, then the server will emit an error the next time
3,378✔
151
    // a value is set on the unknown property.
3,378✔
152
    if (!allow_schema_additions && !tables_to_remove.empty()) {
6,754✔
153
        std::string names_list;
16✔
154
        for (const std::string& table_name : tables_to_remove) {
24✔
155
            names_list += Group::table_name_to_class_name(table_name);
24✔
156
            names_list += ", ";
24✔
157
        }
24✔
158
        if (names_list.size() > 2) {
16✔
159
            // remove the final ", "
8✔
160
            names_list = names_list.substr(0, names_list.size() - 2);
16✔
161
        }
16✔
162
        throw ClientResetFailed(
16✔
163
            util::format("Client reset cannot recover when classes have been removed: {%1}", names_list));
16✔
164
    }
16✔
165

3,368✔
166
    // Create new tables in dst if needed.
3,368✔
167
    for (auto table_key : group_src.get_table_keys()) {
23,020✔
168
        if (should_skip_table(group_src, table_key))
23,020✔
169
            continue;
7,092✔
170
        ConstTableRef table_src = group_src.get_table(table_key);
15,928✔
171
        StringData table_name = table_src->get_name();
15,928✔
172
        auto pk_col_src = table_src->get_primary_key_column();
15,928✔
173
        TableRef table_dst = group_dst.get_table(table_name);
15,928✔
174
        if (!table_dst) {
15,928✔
175
            // Create the table.
16✔
176
            if (table_src->is_embedded()) {
32✔
177
                REALM_ASSERT(!pk_col_src);
16✔
178
                group_dst.add_table(table_name, Table::Type::Embedded);
16✔
179
            }
16✔
180
            else {
16✔
181
                REALM_ASSERT(pk_col_src); // a sync table will have a pk
16✔
182
                auto pk_col_src = table_src->get_primary_key_column();
16✔
183
                DataType pk_type = DataType(pk_col_src.get_type());
16✔
184
                StringData pk_col_name = table_src->get_column_name(pk_col_src);
16✔
185
                group_dst.add_table_with_primary_key(table_name, pk_type, pk_col_name, pk_col_src.is_nullable(),
16✔
186
                                                     table_src->get_table_type());
16✔
187
            }
16✔
188
        }
32✔
189
    }
15,928✔
190

3,368✔
191
    // Now the class tables are identical.
3,368✔
192
    size_t num_tables;
6,736✔
193
    {
6,736✔
194
        size_t num_tables_src = 0;
6,736✔
195
        for (auto table_key : group_src.get_table_keys()) {
23,020✔
196
            if (!should_skip_table(group_src, table_key))
23,020✔
197
                ++num_tables_src;
15,928✔
198
        }
23,020✔
199
        size_t num_tables_dst = 0;
6,736✔
200
        for (auto table_key : group_dst.get_table_keys()) {
30,176✔
201
            if (!should_skip_table(group_dst, table_key))
30,176✔
202
                ++num_tables_dst;
15,944✔
203
        }
30,176✔
204
        REALM_ASSERT_EX(allow_schema_additions || num_tables_src == num_tables_dst, num_tables_src, num_tables_dst);
6,736✔
205
        num_tables = num_tables_src;
6,736✔
206
    }
6,736✔
207
    logger.debug("The number of tables is %1", num_tables);
6,736✔
208

3,368✔
209
    // Remove columns in dst if they are absent in src.
3,368✔
210
    for (auto table_key : group_src.get_table_keys()) {
23,016✔
211
        if (should_skip_table(group_src, table_key))
23,016✔
212
            continue;
7,092✔
213
        ConstTableRef table_src = group_src.get_table(table_key);
15,924✔
214
        StringData table_name = table_src->get_name();
15,924✔
215
        TableRef table_dst = group_dst.get_table(table_name);
15,924✔
216
        REALM_ASSERT(table_dst);
15,924✔
217
        std::vector<std::string> columns_to_remove;
15,924✔
218
        for (ColKey col_key : table_dst->get_column_keys()) {
52,120✔
219
            StringData col_name = table_dst->get_column_name(col_key);
52,120✔
220
            ColKey col_key_src = table_src->get_column_key(col_name);
52,120✔
221
            if (!col_key_src) {
52,120✔
222
                columns_to_remove.push_back(col_name);
32✔
223
                continue;
32✔
224
            }
32✔
225
        }
52,120✔
226
        if (!allow_schema_additions && !columns_to_remove.empty()) {
15,924✔
227
            std::string columns_list;
4✔
228
            for (const std::string& col_name : columns_to_remove) {
12✔
229
                columns_list += col_name;
12✔
230
                columns_list += ", ";
12✔
231
            }
12✔
232
            throw ClientResetFailed(
4✔
233
                util::format("Client reset cannot recover when columns have been removed from '%1': {%2}", table_name,
4✔
234
                             columns_list));
4✔
235
        }
4✔
236
    }
15,924✔
237

3,368✔
238
    // Add columns in dst if present in src and absent in dst.
3,368✔
239
    for (auto table_key : group_src.get_table_keys()) {
22,996✔
240
        if (should_skip_table(group_src, table_key))
22,996✔
241
            continue;
7,092✔
242
        ConstTableRef table_src = group_src.get_table(table_key);
15,904✔
243
        StringData table_name = table_src->get_name();
15,904✔
244
        TableRef table_dst = group_dst.get_table(table_name);
15,904✔
245
        REALM_ASSERT(table_dst);
15,904✔
246
        for (ColKey col_key : table_src->get_column_keys()) {
52,132✔
247
            StringData col_name = table_src->get_column_name(col_key);
52,132✔
248
            ColKey col_key_dst = table_dst->get_column_key(col_name);
52,132✔
249
            if (!col_key_dst) {
52,132✔
250
                DataType col_type = table_src->get_column_type(col_key);
128✔
251
                bool nullable = col_key.is_nullable();
128✔
252
                auto search_index_type = table_src->search_index_type(col_key);
128✔
253
                logger.trace("Create column, table = %1, column name = %2, "
128✔
254
                             " type = %3, nullable = %4, search_index = %5",
128✔
255
                             table_name, col_name, col_key.get_type(), nullable, search_index_type);
128✔
256
                ColKey col_key_dst;
128✔
257
                if (Table::is_link_type(col_key.get_type())) {
128✔
258
                    ConstTableRef target_src = table_src->get_link_target(col_key);
48✔
259
                    TableRef target_dst = group_dst.get_table(target_src->get_name());
48✔
260
                    if (col_key.is_list()) {
48✔
261
                        col_key_dst = table_dst->add_column_list(*target_dst, col_name);
16✔
262
                    }
16✔
263
                    else if (col_key.is_set()) {
32✔
264
                        col_key_dst = table_dst->add_column_set(*target_dst, col_name);
×
265
                    }
×
266
                    else if (col_key.is_dictionary()) {
32✔
267
                        DataType key_type = table_src->get_dictionary_key_type(col_key);
8✔
268
                        col_key_dst = table_dst->add_column_dictionary(*target_dst, col_name, key_type);
8✔
269
                    }
8✔
270
                    else {
24✔
271
                        REALM_ASSERT(!col_key.is_collection());
24✔
272
                        col_key_dst = table_dst->add_column(*target_dst, col_name);
24✔
273
                    }
24✔
274
                }
48✔
275
                else if (col_key.is_list()) {
80✔
276
                    col_key_dst = table_dst->add_column_list(col_type, col_name, nullable);
8✔
277
                }
8✔
278
                else if (col_key.is_set()) {
72✔
279
                    col_key_dst = table_dst->add_column_set(col_type, col_name, nullable);
8✔
280
                }
8✔
281
                else if (col_key.is_dictionary()) {
64✔
282
                    DataType key_type = table_src->get_dictionary_key_type(col_key);
8✔
283
                    col_key_dst = table_dst->add_column_dictionary(col_type, col_name, nullable, key_type);
8✔
284
                }
8✔
285
                else {
56✔
286
                    REALM_ASSERT(!col_key.is_collection());
56✔
287
                    col_key_dst = table_dst->add_column(col_type, col_name, nullable);
56✔
288
                }
56✔
289

64✔
290
                if (search_index_type != IndexType::None)
128✔
291
                    table_dst->add_search_index(col_key_dst, search_index_type);
×
292
            }
128✔
293
            else {
52,004✔
294
                // column preexists in dest, make sure the types match
26,002✔
295
                if (col_key.get_type() != col_key_dst.get_type()) {
52,004✔
296
                    throw ClientResetFailed(util::format(
8✔
297
                        "Incompatable column type change detected during client reset for '%1.%2' (%3 vs %4)",
8✔
298
                        table_name, col_name, col_key.get_type(), col_key_dst.get_type()));
8✔
299
                }
8✔
300
                ColumnAttrMask src_col_attrs = col_key.get_attrs();
51,996✔
301
                ColumnAttrMask dst_col_attrs = col_key_dst.get_attrs();
51,996✔
302
                src_col_attrs.reset(ColumnAttr::col_attr_Indexed);
51,996✔
303
                dst_col_attrs.reset(ColumnAttr::col_attr_Indexed);
51,996✔
304
                // make sure the attributes such as collection type, nullability etc. match
25,998✔
305
                // but index equality doesn't matter here.
25,998✔
306
                if (src_col_attrs != dst_col_attrs) {
51,996✔
307
                    throw ClientResetFailed(util::format(
×
308
                        "Incompatable column attribute change detected during client reset for '%1.%2' (%3 vs %4)",
×
309
                        table_name, col_name, col_key.value, col_key_dst.value));
×
310
                }
×
311
            }
51,996✔
312
        }
52,132✔
313
    }
15,904✔
314

3,366✔
315
    // Now the schemas are identical.
3,366✔
316

3,366✔
317
    // Remove objects in dst that are absent in src.
3,366✔
318
    for (auto table_key : group_src.get_table_keys()) {
22,956✔
319
        if (should_skip_table(group_src, table_key))
22,956✔
320
            continue;
7,068✔
321
        auto table_src = group_src.get_table(table_key);
15,888✔
322
        // There are no primary keys in embedded tables but this is ok, because
7,944✔
323
        // embedded objects are tied to the lifetime of top level objects.
7,944✔
324
        if (table_src->is_embedded())
15,888✔
325
            continue;
664✔
326
        StringData table_name = table_src->get_name();
15,224✔
327
        logger.debug("Removing objects in '%1'", table_name);
15,224✔
328
        auto table_dst = group_dst.get_table(table_name);
15,224✔
329

7,612✔
330
        auto pk_col = table_dst->get_primary_key_column();
15,224✔
331
        REALM_ASSERT_DEBUG(pk_col); // sync realms always have a pk
15,224✔
332
        std::vector<std::pair<Mixed, ObjKey>> objects_to_remove;
15,224✔
333
        for (auto obj : *table_dst) {
21,324✔
334
            auto pk = obj.get_any(pk_col);
21,324✔
335
            if (!table_src->find_primary_key(pk)) {
21,324✔
336
                objects_to_remove.emplace_back(pk, obj.get_key());
416✔
337
            }
416✔
338
        }
21,324✔
339
        for (auto& pair : objects_to_remove) {
7,820✔
340
            logger.debug("  removing '%1'", pair.first);
416✔
341
            table_dst->remove_object(pair.second);
416✔
342
        }
416✔
343
    }
15,224✔
344

3,362✔
345
    // We must re-create any missing objects that are absent in dst before trying to copy
3,362✔
346
    // their properties because creating them may re-create any dangling links which would
3,362✔
347
    // otherwise cause inconsistencies when re-creating lists of links.
3,362✔
348
    for (auto table_key : group_src.get_table_keys()) {
22,956✔
349
        ConstTableRef table_src = group_src.get_table(table_key);
22,956✔
350
        auto table_name = table_src->get_name();
22,956✔
351
        if (should_skip_table(group_src, table_key) || table_src->is_embedded())
22,956✔
352
            continue;
7,732✔
353
        TableRef table_dst = group_dst.get_table(table_name);
15,224✔
354
        auto pk_col = table_src->get_primary_key_column();
15,224✔
355
        REALM_ASSERT(pk_col);
15,224✔
356
        logger.debug("Creating missing objects for table '%1', number of rows = %2, "
15,224✔
357
                     "primary_key_col = %3, primary_key_type = %4",
15,224✔
358
                     table_name, table_src->size(), pk_col.get_index().val, pk_col.get_type());
15,224✔
359
        for (const Obj& src : *table_src) {
21,408✔
360
            bool created = false;
21,408✔
361
            table_dst->create_object_with_primary_key(src.get_primary_key(), &created);
21,408✔
362
            if (created) {
21,408✔
363
                logger.debug("   created %1", src.get_primary_key());
500✔
364
            }
500✔
365
        }
21,408✔
366
    }
15,224✔
367

3,362✔
368
    converters::EmbeddedObjectConverter embedded_tracker;
6,724✔
369
    // Now src and dst have identical schemas and no extraneous objects from dst.
3,362✔
370
    // There may be missing object from src and the values of existing objects may
3,362✔
371
    // still differ. Diff all the values and create missing objects on the fly.
3,362✔
372
    for (auto table_key : group_src.get_table_keys()) {
22,956✔
373
        if (should_skip_table(group_src, table_key))
22,956✔
374
            continue;
7,068✔
375
        ConstTableRef table_src = group_src.get_table(table_key);
15,888✔
376
        // Embedded objects don't have a primary key, so they are handled
7,944✔
377
        // as a special case when they are encountered as a link value.
7,944✔
378
        if (table_src->is_embedded())
15,888✔
379
            continue;
664✔
380
        StringData table_name = table_src->get_name();
15,224✔
381
        TableRef table_dst = group_dst.get_table(table_name);
15,224✔
382
        REALM_ASSERT_EX(allow_schema_additions || table_src->get_column_count() == table_dst->get_column_count(),
15,224✔
383
                        allow_schema_additions, table_src->get_column_count(), table_dst->get_column_count());
15,224✔
384
        auto pk_col = table_src->get_primary_key_column();
15,224✔
385
        REALM_ASSERT(pk_col);
15,224✔
386
        logger.debug("Updating values for table '%1', number of rows = %2, "
15,224✔
387
                     "number of columns = %3, primary_key_col = %4, "
15,224✔
388
                     "primary_key_type = %5",
15,224✔
389
                     table_name, table_src->size(), table_src->get_column_count(), pk_col.get_index().val,
15,224✔
390
                     pk_col.get_type());
15,224✔
391

7,612✔
392
        converters::InterRealmObjectConverter converter(table_src, table_dst, &embedded_tracker);
15,224✔
393

7,612✔
394
        for (const Obj& src : *table_src) {
21,408✔
395
            auto src_pk = src.get_primary_key();
21,408✔
396
            // create the object - it should have been created above.
10,704✔
397
            auto dst = table_dst->get_object_with_primary_key(src_pk);
21,408✔
398
            REALM_ASSERT(dst);
21,408✔
399

10,704✔
400
            bool updated = false;
21,408✔
401
            converter.copy(src, dst, &updated);
21,408✔
402
            if (updated) {
21,408✔
403
                logger.debug("  updating %1", src_pk);
7,368✔
404
            }
7,368✔
405
        }
21,408✔
406
        embedded_tracker.process_pending();
15,224✔
407
    }
15,224✔
408
}
6,724✔
409

410
// A table without a "class_" prefix will not generate sync instructions.
411
constexpr static std::string_view s_meta_reset_table_name("client_reset_metadata");
412
constexpr static std::string_view s_pk_col_name("id");
413
constexpr static std::string_view s_version_column_name("version");
414
constexpr static std::string_view s_timestamp_col_name("event_time");
415
constexpr static std::string_view s_reset_type_col_name("type_of_reset");
416
constexpr int64_t metadata_version = 1;
417

418
void remove_pending_client_resets(Transaction& wt)
419
{
152✔
420
    if (auto table = wt.get_table(s_meta_reset_table_name); table && !table->is_empty()) {
152✔
421
        table->clear();
152✔
422
    }
152✔
423
}
152✔
424

425
util::Optional<PendingReset> has_pending_reset(const Transaction& rt)
426
{
16,782✔
427
    ConstTableRef table = rt.get_table(s_meta_reset_table_name);
16,782✔
428
    if (!table || table->size() == 0) {
16,782✔
429
        return util::none;
16,020✔
430
    }
16,020✔
431
    ColKey timestamp_col = table->get_column_key(s_timestamp_col_name);
762✔
432
    ColKey type_col = table->get_column_key(s_reset_type_col_name);
762✔
433
    ColKey version_col = table->get_column_key(s_version_column_name);
762✔
434
    REALM_ASSERT(timestamp_col);
762✔
435
    REALM_ASSERT(type_col);
762✔
436
    REALM_ASSERT(version_col);
762✔
437
    if (table->size() > 1) {
762✔
438
        // this may happen if a future version of this code changes the format and expectations around reset metadata.
439
        throw ClientResetFailed(
×
440
            util::format("Previous client resets detected (%1) but only one is expected.", table->size()));
×
441
    }
×
442
    Obj first = *table->begin();
762✔
443
    REALM_ASSERT(first);
762✔
444
    PendingReset pending;
762✔
445
    int64_t version = first.get<int64_t>(version_col);
762✔
446
    pending.time = first.get<Timestamp>(timestamp_col);
762✔
447
    if (version > metadata_version) {
762✔
448
        throw ClientResetFailed(util::format("Unsupported client reset metadata version: %1 vs %2, from %3", version,
×
449
                                             metadata_version, pending.time));
×
450
    }
×
451
    int64_t type = first.get<int64_t>(type_col);
762✔
452
    if (type == 0) {
762✔
453
        pending.type = ClientResyncMode::DiscardLocal;
440✔
454
    }
440✔
455
    else if (type == 1) {
322✔
456
        pending.type = ClientResyncMode::Recover;
320✔
457
    }
320✔
458
    else {
2✔
459
        throw ClientResetFailed(
2✔
460
            util::format("Unsupported client reset metadata type: %1 from %2", type, pending.time));
2✔
461
    }
2✔
462
    return pending;
760✔
463
}
760✔
464

465
void track_reset(Transaction& wt, ClientResyncMode mode)
466
{
6,780✔
467
    REALM_ASSERT(mode != ClientResyncMode::Manual);
6,780✔
468
    TableRef table = wt.get_table(s_meta_reset_table_name);
6,780✔
469
    ColKey version_col, timestamp_col, type_col;
6,780✔
470
    if (!table) {
6,780✔
471
        table = wt.add_table_with_primary_key(s_meta_reset_table_name, type_ObjectId, s_pk_col_name);
6,744✔
472
        REALM_ASSERT(table);
6,744✔
473
        version_col = table->add_column(type_Int, s_version_column_name);
6,744✔
474
        timestamp_col = table->add_column(type_Timestamp, s_timestamp_col_name);
6,744✔
475
        type_col = table->add_column(type_Int, s_reset_type_col_name);
6,744✔
476
    }
6,744✔
477
    else {
36✔
478
        version_col = table->get_column_key(s_version_column_name);
36✔
479
        timestamp_col = table->get_column_key(s_timestamp_col_name);
36✔
480
        type_col = table->get_column_key(s_reset_type_col_name);
36✔
481
    }
36✔
482
    REALM_ASSERT(version_col);
6,780✔
483
    REALM_ASSERT(timestamp_col);
6,780✔
484
    REALM_ASSERT(type_col);
6,780✔
485
    int64_t mode_val = 0; // Discard
6,780✔
486
    if (mode == ClientResyncMode::Recover || mode == ClientResyncMode::RecoverOrDiscard) {
6,780✔
487
        mode_val = 1; // Recover
3,400✔
488
    }
3,400✔
489

3,390✔
490
    if (table->size() > 1) {
6,780✔
491
        // this may happen if a future version of this code changes the format and expectations around reset metadata.
492
        throw ClientResetFailed(
×
493
            util::format("Previous client resets detected (%1) but only one is expected.", table->size()));
×
494
    }
×
495
    table->create_object_with_primary_key(ObjectId::gen(),
6,780✔
496
                                          {{version_col, metadata_version},
6,780✔
497
                                           {timestamp_col, Timestamp(std::chrono::system_clock::now())},
6,780✔
498
                                           {type_col, mode_val}});
6,780✔
499
}
6,780✔
500

501
static ClientResyncMode reset_precheck_guard(Transaction& wt, ClientResyncMode mode, bool recovery_is_allowed,
502
                                             util::Logger& logger)
503
{
6,776✔
504
    if (auto previous_reset = has_pending_reset(wt)) {
6,776✔
505
        logger.info("A previous reset was detected of type: '%1' at: %2", previous_reset->type, previous_reset->time);
32✔
506
        switch (previous_reset->type) {
32✔
507
            case ClientResyncMode::Manual:
✔
508
                REALM_UNREACHABLE();
509
            case ClientResyncMode::DiscardLocal:
12✔
510
                throw ClientResetFailed(util::format("A previous '%1' mode reset from %2 did not succeed, "
12✔
511
                                                     "giving up on '%3' mode to prevent a cycle",
12✔
512
                                                     previous_reset->type, previous_reset->time, mode));
12✔
513
            case ClientResyncMode::Recover:
20✔
514
                switch (mode) {
20✔
515
                    case ClientResyncMode::Recover:
8✔
516
                        throw ClientResetFailed(util::format("A previous '%1' mode reset from %2 did not succeed, "
8✔
517
                                                             "giving up on '%3' mode to prevent a cycle",
8✔
518
                                                             previous_reset->type, previous_reset->time, mode));
8✔
519
                    case ClientResyncMode::RecoverOrDiscard:
8✔
520
                        mode = ClientResyncMode::DiscardLocal;
8✔
521
                        logger.info("A previous '%1' mode reset from %2 downgrades this mode ('%3') to DiscardLocal",
8✔
522
                                    previous_reset->type, previous_reset->time, mode);
8✔
523
                        remove_pending_client_resets(wt);
8✔
524
                        break;
8✔
525
                    case ClientResyncMode::DiscardLocal:
4✔
526
                        remove_pending_client_resets(wt);
4✔
527
                        // previous mode Recover and this mode is Discard, this is not a cycle yet
2✔
528
                        break;
4✔
529
                    case ClientResyncMode::Manual:
✔
530
                        REALM_UNREACHABLE();
531
                }
20✔
532
                break;
16✔
533
            case ClientResyncMode::RecoverOrDiscard:
10✔
534
                throw ClientResetFailed(util::format("Unexpected previous '%1' mode reset from %2 did not "
×
535
                                                     "succeed, giving up on '%3' mode to prevent a cycle",
×
536
                                                     previous_reset->type, previous_reset->time, mode));
×
537
        }
6,756✔
538
    }
6,756✔
539
    if (!recovery_is_allowed) {
6,756✔
540
        if (mode == ClientResyncMode::Recover) {
4✔
541
            throw ClientResetFailed(
×
542
                "Client reset mode is set to 'Recover' but the server does not allow recovery for this client");
×
543
        }
×
544
        else if (mode == ClientResyncMode::RecoverOrDiscard) {
4✔
545
            logger.info("Client reset in 'RecoverOrDiscard' is choosing 'DiscardLocal' because the server does not "
4✔
546
                        "permit recovery for this client");
4✔
547
            mode = ClientResyncMode::DiscardLocal;
4✔
548
        }
4✔
549
    }
4✔
550
    track_reset(wt, mode);
6,756✔
551
    return mode;
6,756✔
552
}
6,756✔
553

554
LocalVersionIDs perform_client_reset_diff(DB& db_local, DB& db_remote, sync::SaltedFileIdent client_file_ident,
555
                                          util::Logger& logger, ClientResyncMode mode, bool recovery_is_allowed,
556
                                          bool* did_recover_out, sync::SubscriptionStore* sub_store,
557
                                          util::UniqueFunction<void(int64_t)> on_flx_version_complete)
558
{
6,776✔
559
    auto wt_local = db_local.start_write();
6,776✔
560
    auto actual_mode = reset_precheck_guard(*wt_local, mode, recovery_is_allowed, logger);
6,776✔
561
    bool recover_local_changes =
6,776✔
562
        actual_mode == ClientResyncMode::Recover || actual_mode == ClientResyncMode::RecoverOrDiscard;
6,776✔
563

3,388✔
564
    logger.info("Client reset: path_local = %1, "
6,776✔
565
                "client_file_ident = (ident: %2, salt: %3), "
6,776✔
566
                "remote_path = %4, requested_mode = %5, recovery_is_allowed = %6, "
6,776✔
567
                "actual_mode = %7, will_recover = %8",
6,776✔
568
                db_local.get_path(), client_file_ident.ident, client_file_ident.salt, db_remote.get_path(), mode,
6,776✔
569
                recovery_is_allowed, actual_mode, recover_local_changes);
6,776✔
570

3,388✔
571
    auto& repl_local = dynamic_cast<ClientReplication&>(*db_local.get_replication());
6,776✔
572
    auto& history_local = repl_local.get_history();
6,776✔
573
    history_local.ensure_updated(wt_local->get_version());
6,776✔
574
    SaltedFileIdent orig_file_ident = history_local.get_client_file_ident(*wt_local);
6,776✔
575
    VersionID old_version_local = wt_local->get_version_of_current_transaction();
6,776✔
576

3,388✔
577
    auto& repl_remote = dynamic_cast<ClientReplication&>(*db_remote.get_replication());
6,776✔
578
    auto& history_remote = repl_remote.get_history();
6,776✔
579

3,388✔
580
    sync::SaltedVersion fresh_server_version = {0, 0};
6,776✔
581
    {
6,776✔
582
        SyncProgress remote_progress;
6,776✔
583
        sync::version_type remote_version_unused;
6,776✔
584
        SaltedFileIdent remote_ident_unused;
6,776✔
585
        history_remote.get_status(remote_version_unused, remote_ident_unused, remote_progress);
6,776✔
586
        fresh_server_version = remote_progress.latest_server_version;
6,776✔
587
    }
6,776✔
588

3,388✔
589
    if (!recover_local_changes) {
6,776✔
590
        auto rt_remote = db_remote.start_read();
3,368✔
591
        // transform the local Realm such that all public tables become identical to the remote Realm
1,684✔
592
        transfer_group(*rt_remote, *wt_local, logger, false);
3,368✔
593

1,684✔
594
        // now that the state of the fresh and local Realms are identical,
1,684✔
595
        // reset the local sync history and steal the fresh Realm's ident
1,684✔
596
        history_local.set_client_reset_adjustments(wt_local->get_version(), client_file_ident, fresh_server_version,
3,368✔
597
                                                   BinaryData());
3,368✔
598

1,684✔
599
        int64_t subscription_version = 0;
3,368✔
600
        if (sub_store) {
3,368✔
601
            subscription_version = sub_store->set_active_as_latest(*wt_local);
40✔
602
        }
40✔
603

1,684✔
604
        wt_local->commit_and_continue_as_read();
3,368✔
605
        if (did_recover_out) {
3,368✔
606
            *did_recover_out = false;
168✔
607
        }
168✔
608
        if (on_flx_version_complete) {
3,368✔
609
            on_flx_version_complete(subscription_version);
168✔
610
        }
168✔
611

1,684✔
612
        VersionID new_version_local = wt_local->get_version_of_current_transaction();
3,368✔
613
        logger.info("perform_client_reset_diff is done: old_version = (version: %1, index: %2), "
3,368✔
614
                    "new_version = (version: %3, index: %4)",
3,368✔
615
                    old_version_local.version, old_version_local.index, new_version_local.version,
3,368✔
616
                    new_version_local.index);
3,368✔
617
        return LocalVersionIDs{old_version_local, new_version_local};
3,368✔
618
    }
3,368✔
619

1,704✔
620
    auto remake_active_subscription = [&]() {
3,408✔
621
        if (!sub_store) {
64✔
UNCOV
622
            return;
×
UNCOV
623
        }
×
624
        auto subs = sub_store->get_active();
64✔
625
        int64_t before_version = subs.version();
64✔
626
        auto mut_subs = subs.make_mutable_copy();
64✔
627
        mut_subs.update_state(sync::SubscriptionSet::State::Complete);
64✔
628
        auto sub = std::move(mut_subs).commit();
64✔
629
        if (on_flx_version_complete) {
64✔
630
            on_flx_version_complete(sub.version());
64✔
631
        }
64✔
632
        logger.info("Recreated the active subscription set in the complete state (%1 -> %2)", before_version,
64✔
633
                    sub.version());
64✔
634
    };
64✔
635

1,704✔
636
    auto frozen_pre_local_state = db_local.start_frozen();
3,408✔
637
    auto local_changes = history_local.get_local_changes(wt_local->get_version());
3,408✔
638
    logger.info("Local changesets to recover: %1", local_changes.size());
3,408✔
639

1,704✔
640
    auto wt_remote = db_remote.start_write();
3,408✔
641

1,704✔
642
    BinaryData recovered_changeset;
3,408✔
643

1,704✔
644
    // FLX with recovery has to be done in multiple commits, which is significantly different than other modes
1,704✔
645
    if (sub_store) {
3,408✔
646
        // In FLX recovery, save a copy of the pending subscriptions for later. This
34✔
647
        // needs to be done before they are wiped out by remake_active_subscription()
34✔
648
        std::vector<SubscriptionSet> pending_subscriptions = sub_store->get_pending_subscriptions();
68✔
649
        // transform the local Realm such that all public tables become identical to the remote Realm
34✔
650
        transfer_group(*wt_remote, *wt_local, logger, recover_local_changes);
68✔
651
        // now that the state of the fresh and local Realms are identical,
34✔
652
        // reset the local sync history.
34✔
653
        // Note that we do not set the new file ident yet! This is done in the last commit.
34✔
654
        history_local.set_client_reset_adjustments(wt_local->get_version(), orig_file_ident, fresh_server_version,
68✔
655
                                                   recovered_changeset);
68✔
656
        // The local Realm is committed. There are no changes to the remote Realm.
34✔
657
        wt_remote->rollback_and_continue_as_read();
68✔
658
        wt_local->commit_and_continue_as_read();
68✔
659
        // Make a copy of the active subscription set and mark it as
34✔
660
        // complete. This will cause all other subscription sets to become superceded.
34✔
661
        remake_active_subscription();
68✔
662
        // Apply local changes interleaved with pending subscriptions in separate commits
34✔
663
        // as needed. This has the consequence that there may be extra notifications along
34✔
664
        // the way to the final state, but since separate commits are necessary, this is
34✔
665
        // unavoidable.
34✔
666
        wt_local = db_local.start_write();
68✔
667
        RecoverLocalChangesetsHandler handler{*wt_local, *frozen_pre_local_state, logger, db_local.get_replication()};
68✔
668
        handler.process_changesets(local_changes, std::move(pending_subscriptions)); // throws on error
68✔
669
        // The new file ident is set as part of the final commit. This is to ensure that if
34✔
670
        // there are any exceptions during recovery, or the process is killed for some
34✔
671
        // reason, the client reset cycle detection will catch this and we will not attempt
34✔
672
        // to recover again. If we had set the ident in the first commit, a Realm which was
34✔
673
        // partially recovered, but interrupted may continue sync the next time it is
34✔
674
        // opened with only partially recovered state while having lost the history of any
34✔
675
        // offline modifications.
34✔
676
        history_local.set_client_file_ident_in_wt(wt_local->get_version(), client_file_ident);
68✔
677
        wt_local->commit_and_continue_as_read();
68✔
678
    }
68✔
679
    else {
3,340✔
680
        // In PBS recovery, the strategy is to apply all local changes to the remote realm first,
1,670✔
681
        // and then transfer the modified state all at once to the local Realm. This creates a
1,670✔
682
        // nice side effect for notifications because only the minimal state change is made.
1,670✔
683
        RecoverLocalChangesetsHandler handler{*wt_remote, *frozen_pre_local_state, logger,
3,340✔
684
                                              db_remote.get_replication()};
3,340✔
685
        handler.process_changesets(local_changes, {}); // throws on error
3,340✔
686
        ChangesetEncoder& encoder = repl_remote.get_instruction_encoder();
3,340✔
687
        const sync::ChangesetEncoder::Buffer& buffer = encoder.buffer();
3,340✔
688
        recovered_changeset = {buffer.data(), buffer.size()};
3,340✔
689

1,670✔
690
        // transform the local Realm such that all public tables become identical to the remote Realm
1,670✔
691
        transfer_group(*wt_remote, *wt_local, logger, recover_local_changes);
3,340✔
692

1,670✔
693
        // now that the state of the fresh and local Realms are identical,
1,670✔
694
        // reset the local sync history and steal the fresh Realm's ident
1,670✔
695
        history_local.set_client_reset_adjustments(wt_local->get_version(), client_file_ident, fresh_server_version,
3,340✔
696
                                                   recovered_changeset);
3,340✔
697

1,670✔
698
        // Finally, the local Realm is committed. The changes to the remote Realm are discarded.
1,670✔
699
        wt_remote->rollback_and_continue_as_read();
3,340✔
700
        wt_local->commit_and_continue_as_read();
3,340✔
701
    }
3,340✔
702

1,704✔
703
    if (did_recover_out) {
3,408✔
704
        *did_recover_out = true;
100✔
705
    }
100✔
706
    VersionID new_version_local = wt_local->get_version_of_current_transaction();
3,408✔
707
    logger.info("perform_client_reset_diff is done, old_version.version = %1, "
3,408✔
708
                "old_version.index = %2, new_version.version = %3, "
3,408✔
709
                "new_version.index = %4",
3,408✔
710
                old_version_local.version, old_version_local.index, new_version_local.version,
3,408✔
711
                new_version_local.index);
3,408✔
712

1,704✔
713
    return LocalVersionIDs{old_version_local, new_version_local};
3,408✔
714
}
3,408✔
715

716
} // namespace realm::_impl::client_reset
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc