• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / jorgen.edelbo_402

21 Aug 2024 11:10AM UTC coverage: 91.054% (-0.03%) from 91.085%
jorgen.edelbo_402

Pull #7803

Evergreen

jedelbo
Small fix to Table::typed_write

When writing the realm to a new file from a write transaction,
the Table may be COW so that the top ref is changed. So don't
use the ref that is present in the group when the operation starts.
Pull Request #7803: Feature/string compression

103494 of 181580 branches covered (57.0%)

1929 of 1999 new or added lines in 46 files covered. (96.5%)

695 existing lines in 51 files now uncovered.

220142 of 241772 relevant lines covered (91.05%)

7344461.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.8
/src/realm/transaction.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2022 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include <realm/transaction.hpp>
20
#include "impl/copy_replication.hpp"
21
#include <realm/list.hpp>
22
#include <realm/set.hpp>
23
#include <realm/dictionary.hpp>
24
#include <realm/table_view.hpp>
25
#include <realm/group_writer.hpp>
26

27
namespace {
28

29
using namespace realm;
30
using ColInfo = std::vector<std::pair<ColKey, Table*>>;
31

32
ColInfo get_col_info(const Table* table)
33
{
2,034✔
34
    std::vector<std::pair<ColKey, Table*>> cols;
2,034✔
35
    if (table) {
2,034✔
36
        for (auto col : table->get_column_keys()) {
666✔
37
            Table* embedded_table = nullptr;
666✔
38
            if (auto target_table = table->get_opposite_table(col)) {
666✔
39
                if (target_table->is_embedded())
216✔
40
                    embedded_table = target_table.unchecked_ptr();
168✔
41
            }
216✔
42
            cols.emplace_back(col, embedded_table);
666✔
43
        }
666✔
44
    }
282✔
45
    return cols;
2,034✔
46
}
2,034✔
47

48
void add_list_to_repl(CollectionBase& list, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded);
49

50
void add_dictionary_to_repl(Dictionary& dict, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded)
51
{
114✔
52
    size_t sz = dict.size();
114✔
53
    for (size_t n = 0; n < sz; ++n) {
216✔
54
        const auto& [key, val] = dict.get_pair(n);
102✔
55
        if (val.is_type(type_List)) {
102✔
56
            repl.dictionary_insert(dict, n, key, Mixed{0, CollectionType::List});
6✔
57
            auto n_list = dict.get_list({key.get_string()});
6✔
58
            add_list_to_repl(*n_list, repl, nullptr);
6✔
59
        }
6✔
60
        else if (val.is_type(type_Dictionary)) {
96✔
61
            repl.dictionary_insert(dict, n, key, Mixed{0, CollectionType::Dictionary});
6✔
62
            auto n_dict = dict.get_dictionary({key.get_string()});
6✔
63
            add_dictionary_to_repl(*n_dict, repl, nullptr);
6✔
64
        }
6✔
65
        else {
90✔
66
            repl.dictionary_insert(dict, n, key, val);
90✔
67
            if (update_embedded) {
90✔
68
                update_embedded(val);
42✔
69
            }
42✔
70
        }
90✔
71
    }
102✔
72
}
114✔
73

74
void add_list_to_repl(CollectionBase& list, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded)
75
{
114✔
76
    auto sz = list.size();
114✔
77
    for (size_t n = 0; n < sz; n++) {
246✔
78
        auto val = list.get_any(n);
132✔
79
        if (val.is_type(type_List)) {
132✔
80
            repl.list_insert(list, n, Mixed{0, CollectionType::List}, n);
6✔
81
            auto n_list = list.get_list({n});
6✔
82
            add_list_to_repl(*n_list, repl, nullptr);
6✔
83
        }
6✔
84
        else if (val.is_type(type_Dictionary)) {
126✔
85
            repl.list_insert(list, n, Mixed{0, CollectionType::Dictionary}, n);
6✔
86
            auto n_dict = list.get_dictionary({n});
6✔
87
            add_dictionary_to_repl(*n_dict, repl, nullptr);
6✔
88
        }
6✔
89
        else {
120✔
90
            repl.list_insert(list, n, val, n);
120✔
91
            if (update_embedded) {
120✔
92
                update_embedded(val);
18✔
93
            }
18✔
94
        }
120✔
95
    }
132✔
96
}
114✔
97

98
void generate_properties_for_obj(Replication& repl, const Obj& obj, const ColInfo& cols)
99
{
846✔
100
    for (auto elem : cols) {
1,908✔
101
        auto col = elem.first;
1,908✔
102
        auto embedded_table = elem.second;
1,908✔
103
        auto cols_2 = get_col_info(embedded_table);
1,908✔
104
        util::UniqueFunction<void(Mixed)> update_embedded = nullptr;
1,908✔
105
        if (embedded_table) {
1,908✔
106
            update_embedded = [&](Mixed val) {
156✔
107
                if (val.is_null()) {
114✔
108
                    return;
18✔
109
                }
18✔
110
                REALM_ASSERT(val.is_type(type_Link, type_TypedLink));
96✔
111
                Obj embedded_obj = embedded_table->get_object(val.get<ObjKey>());
96✔
112
                generate_properties_for_obj(repl, embedded_obj, cols_2);
96✔
113
                return;
96✔
114
            };
114✔
115
        }
156✔
116

117
        if (col.is_list()) {
1,908✔
118
            auto list = obj.get_listbase_ptr(col);
96✔
119
            repl.list_clear(*list);
96✔
120
            add_list_to_repl(*list, repl, std::move(update_embedded));
96✔
121
        }
96✔
122
        else if (col.is_set()) {
1,812✔
123
            auto set = obj.get_setbase_ptr(col);
18✔
124
            auto sz = set->size();
18✔
125
            for (size_t n = 0; n < sz; n++) {
54✔
126
                repl.set_insert(*set, n, set->get_any(n));
36✔
127
                // Sets cannot have embedded objects
128
            }
36✔
129
        }
18✔
130
        else if (col.is_dictionary()) {
1,794✔
131
            auto dict = obj.get_dictionary(col);
96✔
132
            add_dictionary_to_repl(dict, repl, std::move(update_embedded));
96✔
133
        }
96✔
134
        else {
1,698✔
135
            auto val = obj.get_any(col);
1,698✔
136
            if (val.is_type(type_List)) {
1,698✔
137
                repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), Mixed(0, CollectionType::List));
6✔
138
                Lst<Mixed> list(obj, col);
6✔
139
                add_list_to_repl(list, repl, std::move(update_embedded));
6✔
140
            }
6✔
141
            else if (val.is_type(type_Dictionary)) {
1,692✔
142
                repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), Mixed(0, CollectionType::Dictionary));
6✔
143
                Dictionary dict(obj, col);
6✔
144
                add_dictionary_to_repl(dict, repl, std::move(update_embedded));
6✔
145
            }
6✔
146
            else {
1,686✔
147
                repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), val);
1,686✔
148
                if (update_embedded)
1,686✔
149
                    update_embedded(val);
54✔
150
            }
1,686✔
151
        }
1,698✔
152
    }
1,908✔
153
}
846✔
154

155
} // namespace
156

157
namespace realm {
158

159
std::map<DB::TransactStage, const char*> log_stage = {
160
    {DB::TransactStage::transact_Frozen, "frozen"},
161
    {DB::TransactStage::transact_Writing, "write"},
162
    {DB::TransactStage::transact_Reading, "read"},
163
};
164

165
Transaction::Transaction(DBRef _db, SlabAlloc* alloc, DB::ReadLockInfo& rli, DB::TransactStage stage)
166
    : Group(alloc)
1,234,872✔
167
    , db(_db)
1,234,872✔
168
    , m_read_lock(rli)
1,234,872✔
169
    , m_log_id(util::gen_log_id(this))
1,234,872✔
170
{
1,926,825✔
171
    bool writable = stage == DB::transact_Writing;
1,926,825✔
172
    m_transact_stage = DB::transact_Ready;
1,926,825✔
173
    set_transact_stage(stage);
1,926,825✔
174
    attach_shared(m_read_lock.m_top_ref, m_read_lock.m_file_size, writable,
1,926,825✔
175
                  VersionID{rli.m_version, rli.m_reader_idx});
1,926,825✔
176
    if (db->m_logger) {
1,926,825✔
177
        db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "Start %1 %2: %3 ref %4",
723,771✔
178
                          log_stage[stage], m_log_id, rli.m_version, m_read_lock.m_top_ref);
723,771✔
179
    }
723,771✔
180
}
1,926,825✔
181

182
Transaction::~Transaction()
183
{
1,926,969✔
184
    // Note that this does not call close() - calling close() is done
185
    // implicitly by the deleter.
186
}
1,926,969✔
187

188
void Transaction::close()
189
{
1,951,461✔
190
    if (m_transact_stage == DB::transact_Writing) {
1,951,461✔
191
        rollback();
16,029✔
192
    }
16,029✔
193
    if (m_transact_stage == DB::transact_Reading || m_transact_stage == DB::transact_Frozen) {
1,951,461✔
194
        do_end_read();
1,552,779✔
195
    }
1,552,779✔
196
}
1,951,461✔
197

198
size_t Transaction::get_commit_size() const
199
{
46,254✔
200
    size_t sz = 0;
46,254✔
201
    if (m_transact_stage == DB::transact_Writing) {
46,254✔
202
        sz = m_alloc.get_commit_size();
14,652✔
203
    }
14,652✔
204
    return sz;
46,254✔
205
}
46,254✔
206

207
DB::version_type Transaction::commit()
208
{
258,219✔
209
    check_attached();
258,219✔
210

211
    if (m_transact_stage != DB::transact_Writing)
258,219✔
212
        throw WrongTransactionState("Not a write transaction");
×
213

214
    REALM_ASSERT(is_attached());
258,219✔
215

216
    // before committing, allow any accessors at group level or below to sync
217
    flush_accessors_for_commit();
258,219✔
218

219
    DB::version_type new_version = db->do_commit(*this); // Throws
258,219✔
220

221
    // We need to set m_read_lock in order for wait_for_change to work.
222
    // To set it, we grab a readlock on the latest available snapshot
223
    // and release it again.
224
    DB::ReadLockInfo lock_after_commit = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
258,219✔
225
    db->release_read_lock(lock_after_commit);
258,219✔
226

227
    db->end_write_on_correct_thread();
258,219✔
228

229
    do_end_read();
258,219✔
230
    m_read_lock = lock_after_commit;
258,219✔
231

232
    return new_version;
258,219✔
233
}
258,219✔
234

235
void Transaction::rollback()
236
{
17,001✔
237
    // rollback may happen as a consequence of exception handling in cases where
238
    // the DB has detached. If so, just back out without trying to change state.
239
    // the DB object has already been closed and no further processing is possible.
240
    if (!is_attached())
17,001✔
241
        return;
6✔
242
    if (m_transact_stage == DB::transact_Ready)
16,995✔
243
        return; // Idempotency
×
244

245
    if (m_transact_stage != DB::transact_Writing)
16,995✔
246
        throw WrongTransactionState("Not a write transaction");
×
247
    db->reset_free_space_tracking();
16,995✔
248
    if (!holds_write_mutex())
16,995✔
249
        db->end_write_on_correct_thread();
16,995✔
250

251
    do_end_read();
16,995✔
252
}
16,995✔
253

254
void Transaction::end_read()
255
{
98,019✔
256
    if (m_transact_stage == DB::transact_Ready)
98,019✔
257
        return;
6✔
258
    if (m_transact_stage == DB::transact_Writing)
98,013✔
259
        throw WrongTransactionState("Illegal end_read when in write mode");
×
260
    do_end_read();
98,013✔
261
}
98,013✔
262

263
VersionID Transaction::commit_and_continue_as_read(bool commit_to_disk)
264
{
342,102✔
265
    check_attached();
342,102✔
266
    if (m_transact_stage != DB::transact_Writing)
342,102✔
267
        throw WrongTransactionState("Not a write transaction");
×
268

269
    flush_accessors_for_commit();
342,102✔
270

271
    DB::version_type version = db->do_commit(*this, commit_to_disk); // Throws
342,102✔
272

273
    // advance read lock but dont update accessors:
274
    // As this is done under lock, along with the addition above of the newest commit,
275
    // we know for certain that the read lock we will grab WILL refer to our own newly
276
    // completed commit.
277

278
    try {
342,102✔
279
        // Grabbing the new lock before releasing the old one prevents m_transaction_count
280
        // from going shortly to zero
281
        DB::ReadLockInfo new_read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID()); // Throws
342,102✔
282

283
        m_history = nullptr;
342,102✔
284
        set_transact_stage(DB::transact_Reading);
342,102✔
285

286
        if (commit_to_disk || m_oldest_version_not_persisted) {
342,102✔
287
            // Here we are either committing to disk or we are already
288
            // holding on to an older version. In either case there is
289
            // no need to hold onto this now historic version.
290
            db->release_read_lock(m_read_lock);
340,701✔
291
        }
340,701✔
292
        else {
1,401✔
293
            // We are not commiting to disk and there is no older
294
            // version not persisted, so hold onto this one
295
            m_oldest_version_not_persisted = m_read_lock;
1,401✔
296
        }
1,401✔
297

298
        if (commit_to_disk && m_oldest_version_not_persisted) {
342,102✔
299
            // We are committing to disk so we can release the
300
            // version we are holding on to
301
            db->release_read_lock(*m_oldest_version_not_persisted);
12✔
302
            m_oldest_version_not_persisted.reset();
12✔
303
        }
12✔
304
        m_read_lock = new_read_lock;
342,102✔
305
        // We can be sure that m_read_lock != m_oldest_version_not_persisted
306
        // because m_oldest_version_not_persisted is either equal to former m_read_lock
307
        // or older and former m_read_lock is older than current m_read_lock
308
        REALM_ASSERT(!m_oldest_version_not_persisted ||
342,102✔
309
                     m_read_lock.m_version != m_oldest_version_not_persisted->m_version);
342,102✔
310

311
        {
342,102✔
312
            util::CheckedLockGuard lock(m_async_mutex);
342,102✔
313
            REALM_ASSERT(m_async_stage != AsyncState::Syncing);
342,102✔
314
            if (commit_to_disk) {
342,102✔
315
                if (m_async_stage == AsyncState::Requesting) {
334,563✔
316
                    m_async_stage = AsyncState::HasLock;
×
317
                }
×
318
                else {
334,563✔
319
                    db->end_write_on_correct_thread();
334,563✔
320
                    m_async_stage = AsyncState::Idle;
334,563✔
321
                }
334,563✔
322
            }
334,563✔
323
            else {
7,539✔
324
                m_async_stage = AsyncState::HasCommits;
7,539✔
325
            }
7,539✔
326
        }
342,102✔
327

328
        // Remap file if it has grown, and update refs in underlying node structure.
329
        remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, false); // Throws
342,102✔
330
        return VersionID{version, new_read_lock.m_reader_idx};
342,102✔
331
    }
342,102✔
332
    catch (std::exception& e) {
342,102✔
333
        if (db->m_logger) {
×
334
            db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::error,
×
335
                              "Tr %1: Commit failed with exception: \"%2\"", m_log_id, e.what());
×
336
        }
×
337
        // In case of failure, further use of the transaction for reading is unsafe
338
        set_transact_stage(DB::transact_Ready);
×
339
        throw;
×
340
    }
×
341
}
342,102✔
342

343
VersionID Transaction::commit_and_continue_writing()
344
{
25,692✔
345
    check_attached();
25,692✔
346
    if (m_transact_stage != DB::transact_Writing)
25,692✔
347
        throw WrongTransactionState("Not a write transaction");
×
348

349
    // before committing, allow any accessors at group level or below to sync
350
    flush_accessors_for_commit();
25,692✔
351

352
    DB::version_type version = db->do_commit(*this); // Throws
25,692✔
353

354
    // We need to set m_read_lock in order for wait_for_change to work.
355
    // To set it, we grab a readlock on the latest available snapshot
356
    // and release it again.
357
    DB::ReadLockInfo lock_after_commit = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
25,692✔
358
    db->release_read_lock(m_read_lock);
25,692✔
359
    m_read_lock = lock_after_commit;
25,692✔
360
    if (Replication* repl = db->get_replication()) {
25,692✔
361
        bool history_updated = false;
25,692✔
362
        repl->initiate_transact(*this, lock_after_commit.m_version, history_updated); // Throws
25,692✔
363
    }
25,692✔
364

365
    bool writable = true;
25,692✔
366
    remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, writable); // Throws
25,692✔
367
    return VersionID{version, lock_after_commit.m_reader_idx};
25,692✔
368
}
25,692✔
369

370
TransactionRef Transaction::freeze()
371
{
6,042✔
372
    if (m_transact_stage != DB::transact_Reading)
6,042✔
373
        throw WrongTransactionState("Can only freeze a read transaction");
6✔
374
    auto version = VersionID(m_read_lock.m_version, m_read_lock.m_reader_idx);
6,036✔
375
    return db->start_frozen(version);
6,036✔
376
}
6,042✔
377

378
TransactionRef Transaction::duplicate()
379
{
59,388✔
380
    auto version = VersionID(m_read_lock.m_version, m_read_lock.m_reader_idx);
59,388✔
381
    switch (m_transact_stage) {
59,388✔
382
        case DB::transact_Ready:
✔
383
            throw WrongTransactionState("Cannot duplicate a transaction which does not have a read lock.");
×
384
        case DB::transact_Reading:
59,376✔
385
            return db->start_read(version);
59,376✔
386
        case DB::transact_Frozen:
✔
387
            return db->start_frozen(version);
×
388
        case DB::transact_Writing:
12✔
389
            if (get_commit_size() != 0)
12✔
390
                throw WrongTransactionState(
×
391
                    "Can only duplicate a write transaction before any changes have been made.");
×
392
            return db->start_read(version);
12✔
393
    }
59,388✔
394
    REALM_UNREACHABLE();
395
}
×
396

397
void Transaction::copy_to(TransactionRef dest) const
398
{
42✔
399
    _impl::CopyReplication repl(dest);
42✔
400
    replicate(dest.get(), repl);
42✔
401
}
42✔
402

403
_impl::History* Transaction::get_history() const
404
{
2,397,537✔
405
    if (!m_history) {
2,397,537✔
406
        if (auto repl = db->get_replication()) {
370,347✔
407
            switch (m_transact_stage) {
345,879✔
408
                case DB::transact_Reading:
99,333✔
409
                case DB::transact_Frozen:
99,333✔
410
                    if (!m_history_read)
99,333✔
411
                        m_history_read = repl->_create_history_read();
96,591✔
412
                    m_history = m_history_read.get();
99,333✔
413
                    m_history->set_group(const_cast<Transaction*>(this), false);
99,333✔
414
                    break;
99,333✔
415
                case DB::transact_Writing:
246,558✔
416
                    m_history = repl->_get_history_write();
246,558✔
417
                    break;
246,558✔
418
                case DB::transact_Ready:
✔
419
                    break;
×
420
            }
345,879✔
421
        }
345,879✔
422
    }
370,347✔
423
    return m_history;
2,397,552✔
424
}
2,397,537✔
425

426
Obj Transaction::import_copy_of(const Obj& original)
427
{
18,978✔
428
    if (bool(original) && original.is_valid()) {
18,978✔
429
        TableKey tk = original.get_table()->get_key();
18,948✔
430
        ObjKey rk = original.get_key();
18,948✔
431
        auto table = get_table(tk);
18,948✔
432
        if (table->is_valid(rk))
18,948✔
433
            return table->get_object(rk);
18,390✔
434
    }
18,948✔
435
    return {};
588✔
436
}
18,978✔
437

438
TableRef Transaction::import_copy_of(ConstTableRef original)
439
{
266,655✔
440
    TableKey tk = original->get_key();
266,655✔
441
    return get_table(tk);
266,655✔
442
}
266,655✔
443

444
LnkLst Transaction::import_copy_of(const LnkLst& original)
445
{
×
446
    if (Obj obj = import_copy_of(original.get_obj())) {
×
447
        ColKey ck = original.get_col_key();
×
448
        return obj.get_linklist(ck);
×
449
    }
×
450
    return LnkLst();
×
451
}
×
452

453
LstBasePtr Transaction::import_copy_of(const LstBase& original)
454
{
12✔
455
    if (Obj obj = import_copy_of(original.get_obj())) {
12✔
456
        ColKey ck = original.get_col_key();
6✔
457
        return obj.get_listbase_ptr(ck);
6✔
458
    }
6✔
459
    return {};
6✔
460
}
12✔
461

462
SetBasePtr Transaction::import_copy_of(const SetBase& original)
463
{
×
464
    if (Obj obj = import_copy_of(original.get_obj())) {
×
465
        ColKey ck = original.get_col_key();
×
466
        return obj.get_setbase_ptr(ck);
×
467
    }
×
468
    return {};
×
469
}
×
470

471
CollectionBasePtr Transaction::import_copy_of(const CollectionBase& original)
472
{
11,580✔
473
    if (Obj obj = import_copy_of(original.get_obj())) {
11,580✔
474
        auto path = original.get_short_path();
11,052✔
475
        return std::static_pointer_cast<CollectionBase>(obj.get_collection_ptr(path));
11,052✔
476
    }
11,052✔
477
    return {};
528✔
478
}
11,580✔
479

480
LnkLstPtr Transaction::import_copy_of(const LnkLstPtr& original)
481
{
6✔
482
    if (!bool(original))
6✔
483
        return nullptr;
×
484
    if (Obj obj = import_copy_of(original->get_obj())) {
6✔
485
        ColKey ck = original->get_col_key();
6✔
486
        return obj.get_linklist_ptr(ck);
6✔
487
    }
6✔
488
    return std::make_unique<LnkLst>();
×
489
}
6✔
490

491
LnkSetPtr Transaction::import_copy_of(const LnkSetPtr& original)
492
{
×
493
    if (!original)
×
494
        return nullptr;
×
495
    if (Obj obj = import_copy_of(original->get_obj())) {
×
496
        ColKey ck = original->get_col_key();
×
497
        return obj.get_linkset_ptr(ck);
×
498
    }
×
499
    return std::make_unique<LnkSet>();
×
500
}
×
501

502
LinkCollectionPtr Transaction::import_copy_of(const LinkCollectionPtr& original)
503
{
69,171✔
504
    if (!original)
69,171✔
505
        return nullptr;
68,475✔
506
    if (Obj obj = import_copy_of(original->get_owning_obj())) {
696✔
507
        ColKey ck = original->get_owning_col_key();
654✔
508
        return obj.get_linkcollection_ptr(ck);
654✔
509
    }
654✔
510
    // return some empty collection where size() == 0
511
    // the type shouldn't matter
512
    return std::make_unique<LnkLst>();
42✔
513
}
696✔
514

515
std::unique_ptr<Query> Transaction::import_copy_of(Query& query, PayloadPolicy policy)
516
{
61,776✔
517
    return query.clone_for_handover(this, policy);
61,776✔
518
}
61,776✔
519

520
std::unique_ptr<TableView> Transaction::import_copy_of(TableView& tv, PayloadPolicy policy)
521
{
48,369✔
522
    return tv.clone_for_handover(this, policy);
48,369✔
523
}
48,369✔
524

525
void Transaction::upgrade_file_format(int target_file_format_version)
526
{
156✔
527
    REALM_ASSERT(is_attached());
156✔
528
    if (fake_target_file_format && *fake_target_file_format == target_file_format_version) {
156✔
529
        // Testing, mockup scenario, not a real upgrade. Just pretend we're done!
530
        return;
30✔
531
    }
30✔
532

533
    // Be sure to revisit the following upgrade logic when a new file format
534
    // version is introduced. The following assert attempt to help you not
535
    // forget it.
536
    REALM_ASSERT_EX(target_file_format_version == 24, target_file_format_version);
126✔
537

538
    // DB::do_open() must ensure that only supported version are allowed.
539
    // It does that by asking backup if the current file format version is
540
    // included in the accepted versions, so be sure to align the list of
541
    // versions with the logic below
542

543
    int current_file_format_version = get_file_format_version();
126✔
544
    REALM_ASSERT(current_file_format_version < target_file_format_version);
126✔
545

546
    if (auto logger = get_logger()) {
126✔
547
        logger->info("Upgrading from file format version %1 to %2", current_file_format_version,
36✔
548
                     target_file_format_version);
36✔
549
    }
36✔
550
    // Ensure we have search index on all primary key columns.
551
    auto table_keys = get_table_keys();
126✔
552
    if (current_file_format_version < 22) {
126✔
553
        for (auto k : table_keys) {
114✔
554
            auto t = get_table(k);
114✔
555
            if (auto col = t->get_primary_key_column()) {
114✔
556
                t->do_add_search_index(col, IndexType::General);
78✔
557
            }
78✔
558
        }
114✔
559
    }
48✔
560

561
    if (current_file_format_version == 22) {
126✔
562
        // Check that asymmetric table are empty
563
        for (auto k : table_keys) {
180✔
564
            auto t = get_table(k);
180✔
565
            if (t->is_asymmetric() && t->size() > 0) {
180✔
566
                t->clear();
6✔
567
            }
6✔
568
        }
180✔
569
    }
66✔
570
    if (current_file_format_version >= 21 && current_file_format_version < 23) {
126✔
571
        // Upgrade Set and Dictionary columns
572
        for (auto k : table_keys) {
180✔
573
            auto t = get_table(k);
180✔
574
            t->migrate_sets_and_dictionaries();
180✔
575
        }
180✔
576
    }
66✔
577
    if (current_file_format_version < 24) {
126✔
578
        for (auto k : table_keys) {
354✔
579
            auto t = get_table(k);
354✔
580
            t->migrate_set_orderings(); // rewrite sets to use the new string/binary order
354✔
581
            // Although StringIndex sort order has been changed in this format, we choose to
582
            // avoid upgrading them because it affects a small niche case. Instead, there is a
583
            // workaround in the String Index search code for not relying on items being ordered.
584
            t->migrate_col_keys();
354✔
585
        }
354✔
586
    }
126✔
587
    // NOTE: Additional future upgrade steps go here.
588
}
126✔
589

590
void Transaction::promote_to_async()
591
{
7,542✔
592
    util::CheckedLockGuard lck(m_async_mutex);
7,542✔
593
    if (m_async_stage == AsyncState::Idle) {
7,542✔
594
        m_async_stage = AsyncState::HasLock;
372✔
595
    }
372✔
596
}
7,542✔
597

598
void Transaction::replicate(Transaction* dest, Replication& repl) const
599
{
78✔
600
    // We should only create entries for public tables
601
    std::vector<TableKey> public_table_keys;
78✔
602
    for (auto tk : get_table_keys()) {
222✔
603
        if (table_is_public(tk))
222✔
604
            public_table_keys.push_back(tk);
180✔
605
    }
222✔
606

607
    // Create tables
608
    for (auto tk : public_table_keys) {
180✔
609
        auto table = get_table(tk);
180✔
610
        auto table_name = table->get_name();
180✔
611
        if (!table->is_embedded()) {
180✔
612
            auto pk_col = table->get_primary_key_column();
138✔
613
            if (!pk_col)
138✔
UNCOV
614
                throw RuntimeError(
×
UNCOV
615
                    ErrorCodes::BrokenInvariant,
×
UNCOV
616
                    util::format("Class '%1' must have a primary key", Group::table_name_to_class_name(table_name)));
×
617
            auto pk_name = table->get_column_name(pk_col);
138✔
618
            if (pk_name != "_id")
138✔
619
                throw RuntimeError(ErrorCodes::BrokenInvariant,
×
620
                                   util::format("Primary key of class '%1' must be named '_id'. Current is '%2'",
×
621
                                                Group::table_name_to_class_name(table_name), pk_name));
×
622
            repl.add_class_with_primary_key(tk, table_name, DataType(pk_col.get_type()), pk_name,
138✔
623
                                            pk_col.is_nullable(), table->get_table_type());
138✔
624
        }
138✔
625
        else {
42✔
626
            repl.add_class(tk, table_name, Table::Type::Embedded);
42✔
627
        }
42✔
628
    }
180✔
629
    // Create columns
630
    for (auto tk : public_table_keys) {
180✔
631
        auto table = get_table(tk);
180✔
632
        auto pk_col = table->get_primary_key_column();
180✔
633
        auto cols = table->get_column_keys();
180✔
634
        for (auto col : cols) {
462✔
635
            if (col == pk_col)
462✔
636
                continue;
138✔
637
            repl.insert_column(table.unchecked_ptr(), col, DataType(col.get_type()), table->get_column_name(col),
324✔
638
                               table->get_opposite_table(col).unchecked_ptr());
324✔
639
        }
324✔
640
    }
180✔
641
    dest->commit_and_continue_writing();
78✔
642
    // Now the schema should be in place - create the objects
643
#ifdef REALM_DEBUG
78✔
644
    constexpr int number_of_objects_to_create_before_committing = 100;
78✔
645
#else
646
    constexpr int number_of_objects_to_create_before_committing = 1000;
647
#endif
648
    auto n = number_of_objects_to_create_before_committing;
78✔
649
    for (auto tk : public_table_keys) {
168✔
650
        auto table = get_table(tk);
168✔
651
        if (table->is_embedded())
168✔
652
            continue;
42✔
653
        auto pk_col = table->get_primary_key_column();
126✔
654
        auto cols = get_col_info(table.unchecked_ptr());
126✔
655
        for (auto o : *table) {
750✔
656
            auto obj_key = o.get_key();
750✔
657
            Mixed pk = o.get_any(pk_col);
750✔
658
            repl.create_object_with_primary_key(table.unchecked_ptr(), obj_key, pk);
750✔
659
            generate_properties_for_obj(repl, o, cols);
750✔
660
            if (--n == 0) {
750✔
661
                dest->commit_and_continue_writing();
6✔
662
                n = number_of_objects_to_create_before_committing;
6✔
663
            }
6✔
664
        }
750✔
665
    }
126✔
666
}
78✔
667

668
void Transaction::complete_async_commit()
669
{
1,386✔
670
    // sync to disk:
671
    DB::ReadLockInfo read_lock;
1,386✔
672
    try {
1,386✔
673
        read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
1,386✔
674
        if (db->m_logger) {
1,386✔
675
            db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace,
1,386✔
676
                              "Tr %1: Committing ref %2 to disk", m_log_id, read_lock.m_top_ref);
1,386✔
677
        }
1,386✔
678
        GroupCommitter out(*this);
1,386✔
679
        out.commit(read_lock.m_top_ref); // Throws
1,386✔
680
        // we must release the write mutex before the callback, because the callback
681
        // is allowed to re-request it.
682
        db->release_read_lock(read_lock);
1,386✔
683
        if (m_oldest_version_not_persisted) {
1,386✔
684
            db->release_read_lock(*m_oldest_version_not_persisted);
1,380✔
685
            m_oldest_version_not_persisted.reset();
1,380✔
686
        }
1,380✔
687
    }
1,386✔
688
    catch (const std::exception& e) {
1,386✔
689
        m_commit_exception = std::current_exception();
6✔
690
        if (db->m_logger) {
6✔
691
            db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::error,
6✔
692
                              "Tr %1: Committing to disk failed with exception: \"%2\"", m_log_id, e.what());
6✔
693
        }
6✔
694
        m_async_commit_has_failed = true;
6✔
695
        db->release_read_lock(read_lock);
6✔
696
    }
6✔
697
}
1,386✔
698

699
void Transaction::async_complete_writes(util::UniqueFunction<void()> when_synchronized)
700
{
10,089✔
701
    util::CheckedLockGuard lck(m_async_mutex);
10,089✔
702
    if (m_async_stage == AsyncState::HasLock) {
10,089✔
703
        // Nothing to commit to disk - just release write lock
704
        m_async_stage = AsyncState::Idle;
54✔
705
        db->async_end_write();
54✔
706
    }
54✔
707
    else if (m_async_stage == AsyncState::HasCommits) {
10,035✔
708
        m_async_stage = AsyncState::Syncing;
1,362✔
709
        m_commit_exception = std::exception_ptr();
1,362✔
710
        // get a callback on the helper thread, in which to sync to disk
711
        db->async_sync_to_disk([this, cb = std::move(when_synchronized)]() noexcept {
1,362✔
712
            complete_async_commit();
1,362✔
713
            util::CheckedLockGuard lck(m_async_mutex);
1,362✔
714
            m_async_stage = AsyncState::Idle;
1,362✔
715
            if (m_waiting_for_sync) {
1,362✔
716
                m_waiting_for_sync = false;
366✔
717
                m_async_cv.notify_all();
366✔
718
            }
366✔
719
            else {
996✔
720
                cb();
996✔
721
            }
996✔
722
        });
1,362✔
723
    }
1,362✔
724
}
10,089✔
725

726
void Transaction::prepare_for_close()
727
{
2,024,724✔
728
    util::CheckedLockGuard lck(m_async_mutex);
2,024,724✔
729
    switch (m_async_stage) {
2,024,724✔
730
        case AsyncState::Idle:
2,026,347✔
731
            break;
2,026,347✔
732

733
        case AsyncState::Requesting:
24✔
734
            // We don't have the ability to cancel a wait on the write lock, so
735
            // unfortunately we have to wait for it to be acquired.
736
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
737
            REALM_ASSERT(!m_oldest_version_not_persisted);
24✔
738
            m_waiting_for_write_lock = true;
24✔
739
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
48✔
740
                return !m_waiting_for_write_lock;
48✔
741
            });
48✔
742
            db->end_write_on_correct_thread();
24✔
743
            break;
24✔
744

745
        case AsyncState::HasLock:
30✔
746
            // We have the lock and are currently in a write transaction, and
747
            // also may have some pending previous commits to write
748
            if (m_transact_stage == DB::transact_Writing) {
30✔
749
                db->reset_free_space_tracking();
18✔
750
                m_transact_stage = DB::transact_Reading;
18✔
751
            }
18✔
752
            if (m_oldest_version_not_persisted) {
30✔
UNCOV
753
                complete_async_commit();
×
UNCOV
754
            }
×
755
            db->end_write_on_correct_thread();
30✔
756
            break;
30✔
757

758
        case AsyncState::HasCommits:
24✔
759
            // We have commits which need to be synced to disk, so do that
760
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
761
            complete_async_commit();
24✔
762
            db->end_write_on_correct_thread();
24✔
763
            break;
24✔
764

765
        case AsyncState::Syncing:
24✔
766
            // The worker thread is currently writing, so wait for it to complete
767
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
768
            m_waiting_for_sync = true;
24✔
769
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
48✔
770
                return !m_waiting_for_sync;
48✔
771
            });
48✔
772
            break;
24✔
773
    }
2,024,724✔
774
    m_async_stage = AsyncState::Idle;
2,026,644✔
775
}
2,026,644✔
776

777
void Transaction::acquire_write_lock()
778
{
340,110✔
779
    util::CheckedUniqueLock lck(m_async_mutex);
340,110✔
780
    switch (m_async_stage) {
340,110✔
781
        case AsyncState::Idle:
339,660✔
782
            lck.unlock();
339,660✔
783
            db->do_begin_possibly_async_write();
339,660✔
784
            return;
339,660✔
785

786
        case AsyncState::Requesting:
105✔
787
            m_waiting_for_write_lock = true;
105✔
788
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
210✔
789
                return !m_waiting_for_write_lock;
210✔
790
            });
210✔
791
            return;
105✔
792

UNCOV
793
        case AsyncState::HasLock:
✔
UNCOV
794
        case AsyncState::HasCommits:
✔
UNCOV
795
            return;
×
796

797
        case AsyncState::Syncing:
342✔
798
            m_waiting_for_sync = true;
342✔
799
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
684✔
800
                return !m_waiting_for_sync;
684✔
801
            });
684✔
802
            lck.unlock();
342✔
803
            db->do_begin_possibly_async_write();
342✔
804
            break;
342✔
805
    }
340,110✔
806
}
340,110✔
807

808
void Transaction::do_end_read() noexcept
809
{
1,925,949✔
810
    if (db->m_logger)
1,925,949✔
811
        db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "End transaction %1", m_log_id);
723,723✔
812

813
    prepare_for_close();
1,925,949✔
814
    detach();
1,925,949✔
815

816
    // We should always be ensuring that async commits finish before we get here,
817
    // but if the fsync() failed or we failed to update the top pointer then
818
    // there's not much we can do and we have to just accept that we're losing
819
    // those commits.
820
    if (m_oldest_version_not_persisted) {
1,925,949✔
821
        REALM_ASSERT(m_async_commit_has_failed);
6✔
822
        // We need to not release our read lock on m_oldest_version_not_persisted
823
        // as that's the version the top pointer is referencing and overwriting
824
        // that version will corrupt the Realm file.
825
        db->leak_read_lock(*m_oldest_version_not_persisted);
6✔
826
    }
6✔
827
    db->release_read_lock(m_read_lock);
1,925,949✔
828

829
    set_transact_stage(DB::transact_Ready);
1,925,949✔
830
    // reset the std::shared_ptr to allow the DB object to release resources
831
    // as early as possible.
832
    db.reset();
1,925,949✔
833
}
1,925,949✔
834

835
// This is the same as do_end_read() above, but with the requirement that
836
// 1) This is called with the db->mutex locked already
837
// 2) No async commits outstanding
838
void Transaction::close_read_with_lock()
839
{
162✔
840
    REALM_ASSERT(m_transact_stage == DB::transact_Reading);
162✔
841
    {
162✔
842
        util::CheckedLockGuard lck(m_async_mutex);
162✔
843
        REALM_ASSERT_EX(m_async_stage == AsyncState::Idle, size_t(m_async_stage));
162✔
844
    }
162✔
845

846
    detach();
162✔
847
    REALM_ASSERT_EX(!m_oldest_version_not_persisted, m_oldest_version_not_persisted->m_type,
162✔
848
                    m_oldest_version_not_persisted->m_version, m_oldest_version_not_persisted->m_top_ref,
162✔
849
                    m_oldest_version_not_persisted->m_file_size);
162✔
850
    db->do_release_read_lock(m_read_lock);
162✔
851

852
    set_transact_stage(DB::transact_Ready);
162✔
853
    // reset the std::shared_ptr to allow the DB object to release resources
854
    // as early as possible.
855
    db.reset();
162✔
856
}
162✔
857

858

859
void Transaction::initialize_replication()
UNCOV
860
{
×
UNCOV
861
    if (m_transact_stage == DB::transact_Writing) {
×
UNCOV
862
        if (Replication* repl = get_replication()) {
×
UNCOV
863
            auto current_version = m_read_lock.m_version;
×
UNCOV
864
            bool history_updated = false;
×
865
            repl->initiate_transact(*this, current_version, history_updated); // Throws
×
866
        }
×
867
    }
×
868
}
×
869

870
void Transaction::set_transact_stage(DB::TransactStage stage) noexcept
871
{
4,551,573✔
872
    m_transact_stage = stage;
4,551,573✔
873
}
4,551,573✔
874

875
class NodeTree {
876
public:
877
    NodeTree(size_t evac_limit, size_t work_limit)
878
        : m_evac_limit(evac_limit)
2,649✔
879
        , m_work_limit(int64_t(work_limit))
2,649✔
880
        , m_moved(0)
2,649✔
881
    {
5,301✔
882
    }
5,301✔
883
    ~NodeTree()
884
    {
5,301✔
885
        // std::cout << "Moved: " << m_moved << std::endl;
886
    }
5,301✔
887

888
    /// Function used to traverse the node tree and "copy on write" nodes
889
    /// that are found above the evac_limit. The function will return
890
    /// when either the whole tree has been travesed or when the work_limit
891
    /// has been reached.
892
    /// \param current_node - node to process.
893
    /// \param level - the level at which current_node is placed in the tree
894
    /// \param progress - When the traversal is initiated, this vector identifies at which
895
    ///                   node the process should be resumed. It is subesequently updated
896
    ///                   to point to the node we have just processed
897
    bool trv(Array& current_node, unsigned level, std::vector<size_t>& progress)
898
    {
1,279,515✔
899
        if (m_work_limit < 0) {
1,279,515✔
900
            return false;
5,193✔
901
        }
5,193✔
902
        if (current_node.is_read_only()) {
1,274,322✔
903
            size_t byte_size = current_node.get_byte_size();
1,247,862✔
904
            if ((current_node.get_ref() + byte_size) > m_evac_limit) {
1,247,862✔
905
                current_node.copy_on_write();
1,169,766✔
906
                m_moved++;
1,169,766✔
907
                m_work_limit -= byte_size;
1,169,766✔
908
            }
1,169,766✔
909
        }
1,247,862✔
910

911
        if (current_node.has_refs()) {
1,274,322✔
912
            auto sz = current_node.size();
42,063✔
913
            m_work_limit -= sz;
42,063✔
914
            if (progress.size() == level) {
42,063✔
915
                progress.push_back(0);
11,040✔
916
            }
11,040✔
917
            REALM_ASSERT_EX(level < progress.size(), level, progress.size());
42,063✔
918
            size_t ndx = progress[level];
42,063✔
919
            while (ndx < sz) {
1,300,566✔
920
                auto val = current_node.get(ndx);
1,289,586✔
921
                if (val && !(val & 1)) {
1,289,586✔
922
                    Array arr(current_node.get_alloc());
1,274,262✔
923
                    arr.set_parent(&current_node, ndx);
1,274,262✔
924
                    arr.init_from_parent();
1,274,262✔
925
                    if (!trv(arr, level + 1, progress)) {
1,274,262✔
926
                        return false;
31,083✔
927
                    }
31,083✔
928
                }
1,274,262✔
929
                ndx = ++progress[level];
1,258,503✔
930
            }
1,258,503✔
931
            while (progress.size() > level)
21,960✔
932
                progress.pop_back();
10,980✔
933
        }
10,980✔
934
        return true;
1,243,239✔
935
    }
1,274,322✔
936

937
private:
938
    size_t m_evac_limit;
939
    int64_t m_work_limit;
940
    size_t m_moved;
941
};
942

943

944
void Transaction::cow_outliers(std::vector<size_t>& progress, size_t evac_limit, size_t work_limit)
945
{
5,301✔
946
    NodeTree node_tree(evac_limit, work_limit);
5,301✔
947
    if (progress.empty()) {
5,301✔
948
        progress.push_back(s_table_name_ndx);
120✔
949
    }
120✔
950
    if (progress[0] == s_table_name_ndx) {
5,301✔
951
        if (!node_tree.trv(m_table_names, 1, progress))
120✔
UNCOV
952
            return;
×
953
        progress.back() = s_table_refs_ndx; // Handle tables next
120✔
954
    }
120✔
955
    if (progress[0] == s_table_refs_ndx) {
5,301✔
956
        if (!node_tree.trv(m_tables, 1, progress))
5,301✔
957
            return;
5,193✔
958
        progress.back() = s_hist_ref_ndx; // Handle history next
108✔
959
    }
108✔
960
    if (progress[0] == s_hist_ref_ndx && m_top.get(s_hist_ref_ndx)) {
108✔
961
        Array hist_arr(m_top.get_alloc());
96✔
962
        hist_arr.set_parent(&m_top, s_hist_ref_ndx);
96✔
963
        hist_arr.init_from_parent();
96✔
964
        if (!node_tree.trv(hist_arr, 1, progress))
96✔
UNCOV
965
            return;
×
966
    }
96✔
967
    progress.clear();
108✔
968
}
108✔
969

970
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc