• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 2288

01 May 2024 08:17PM UTC coverage: 90.738% (-0.02%) from 90.756%
2288

push

Evergreen

web-flow
[bindgen] fix signature of update_base_url (#7665)

101892 of 180214 branches covered (56.54%)

212458 of 234145 relevant lines covered (90.74%)

5757351.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.84
/src/realm/transaction.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2022 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include <realm/transaction.hpp>
20
#include "impl/copy_replication.hpp"
21
#include <realm/list.hpp>
22
#include <realm/set.hpp>
23
#include <realm/dictionary.hpp>
24
#include <realm/table_view.hpp>
25
#include <realm/group_writer.hpp>
26

27
namespace {
28

29
using namespace realm;
30
using ColInfo = std::vector<std::pair<ColKey, Table*>>;
31

32
ColInfo get_col_info(const Table* table)
33
{
2,034✔
34
    std::vector<std::pair<ColKey, Table*>> cols;
2,034✔
35
    if (table) {
2,034✔
36
        for (auto col : table->get_column_keys()) {
666✔
37
            Table* embedded_table = nullptr;
666✔
38
            if (auto target_table = table->get_opposite_table(col)) {
666✔
39
                if (target_table->is_embedded())
216✔
40
                    embedded_table = target_table.unchecked_ptr();
168✔
41
            }
216✔
42
            cols.emplace_back(col, embedded_table);
666✔
43
        }
666✔
44
    }
282✔
45
    return cols;
2,034✔
46
}
2,034✔
47

48
void add_list_to_repl(CollectionBase& list, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded);
49

50
void add_dictionary_to_repl(Dictionary& dict, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded)
51
{
114✔
52
    size_t sz = dict.size();
114✔
53
    for (size_t n = 0; n < sz; ++n) {
216✔
54
        const auto& [key, val] = dict.get_pair(n);
102✔
55
        if (val.is_type(type_List)) {
102✔
56
            repl.dictionary_insert(dict, n, key, Mixed{0, CollectionType::List});
6✔
57
            auto n_list = dict.get_list({key.get_string()});
6✔
58
            add_list_to_repl(*n_list, repl, nullptr);
6✔
59
        }
6✔
60
        else if (val.is_type(type_Dictionary)) {
96✔
61
            repl.dictionary_insert(dict, n, key, Mixed{0, CollectionType::Dictionary});
6✔
62
            auto n_dict = dict.get_dictionary({key.get_string()});
6✔
63
            add_dictionary_to_repl(*n_dict, repl, nullptr);
6✔
64
        }
6✔
65
        else {
90✔
66
            repl.dictionary_insert(dict, n, key, val);
90✔
67
            if (update_embedded) {
90✔
68
                update_embedded(val);
42✔
69
            }
42✔
70
        }
90✔
71
    }
102✔
72
}
114✔
73

74
void add_list_to_repl(CollectionBase& list, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded)
75
{
114✔
76
    auto sz = list.size();
114✔
77
    for (size_t n = 0; n < sz; n++) {
246✔
78
        auto val = list.get_any(n);
132✔
79
        if (val.is_type(type_List)) {
132✔
80
            repl.list_insert(list, n, Mixed{0, CollectionType::List}, n);
6✔
81
            auto n_list = list.get_list({n});
6✔
82
            add_list_to_repl(*n_list, repl, nullptr);
6✔
83
        }
6✔
84
        else if (val.is_type(type_Dictionary)) {
126✔
85
            repl.list_insert(list, n, Mixed{0, CollectionType::Dictionary}, n);
6✔
86
            auto n_dict = list.get_dictionary({n});
6✔
87
            add_dictionary_to_repl(*n_dict, repl, nullptr);
6✔
88
        }
6✔
89
        else {
120✔
90
            repl.list_insert(list, n, val, n);
120✔
91
            if (update_embedded) {
120✔
92
                update_embedded(val);
18✔
93
            }
18✔
94
        }
120✔
95
    }
132✔
96
}
114✔
97

98
void generate_properties_for_obj(Replication& repl, const Obj& obj, const ColInfo& cols)
99
{
846✔
100
    for (auto elem : cols) {
1,908✔
101
        auto col = elem.first;
1,908✔
102
        auto embedded_table = elem.second;
1,908✔
103
        auto cols_2 = get_col_info(embedded_table);
1,908✔
104
        util::UniqueFunction<void(Mixed)> update_embedded = nullptr;
1,908✔
105
        if (embedded_table) {
1,908✔
106
            update_embedded = [&](Mixed val) {
156✔
107
                if (val.is_null()) {
114✔
108
                    return;
18✔
109
                }
18✔
110
                REALM_ASSERT(val.is_type(type_Link, type_TypedLink));
96✔
111
                Obj embedded_obj = embedded_table->get_object(val.get<ObjKey>());
96✔
112
                generate_properties_for_obj(repl, embedded_obj, cols_2);
96✔
113
                return;
96✔
114
            };
114✔
115
        }
156✔
116

117
        if (col.is_list()) {
1,908✔
118
            auto list = obj.get_listbase_ptr(col);
96✔
119
            repl.list_clear(*list);
96✔
120
            add_list_to_repl(*list, repl, std::move(update_embedded));
96✔
121
        }
96✔
122
        else if (col.is_set()) {
1,812✔
123
            auto set = obj.get_setbase_ptr(col);
18✔
124
            auto sz = set->size();
18✔
125
            for (size_t n = 0; n < sz; n++) {
54✔
126
                repl.set_insert(*set, n, set->get_any(n));
36✔
127
                // Sets cannot have embedded objects
128
            }
36✔
129
        }
18✔
130
        else if (col.is_dictionary()) {
1,794✔
131
            auto dict = obj.get_dictionary(col);
96✔
132
            add_dictionary_to_repl(dict, repl, std::move(update_embedded));
96✔
133
        }
96✔
134
        else {
1,698✔
135
            auto val = obj.get_any(col);
1,698✔
136
            if (val.is_type(type_List)) {
1,698✔
137
                repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), Mixed(0, CollectionType::List));
6✔
138
                Lst<Mixed> list(obj, col);
6✔
139
                add_list_to_repl(list, repl, std::move(update_embedded));
6✔
140
            }
6✔
141
            else if (val.is_type(type_Dictionary)) {
1,692✔
142
                repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), Mixed(0, CollectionType::Dictionary));
6✔
143
                Dictionary dict(obj, col);
6✔
144
                add_dictionary_to_repl(dict, repl, std::move(update_embedded));
6✔
145
            }
6✔
146
            else {
1,686✔
147
                repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), val);
1,686✔
148
                if (update_embedded)
1,686✔
149
                    update_embedded(val);
54✔
150
            }
1,686✔
151
        }
1,698✔
152
    }
1,908✔
153
}
846✔
154

155
} // namespace
156

157
namespace realm {
158

159
std::map<DB::TransactStage, const char*> log_stage = {
160
    {DB::TransactStage::transact_Frozen, "frozen"},
161
    {DB::TransactStage::transact_Writing, "write"},
162
    {DB::TransactStage::transact_Reading, "read"},
163
};
164

165
Transaction::Transaction(DBRef _db, SlabAlloc* alloc, DB::ReadLockInfo& rli, DB::TransactStage stage)
166
    : Group(alloc)
1,581,096✔
167
    , db(_db)
1,581,096✔
168
    , m_read_lock(rli)
1,581,096✔
169
    , m_log_id(util::gen_log_id(this))
1,581,096✔
170
{
2,248,401✔
171
    bool writable = stage == DB::transact_Writing;
2,248,401✔
172
    m_transact_stage = DB::transact_Ready;
2,248,401✔
173
    set_transact_stage(stage);
2,248,401✔
174
    m_alloc.note_reader_start(this);
2,248,401✔
175
    attach_shared(m_read_lock.m_top_ref, m_read_lock.m_file_size, writable,
2,248,401✔
176
                  VersionID{rli.m_version, rli.m_reader_idx});
2,248,401✔
177
    if (db->m_logger) {
2,248,401✔
178
        db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "Start %1 %2: %3 ref %4",
708,231✔
179
                          log_stage[stage], m_log_id, rli.m_version, m_read_lock.m_top_ref);
708,231✔
180
    }
708,231✔
181
}
2,248,401✔
182

183
Transaction::~Transaction()
184
{
2,249,358✔
185
    // Note that this does not call close() - calling close() is done
186
    // implicitly by the deleter.
187
}
2,249,358✔
188

189
void Transaction::close()
190
{
2,268,084✔
191
    if (m_transact_stage == DB::transact_Writing) {
2,268,084✔
192
        rollback();
15,723✔
193
    }
15,723✔
194
    if (m_transact_stage == DB::transact_Reading || m_transact_stage == DB::transact_Frozen) {
2,268,084✔
195
        do_end_read();
1,870,821✔
196
    }
1,870,821✔
197
}
2,268,084✔
198

199
size_t Transaction::get_commit_size() const
200
{
46,305✔
201
    size_t sz = 0;
46,305✔
202
    if (m_transact_stage == DB::transact_Writing) {
46,305✔
203
        sz = m_alloc.get_commit_size();
14,712✔
204
    }
14,712✔
205
    return sz;
46,305✔
206
}
46,305✔
207

208
DB::version_type Transaction::commit()
209
{
259,434✔
210
    check_attached();
259,434✔
211

212
    if (m_transact_stage != DB::transact_Writing)
259,434✔
213
        throw WrongTransactionState("Not a write transaction");
×
214

215
    REALM_ASSERT(is_attached());
259,434✔
216

217
    // before committing, allow any accessors at group level or below to sync
218
    flush_accessors_for_commit();
259,434✔
219

220
    DB::version_type new_version = db->do_commit(*this); // Throws
259,434✔
221

222
    // We need to set m_read_lock in order for wait_for_change to work.
223
    // To set it, we grab a readlock on the latest available snapshot
224
    // and release it again.
225
    DB::ReadLockInfo lock_after_commit = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
259,434✔
226
    db->release_read_lock(lock_after_commit);
259,434✔
227

228
    db->end_write_on_correct_thread();
259,434✔
229

230
    do_end_read();
259,434✔
231
    m_read_lock = lock_after_commit;
259,434✔
232

233
    return new_version;
259,434✔
234
}
259,434✔
235

236
void Transaction::rollback()
237
{
16,695✔
238
    // rollback may happen as a consequence of exception handling in cases where
239
    // the DB has detached. If so, just back out without trying to change state.
240
    // the DB object has already been closed and no further processing is possible.
241
    if (!is_attached())
16,695✔
242
        return;
6✔
243
    if (m_transact_stage == DB::transact_Ready)
16,689✔
244
        return; // Idempotency
×
245

246
    if (m_transact_stage != DB::transact_Writing)
16,689✔
247
        throw WrongTransactionState("Not a write transaction");
×
248
    db->reset_free_space_tracking();
16,689✔
249
    if (!holds_write_mutex())
16,689✔
250
        db->end_write_on_correct_thread();
16,689✔
251

252
    do_end_read();
16,689✔
253
}
16,689✔
254

255
void Transaction::end_read()
256
{
98,046✔
257
    if (m_transact_stage == DB::transact_Ready)
98,046✔
258
        return;
6✔
259
    if (m_transact_stage == DB::transact_Writing)
98,040✔
260
        throw WrongTransactionState("Illegal end_read when in write mode");
×
261
    do_end_read();
98,040✔
262
}
98,040✔
263

264
VersionID Transaction::commit_and_continue_as_read(bool commit_to_disk)
265
{
339,201✔
266
    check_attached();
339,201✔
267
    if (m_transact_stage != DB::transact_Writing)
339,201✔
268
        throw WrongTransactionState("Not a write transaction");
×
269

270
    flush_accessors_for_commit();
339,201✔
271

272
    DB::version_type version = db->do_commit(*this, commit_to_disk); // Throws
339,201✔
273

274
    // advance read lock but dont update accessors:
275
    // As this is done under lock, along with the addition above of the newest commit,
276
    // we know for certain that the read lock we will grab WILL refer to our own newly
277
    // completed commit.
278

279
    try {
339,201✔
280
        // Grabbing the new lock before releasing the old one prevents m_transaction_count
281
        // from going shortly to zero
282
        DB::ReadLockInfo new_read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID()); // Throws
339,201✔
283

284
        m_history = nullptr;
339,201✔
285
        set_transact_stage(DB::transact_Reading);
339,201✔
286

287
        if (commit_to_disk || m_oldest_version_not_persisted) {
339,201✔
288
            // Here we are either committing to disk or we are already
289
            // holding on to an older version. In either case there is
290
            // no need to hold onto this now historic version.
291
            db->release_read_lock(m_read_lock);
337,800✔
292
        }
337,800✔
293
        else {
1,401✔
294
            // We are not commiting to disk and there is no older
295
            // version not persisted, so hold onto this one
296
            m_oldest_version_not_persisted = m_read_lock;
1,401✔
297
        }
1,401✔
298

299
        if (commit_to_disk && m_oldest_version_not_persisted) {
339,201✔
300
            // We are committing to disk so we can release the
301
            // version we are holding on to
302
            db->release_read_lock(*m_oldest_version_not_persisted);
12✔
303
            m_oldest_version_not_persisted.reset();
12✔
304
        }
12✔
305
        m_read_lock = new_read_lock;
339,201✔
306
        // We can be sure that m_read_lock != m_oldest_version_not_persisted
307
        // because m_oldest_version_not_persisted is either equal to former m_read_lock
308
        // or older and former m_read_lock is older than current m_read_lock
309
        REALM_ASSERT(!m_oldest_version_not_persisted ||
339,201✔
310
                     m_read_lock.m_version != m_oldest_version_not_persisted->m_version);
339,201✔
311

312
        {
339,201✔
313
            util::CheckedLockGuard lock(m_async_mutex);
339,201✔
314
            REALM_ASSERT(m_async_stage != AsyncState::Syncing);
339,201✔
315
            if (commit_to_disk) {
339,201✔
316
                if (m_async_stage == AsyncState::Requesting) {
331,662✔
317
                    m_async_stage = AsyncState::HasLock;
×
318
                }
×
319
                else {
331,662✔
320
                    db->end_write_on_correct_thread();
331,662✔
321
                    m_async_stage = AsyncState::Idle;
331,662✔
322
                }
331,662✔
323
            }
331,662✔
324
            else {
7,539✔
325
                m_async_stage = AsyncState::HasCommits;
7,539✔
326
            }
7,539✔
327
        }
339,201✔
328

329
        // Remap file if it has grown, and update refs in underlying node structure.
330
        remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, false); // Throws
339,201✔
331
        return VersionID{version, new_read_lock.m_reader_idx};
339,201✔
332
    }
339,201✔
333
    catch (std::exception& e) {
339,201✔
334
        if (db->m_logger) {
×
335
            db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::error,
×
336
                              "Tr %1: Commit failed with exception: \"%2\"", m_log_id, e.what());
×
337
        }
×
338
        // In case of failure, further use of the transaction for reading is unsafe
339
        set_transact_stage(DB::transact_Ready);
×
340
        throw;
×
341
    }
×
342
}
339,201✔
343

344
VersionID Transaction::commit_and_continue_writing()
345
{
11,790✔
346
    check_attached();
11,790✔
347
    if (m_transact_stage != DB::transact_Writing)
11,790✔
348
        throw WrongTransactionState("Not a write transaction");
×
349

350
    // before committing, allow any accessors at group level or below to sync
351
    flush_accessors_for_commit();
11,790✔
352

353
    DB::version_type version = db->do_commit(*this); // Throws
11,790✔
354

355
    // We need to set m_read_lock in order for wait_for_change to work.
356
    // To set it, we grab a readlock on the latest available snapshot
357
    // and release it again.
358
    DB::ReadLockInfo lock_after_commit = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
11,790✔
359
    db->release_read_lock(m_read_lock);
11,790✔
360
    m_read_lock = lock_after_commit;
11,790✔
361
    if (Replication* repl = db->get_replication()) {
11,790✔
362
        bool history_updated = false;
11,790✔
363
        repl->initiate_transact(*this, lock_after_commit.m_version, history_updated); // Throws
11,790✔
364
    }
11,790✔
365

366
    bool writable = true;
11,790✔
367
    remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, writable); // Throws
11,790✔
368
    return VersionID{version, lock_after_commit.m_reader_idx};
11,790✔
369
}
11,790✔
370

371
TransactionRef Transaction::freeze()
372
{
6,042✔
373
    if (m_transact_stage != DB::transact_Reading)
6,042✔
374
        throw WrongTransactionState("Can only freeze a read transaction");
6✔
375
    auto version = VersionID(m_read_lock.m_version, m_read_lock.m_reader_idx);
6,036✔
376
    return db->start_frozen(version);
6,036✔
377
}
6,042✔
378

379
TransactionRef Transaction::duplicate()
380
{
57,573✔
381
    auto version = VersionID(m_read_lock.m_version, m_read_lock.m_reader_idx);
57,573✔
382
    switch (m_transact_stage) {
57,573✔
383
        case DB::transact_Ready:
✔
384
            throw WrongTransactionState("Cannot duplicate a transaction which does not have a read lock.");
×
385
        case DB::transact_Reading:
57,561✔
386
            return db->start_read(version);
57,561✔
387
        case DB::transact_Frozen:
✔
388
            return db->start_frozen(version);
×
389
        case DB::transact_Writing:
12✔
390
            if (get_commit_size() != 0)
12✔
391
                throw WrongTransactionState(
×
392
                    "Can only duplicate a write transaction before any changes have been made.");
×
393
            return db->start_read(version);
12✔
394
    }
57,573✔
395
    REALM_UNREACHABLE();
396
}
×
397

398
void Transaction::copy_to(TransactionRef dest) const
399
{
42✔
400
    _impl::CopyReplication repl(dest);
42✔
401
    replicate(dest.get(), repl);
42✔
402
}
42✔
403

404
_impl::History* Transaction::get_history() const
405
{
1,742,898✔
406
    if (!m_history) {
1,742,898✔
407
        if (auto repl = db->get_replication()) {
372,921✔
408
            switch (m_transact_stage) {
348,471✔
409
                case DB::transact_Reading:
98,541✔
410
                case DB::transact_Frozen:
98,541✔
411
                    if (!m_history_read)
98,541✔
412
                        m_history_read = repl->_create_history_read();
95,769✔
413
                    m_history = m_history_read.get();
98,541✔
414
                    m_history->set_group(const_cast<Transaction*>(this), false);
98,541✔
415
                    break;
98,541✔
416
                case DB::transact_Writing:
249,930✔
417
                    m_history = repl->_get_history_write();
249,930✔
418
                    break;
249,930✔
419
                case DB::transact_Ready:
✔
420
                    break;
×
421
            }
348,471✔
422
        }
348,471✔
423
    }
372,921✔
424
    return m_history;
1,742,898✔
425
}
1,742,898✔
426

427
Obj Transaction::import_copy_of(const Obj& original)
428
{
18,978✔
429
    if (bool(original) && original.is_valid()) {
18,978✔
430
        TableKey tk = original.get_table()->get_key();
18,948✔
431
        ObjKey rk = original.get_key();
18,948✔
432
        auto table = get_table(tk);
18,948✔
433
        if (table->is_valid(rk))
18,948✔
434
            return table->get_object(rk);
18,390✔
435
    }
18,948✔
436
    return {};
588✔
437
}
18,978✔
438

439
TableRef Transaction::import_copy_of(ConstTableRef original)
440
{
261,228✔
441
    TableKey tk = original->get_key();
261,228✔
442
    return get_table(tk);
261,228✔
443
}
261,228✔
444

445
LnkLst Transaction::import_copy_of(const LnkLst& original)
446
{
×
447
    if (Obj obj = import_copy_of(original.get_obj())) {
×
448
        ColKey ck = original.get_col_key();
×
449
        return obj.get_linklist(ck);
×
450
    }
×
451
    return LnkLst();
×
452
}
×
453

454
LstBasePtr Transaction::import_copy_of(const LstBase& original)
455
{
12✔
456
    if (Obj obj = import_copy_of(original.get_obj())) {
12✔
457
        ColKey ck = original.get_col_key();
6✔
458
        return obj.get_listbase_ptr(ck);
6✔
459
    }
6✔
460
    return {};
6✔
461
}
12✔
462

463
SetBasePtr Transaction::import_copy_of(const SetBase& original)
464
{
×
465
    if (Obj obj = import_copy_of(original.get_obj())) {
×
466
        ColKey ck = original.get_col_key();
×
467
        return obj.get_setbase_ptr(ck);
×
468
    }
×
469
    return {};
×
470
}
×
471

472
CollectionBasePtr Transaction::import_copy_of(const CollectionBase& original)
473
{
11,580✔
474
    if (Obj obj = import_copy_of(original.get_obj())) {
11,580✔
475
        auto path = original.get_short_path();
11,052✔
476
        return std::static_pointer_cast<CollectionBase>(obj.get_collection_ptr(path));
11,052✔
477
    }
11,052✔
478
    return {};
528✔
479
}
11,580✔
480

481
LnkLstPtr Transaction::import_copy_of(const LnkLstPtr& original)
482
{
6✔
483
    if (!bool(original))
6✔
484
        return nullptr;
×
485
    if (Obj obj = import_copy_of(original->get_obj())) {
6✔
486
        ColKey ck = original->get_col_key();
6✔
487
        return obj.get_linklist_ptr(ck);
6✔
488
    }
6✔
489
    return std::make_unique<LnkLst>();
×
490
}
6✔
491

492
LnkSetPtr Transaction::import_copy_of(const LnkSetPtr& original)
493
{
×
494
    if (!original)
×
495
        return nullptr;
×
496
    if (Obj obj = import_copy_of(original->get_obj())) {
×
497
        ColKey ck = original->get_col_key();
×
498
        return obj.get_linkset_ptr(ck);
×
499
    }
×
500
    return std::make_unique<LnkSet>();
×
501
}
×
502

503
LinkCollectionPtr Transaction::import_copy_of(const LinkCollectionPtr& original)
504
{
67,362✔
505
    if (!original)
67,362✔
506
        return nullptr;
66,666✔
507
    if (Obj obj = import_copy_of(original->get_owning_obj())) {
696✔
508
        ColKey ck = original->get_owning_col_key();
654✔
509
        return obj.get_linkcollection_ptr(ck);
654✔
510
    }
654✔
511
    // return some empty collection where size() == 0
512
    // the type shouldn't matter
513
    return std::make_unique<LnkLst>();
42✔
514
}
696✔
515

516
std::unique_ptr<Query> Transaction::import_copy_of(Query& query, PayloadPolicy policy)
517
{
61,776✔
518
    return query.clone_for_handover(this, policy);
61,776✔
519
}
61,776✔
520

521
std::unique_ptr<TableView> Transaction::import_copy_of(TableView& tv, PayloadPolicy policy)
522
{
46,560✔
523
    return tv.clone_for_handover(this, policy);
46,560✔
524
}
46,560✔
525

526
void Transaction::upgrade_file_format(int target_file_format_version)
527
{
156✔
528
    REALM_ASSERT(is_attached());
156✔
529
    if (fake_target_file_format && *fake_target_file_format == target_file_format_version) {
156✔
530
        // Testing, mockup scenario, not a real upgrade. Just pretend we're done!
531
        return;
30✔
532
    }
30✔
533

534
    // Be sure to revisit the following upgrade logic when a new file format
535
    // version is introduced. The following assert attempt to help you not
536
    // forget it.
537
    REALM_ASSERT_EX(target_file_format_version == 24, target_file_format_version);
126✔
538

539
    // DB::do_open() must ensure that only supported version are allowed.
540
    // It does that by asking backup if the current file format version is
541
    // included in the accepted versions, so be sure to align the list of
542
    // versions with the logic below
543

544
    int current_file_format_version = get_file_format_version();
126✔
545
    REALM_ASSERT(current_file_format_version < target_file_format_version);
126✔
546

547
    if (auto logger = get_logger()) {
126✔
548
        logger->info("Upgrading from file format version %1 to %2", current_file_format_version,
36✔
549
                     target_file_format_version);
36✔
550
    }
36✔
551
    // Ensure we have search index on all primary key columns.
552
    auto table_keys = get_table_keys();
126✔
553
    if (current_file_format_version < 22) {
126✔
554
        for (auto k : table_keys) {
114✔
555
            auto t = get_table(k);
114✔
556
            if (auto col = t->get_primary_key_column()) {
114✔
557
                t->do_add_search_index(col, IndexType::General);
78✔
558
            }
78✔
559
        }
114✔
560
    }
48✔
561

562
    if (current_file_format_version == 22) {
126✔
563
        // Check that asymmetric table are empty
564
        for (auto k : table_keys) {
180✔
565
            auto t = get_table(k);
180✔
566
            if (t->is_asymmetric() && t->size() > 0) {
180✔
567
                t->clear();
6✔
568
            }
6✔
569
        }
180✔
570
    }
66✔
571
    if (current_file_format_version >= 21 && current_file_format_version < 23) {
126✔
572
        // Upgrade Set and Dictionary columns
573
        for (auto k : table_keys) {
180✔
574
            auto t = get_table(k);
180✔
575
            t->migrate_sets_and_dictionaries();
180✔
576
        }
180✔
577
    }
66✔
578
    if (current_file_format_version < 24) {
126✔
579
        for (auto k : table_keys) {
354✔
580
            auto t = get_table(k);
354✔
581
            t->migrate_set_orderings(); // rewrite sets to use the new string/binary order
354✔
582
            // Although StringIndex sort order has been changed in this format, we choose to
583
            // avoid upgrading them because it affects a small niche case. Instead, there is a
584
            // workaround in the String Index search code for not relying on items being ordered.
585
            t->migrate_col_keys();
354✔
586
        }
354✔
587
    }
126✔
588
    // NOTE: Additional future upgrade steps go here.
589
}
126✔
590

591
void Transaction::promote_to_async()
592
{
7,542✔
593
    util::CheckedLockGuard lck(m_async_mutex);
7,542✔
594
    if (m_async_stage == AsyncState::Idle) {
7,542✔
595
        m_async_stage = AsyncState::HasLock;
372✔
596
    }
372✔
597
}
7,542✔
598

599
void Transaction::replicate(Transaction* dest, Replication& repl) const
600
{
78✔
601
    // We should only create entries for public tables
602
    std::vector<TableKey> public_table_keys;
78✔
603
    for (auto tk : get_table_keys()) {
222✔
604
        if (table_is_public(tk))
222✔
605
            public_table_keys.push_back(tk);
180✔
606
    }
222✔
607

608
    // Create tables
609
    for (auto tk : public_table_keys) {
180✔
610
        auto table = get_table(tk);
180✔
611
        auto table_name = table->get_name();
180✔
612
        if (!table->is_embedded()) {
180✔
613
            auto pk_col = table->get_primary_key_column();
138✔
614
            if (!pk_col)
138✔
615
                throw RuntimeError(
×
616
                    ErrorCodes::BrokenInvariant,
×
617
                    util::format("Class '%1' must have a primary key", Group::table_name_to_class_name(table_name)));
×
618
            auto pk_name = table->get_column_name(pk_col);
138✔
619
            if (pk_name != "_id")
138✔
620
                throw RuntimeError(ErrorCodes::BrokenInvariant,
×
621
                                   util::format("Primary key of class '%1' must be named '_id'. Current is '%2'",
×
622
                                                Group::table_name_to_class_name(table_name), pk_name));
×
623
            repl.add_class_with_primary_key(tk, table_name, DataType(pk_col.get_type()), pk_name,
138✔
624
                                            pk_col.is_nullable(), table->get_table_type());
138✔
625
        }
138✔
626
        else {
42✔
627
            repl.add_class(tk, table_name, Table::Type::Embedded);
42✔
628
        }
42✔
629
    }
180✔
630
    // Create columns
631
    for (auto tk : public_table_keys) {
180✔
632
        auto table = get_table(tk);
180✔
633
        auto pk_col = table->get_primary_key_column();
180✔
634
        auto cols = table->get_column_keys();
180✔
635
        for (auto col : cols) {
462✔
636
            if (col == pk_col)
462✔
637
                continue;
138✔
638
            repl.insert_column(table.unchecked_ptr(), col, DataType(col.get_type()), table->get_column_name(col),
324✔
639
                               table->get_opposite_table(col).unchecked_ptr());
324✔
640
        }
324✔
641
    }
180✔
642
    dest->commit_and_continue_writing();
78✔
643
    // Now the schema should be in place - create the objects
644
#ifdef REALM_DEBUG
78✔
645
    constexpr int number_of_objects_to_create_before_committing = 100;
78✔
646
#else
647
    constexpr int number_of_objects_to_create_before_committing = 1000;
648
#endif
649
    auto n = number_of_objects_to_create_before_committing;
78✔
650
    for (auto tk : public_table_keys) {
168✔
651
        auto table = get_table(tk);
168✔
652
        if (table->is_embedded())
168✔
653
            continue;
42✔
654
        auto pk_col = table->get_primary_key_column();
126✔
655
        auto cols = get_col_info(table.unchecked_ptr());
126✔
656
        for (auto o : *table) {
750✔
657
            auto obj_key = o.get_key();
750✔
658
            Mixed pk = o.get_any(pk_col);
750✔
659
            repl.create_object_with_primary_key(table.unchecked_ptr(), obj_key, pk);
750✔
660
            generate_properties_for_obj(repl, o, cols);
750✔
661
            if (--n == 0) {
750✔
662
                dest->commit_and_continue_writing();
6✔
663
                n = number_of_objects_to_create_before_committing;
6✔
664
            }
6✔
665
        }
750✔
666
    }
126✔
667
}
78✔
668

669
void Transaction::complete_async_commit()
670
{
1,386✔
671
    // sync to disk:
672
    DB::ReadLockInfo read_lock;
1,386✔
673
    try {
1,386✔
674
        read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
1,386✔
675
        if (db->m_logger) {
1,386✔
676
            db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace,
1,386✔
677
                              "Tr %1: Committing ref %2 to disk", m_log_id, read_lock.m_top_ref);
1,386✔
678
        }
1,386✔
679
        GroupCommitter out(*this);
1,386✔
680
        out.commit(read_lock.m_top_ref); // Throws
1,386✔
681
        // we must release the write mutex before the callback, because the callback
682
        // is allowed to re-request it.
683
        db->release_read_lock(read_lock);
1,386✔
684
        if (m_oldest_version_not_persisted) {
1,386✔
685
            db->release_read_lock(*m_oldest_version_not_persisted);
1,380✔
686
            m_oldest_version_not_persisted.reset();
1,380✔
687
        }
1,380✔
688
    }
1,386✔
689
    catch (const std::exception& e) {
1,386✔
690
        m_commit_exception = std::current_exception();
6✔
691
        if (db->m_logger) {
6✔
692
            db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::error,
6✔
693
                              "Tr %1: Committing to disk failed with exception: \"%2\"", m_log_id, e.what());
6✔
694
        }
6✔
695
        m_async_commit_has_failed = true;
6✔
696
        db->release_read_lock(read_lock);
6✔
697
    }
6✔
698
}
1,386✔
699

700
void Transaction::async_complete_writes(util::UniqueFunction<void()> when_synchronized)
701
{
10,317✔
702
    util::CheckedLockGuard lck(m_async_mutex);
10,317✔
703
    if (m_async_stage == AsyncState::HasLock) {
10,317✔
704
        // Nothing to commit to disk - just release write lock
705
        m_async_stage = AsyncState::Idle;
54✔
706
        db->async_end_write();
54✔
707
    }
54✔
708
    else if (m_async_stage == AsyncState::HasCommits) {
10,263✔
709
        m_async_stage = AsyncState::Syncing;
1,362✔
710
        m_commit_exception = std::exception_ptr();
1,362✔
711
        // get a callback on the helper thread, in which to sync to disk
712
        db->async_sync_to_disk([this, cb = std::move(when_synchronized)]() noexcept {
1,362✔
713
            complete_async_commit();
1,362✔
714
            util::CheckedLockGuard lck(m_async_mutex);
1,362✔
715
            m_async_stage = AsyncState::Idle;
1,362✔
716
            if (m_waiting_for_sync) {
1,362✔
717
                m_waiting_for_sync = false;
366✔
718
                m_async_cv.notify_all();
366✔
719
            }
366✔
720
            else {
996✔
721
                cb();
996✔
722
            }
996✔
723
        });
1,362✔
724
    }
1,362✔
725
}
10,317✔
726

727
void Transaction::prepare_for_close()
728
{
2,342,274✔
729
    util::CheckedLockGuard lck(m_async_mutex);
2,342,274✔
730
    switch (m_async_stage) {
2,342,274✔
731
        case AsyncState::Idle:
2,347,578✔
732
            break;
2,347,578✔
733

734
        case AsyncState::Requesting:
24✔
735
            // We don't have the ability to cancel a wait on the write lock, so
736
            // unfortunately we have to wait for it to be acquired.
737
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
738
            REALM_ASSERT(!m_oldest_version_not_persisted);
24✔
739
            m_waiting_for_write_lock = true;
24✔
740
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
48✔
741
                return !m_waiting_for_write_lock;
48✔
742
            });
48✔
743
            db->end_write_on_correct_thread();
24✔
744
            break;
24✔
745

746
        case AsyncState::HasLock:
30✔
747
            // We have the lock and are currently in a write transaction, and
748
            // also may have some pending previous commits to write
749
            if (m_transact_stage == DB::transact_Writing) {
30✔
750
                db->reset_free_space_tracking();
18✔
751
                m_transact_stage = DB::transact_Reading;
18✔
752
            }
18✔
753
            if (m_oldest_version_not_persisted) {
30✔
754
                complete_async_commit();
×
755
            }
×
756
            db->end_write_on_correct_thread();
30✔
757
            break;
30✔
758

759
        case AsyncState::HasCommits:
24✔
760
            // We have commits which need to be synced to disk, so do that
761
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
762
            complete_async_commit();
24✔
763
            db->end_write_on_correct_thread();
24✔
764
            break;
24✔
765

766
        case AsyncState::Syncing:
24✔
767
            // The worker thread is currently writing, so wait for it to complete
768
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
769
            m_waiting_for_sync = true;
24✔
770
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
48✔
771
                return !m_waiting_for_sync;
48✔
772
            });
48✔
773
            break;
24✔
774
    }
2,342,274✔
775
    m_async_stage = AsyncState::Idle;
2,347,851✔
776
}
2,347,851✔
777

778
void Transaction::acquire_write_lock()
779
{
335,310✔
780
    util::CheckedUniqueLock lck(m_async_mutex);
335,310✔
781
    switch (m_async_stage) {
335,310✔
782
        case AsyncState::Idle:
334,863✔
783
            lck.unlock();
334,863✔
784
            db->do_begin_possibly_async_write();
334,863✔
785
            return;
334,863✔
786

787
        case AsyncState::Requesting:
105✔
788
            m_waiting_for_write_lock = true;
105✔
789
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
210✔
790
                return !m_waiting_for_write_lock;
210✔
791
            });
210✔
792
            return;
105✔
793

794
        case AsyncState::HasLock:
✔
795
        case AsyncState::HasCommits:
✔
796
            return;
×
797

798
        case AsyncState::Syncing:
342✔
799
            m_waiting_for_sync = true;
342✔
800
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
684✔
801
                return !m_waiting_for_sync;
684✔
802
            });
684✔
803
            lck.unlock();
342✔
804
            db->do_begin_possibly_async_write();
342✔
805
            break;
342✔
806
    }
335,310✔
807
}
335,310✔
808

809
void Transaction::do_end_read() noexcept
810
{
2,244,456✔
811
    if (db->m_logger)
2,244,456✔
812
        db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "End transaction %1", m_log_id);
708,189✔
813

814
    prepare_for_close();
2,244,456✔
815
    detach();
2,244,456✔
816

817
    // We should always be ensuring that async commits finish before we get here,
818
    // but if the fsync() failed or we failed to update the top pointer then
819
    // there's not much we can do and we have to just accept that we're losing
820
    // those commits.
821
    if (m_oldest_version_not_persisted) {
2,244,456✔
822
        REALM_ASSERT(m_async_commit_has_failed);
6✔
823
        // We need to not release our read lock on m_oldest_version_not_persisted
824
        // as that's the version the top pointer is referencing and overwriting
825
        // that version will corrupt the Realm file.
826
        db->leak_read_lock(*m_oldest_version_not_persisted);
6✔
827
    }
6✔
828
    db->release_read_lock(m_read_lock);
2,244,456✔
829

830
    m_alloc.note_reader_end(this);
2,244,456✔
831
    set_transact_stage(DB::transact_Ready);
2,244,456✔
832
    // reset the std::shared_ptr to allow the DB object to release resources
833
    // as early as possible.
834
    db.reset();
2,244,456✔
835
}
2,244,456✔
836

837
// This is the same as do_end_read() above, but with the requirement that
838
// 1) This is called with the db->mutex locked already
839
// 2) No async commits outstanding
840
void Transaction::close_read_with_lock()
841
{
144✔
842
    REALM_ASSERT(m_transact_stage == DB::transact_Reading);
144✔
843
    {
144✔
844
        util::CheckedLockGuard lck(m_async_mutex);
144✔
845
        REALM_ASSERT_EX(m_async_stage == AsyncState::Idle, size_t(m_async_stage));
144✔
846
    }
144✔
847

848
    detach();
144✔
849
    REALM_ASSERT_EX(!m_oldest_version_not_persisted, m_oldest_version_not_persisted->m_type,
144✔
850
                    m_oldest_version_not_persisted->m_version, m_oldest_version_not_persisted->m_top_ref,
144✔
851
                    m_oldest_version_not_persisted->m_file_size);
144✔
852
    db->do_release_read_lock(m_read_lock);
144✔
853

854
    m_alloc.note_reader_end(this);
144✔
855
    set_transact_stage(DB::transact_Ready);
144✔
856
    // reset the std::shared_ptr to allow the DB object to release resources
857
    // as early as possible.
858
    db.reset();
144✔
859
}
144✔
860

861

862
void Transaction::initialize_replication()
863
{
×
864
    if (m_transact_stage == DB::transact_Writing) {
×
865
        if (Replication* repl = get_replication()) {
×
866
            auto current_version = m_read_lock.m_version;
×
867
            bool history_updated = false;
×
868
            repl->initiate_transact(*this, current_version, history_updated); // Throws
×
869
        }
×
870
    }
×
871
}
×
872

873
void Transaction::set_transact_stage(DB::TransactStage stage) noexcept
874
{
5,186,268✔
875
    m_transact_stage = stage;
5,186,268✔
876
}
5,186,268✔
877

878
class NodeTree {
879
public:
880
    NodeTree(size_t evac_limit, size_t work_limit)
881
        : m_evac_limit(evac_limit)
2,667✔
882
        , m_work_limit(int64_t(work_limit))
2,667✔
883
        , m_moved(0)
2,667✔
884
    {
5,331✔
885
    }
5,331✔
886
    ~NodeTree()
887
    {
5,331✔
888
        // std::cout << "Moved: " << m_moved << std::endl;
889
    }
5,331✔
890

891
    /// Function used to traverse the node tree and "copy on write" nodes
892
    /// that are found above the evac_limit. The function will return
893
    /// when either the whole tree has been travesed or when the work_limit
894
    /// has been reached.
895
    /// \param current_node - node to process.
896
    /// \param level - the level at which current_node is placed in the tree
897
    /// \param progress - When the traversal is initiated, this vector identifies at which
898
    ///                   node the process should be resumed. It is subesequently updated
899
    ///                   to point to the node we have just processed
900
    bool trv(Array& current_node, unsigned level, std::vector<size_t>& progress)
901
    {
1,286,529✔
902
        if (m_work_limit < 0) {
1,286,529✔
903
            return false;
5,250✔
904
        }
5,250✔
905
        if (current_node.is_read_only()) {
1,281,279✔
906
            size_t byte_size = current_node.get_byte_size();
1,254,312✔
907
            if ((current_node.get_ref() + byte_size) > m_evac_limit) {
1,254,312✔
908
                current_node.copy_on_write();
1,172,829✔
909
                m_moved++;
1,172,829✔
910
                m_work_limit -= byte_size;
1,172,829✔
911
            }
1,172,829✔
912
        }
1,254,312✔
913

914
        if (current_node.has_refs()) {
1,281,279✔
915
            auto sz = current_node.size();
41,838✔
916
            m_work_limit -= sz;
41,838✔
917
            if (progress.size() == level) {
41,838✔
918
                progress.push_back(0);
10,575✔
919
            }
10,575✔
920
            REALM_ASSERT_EX(level < progress.size(), level, progress.size());
41,838✔
921
            size_t ndx = progress[level];
41,838✔
922
            while (ndx < sz) {
1,306,746✔
923
                auto val = current_node.get(ndx);
1,296,276✔
924
                if (val && !(val & 1)) {
1,296,276✔
925
                    Array arr(current_node.get_alloc());
1,281,516✔
926
                    arr.set_parent(&current_node, ndx);
1,281,516✔
927
                    arr.init_from_parent();
1,281,516✔
928
                    if (!trv(arr, level + 1, progress)) {
1,281,516✔
929
                        return false;
31,368✔
930
                    }
31,368✔
931
                }
1,281,516✔
932
                ndx = ++progress[level];
1,264,908✔
933
            }
1,264,908✔
934
            while (progress.size() > level)
20,955✔
935
                progress.pop_back();
10,485✔
936
        }
10,470✔
937
        return true;
1,249,911✔
938
    }
1,281,279✔
939

940
private:
941
    size_t m_evac_limit;
942
    int64_t m_work_limit;
943
    size_t m_moved;
944
};
945

946

947
void Transaction::cow_outliers(std::vector<size_t>& progress, size_t evac_limit, size_t work_limit)
948
{
5,331✔
949
    NodeTree node_tree(evac_limit, work_limit);
5,331✔
950
    if (progress.empty()) {
5,331✔
951
        progress.push_back(s_table_name_ndx);
99✔
952
    }
99✔
953
    if (progress[0] == s_table_name_ndx) {
5,331✔
954
        if (!node_tree.trv(m_table_names, 1, progress))
99✔
955
            return;
×
956
        progress.back() = s_table_refs_ndx; // Handle tables next
99✔
957
    }
99✔
958
    if (progress[0] == s_table_refs_ndx) {
5,331✔
959
        if (!node_tree.trv(m_tables, 1, progress))
5,331✔
960
            return;
5,250✔
961
        progress.back() = s_hist_ref_ndx; // Handle history next
81✔
962
    }
81✔
963
    if (progress[0] == s_hist_ref_ndx && m_top.get(s_hist_ref_ndx)) {
81✔
964
        Array hist_arr(m_top.get_alloc());
72✔
965
        hist_arr.set_parent(&m_top, s_hist_ref_ndx);
72✔
966
        hist_arr.init_from_parent();
72✔
967
        if (!node_tree.trv(hist_arr, 1, progress))
72✔
968
            return;
×
969
    }
72✔
970
    progress.clear();
81✔
971
}
81✔
972

973
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc