• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / james.stone_381

25 Sep 2023 06:35PM UTC coverage: 90.919% (+0.03%) from 90.892%
james.stone_381

Pull #6670

Evergreen

ironage
optimize: only compare strings once
Pull Request #6670: Sorting stage 3

97114 of 177952 branches covered (0.0%)

879 of 887 new or added lines in 12 files covered. (99.1%)

105 existing lines in 17 files now uncovered.

236103 of 259684 relevant lines covered (90.92%)

7062315.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.22
/src/realm/transaction.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2022 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include <realm/transaction.hpp>
20
#include "impl/copy_replication.hpp"
21
#include <realm/list.hpp>
22
#include <realm/set.hpp>
23
#include <realm/dictionary.hpp>
24
#include <realm/table_view.hpp>
25
#include <realm/group_writer.hpp>
26

27
namespace {
28

29
using namespace realm;
30
using ColInfo = std::vector<std::pair<ColKey, Table*>>;
31

32
// Builds the column description used when replicating objects of `table`:
// one entry per column key, paired with a pointer to the column's opposite
// table when that table is embedded, or nullptr otherwise.
// A null `table` produces an empty result.
ColInfo get_col_info(const Table* table)
{
    ColInfo result;
    if (!table)
        return result;

    for (auto col_key : table->get_column_keys()) {
        Table* embedded = nullptr;
        auto target = table->get_opposite_table(col_key);
        if (target && target->is_embedded())
            embedded = target.unchecked_ptr();
        result.emplace_back(col_key, embedded);
    }
    return result;
}
2,034✔
47

48
void add_list_to_repl(CollectionBase& list, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded);
49
void add_dictionary_to_repl(Dictionary& dict, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded)
50
{
114✔
51
    size_t sz = dict.size();
114✔
52
    for (size_t n = 0; n < sz; ++n) {
222✔
53
        const auto& [key, val] = dict.get_pair(n);
108✔
54
        repl.dictionary_insert(dict, n, key, val);
108✔
55
        if (val.is_type(type_List)) {
108✔
56
            auto n_list = dict.get_list({key.get_string()});
6✔
57
            add_list_to_repl(*n_list, *dict.get_table()->get_repl(), nullptr);
6✔
58
        }
6✔
59
        else if (val.is_type(type_Dictionary)) {
102✔
60
            repl.dictionary_insert(dict, n, key, val);
6✔
61
            auto n_dict = dict.get_dictionary({key.get_string()});
6✔
62
            add_dictionary_to_repl(*n_dict, *dict.get_table()->get_repl(), nullptr);
6✔
63
        }
6✔
64
        else if (update_embedded) {
96✔
65
            update_embedded(val);
42✔
66
        }
42✔
67
    }
108✔
68
}
114✔
69

70
// Replays the content of `list` into `repl` as one list_insert per element.
// Elements that are themselves lists or dictionaries are replayed recursively
// (through the owning table's replication, without an embedded-object
// callback); for every other element `update_embedded` is invoked when set.
void add_list_to_repl(CollectionBase& list, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded)
{
    const auto count = list.size();
    for (size_t ndx = 0; ndx < count; ++ndx) {
        auto value = list.get_any(ndx);
        repl.list_insert(list, ndx, value, ndx);

        if (value.is_type(type_List)) {
            auto nested_list = list.get_list({ndx});
            add_list_to_repl(*nested_list, *list.get_table()->get_repl(), nullptr);
            continue;
        }
        if (value.is_type(type_Dictionary)) {
            auto nested_dict = list.get_dictionary({ndx});
            add_dictionary_to_repl(*nested_dict, *list.get_table()->get_repl(), nullptr);
            continue;
        }
        if (update_embedded) {
            update_embedded(value);
        }
    }
}
114✔
89

90
// Emits replication instructions into `repl` that reproduce every property of
// `obj` described by `cols` (see get_col_info()).
//
// For a column whose target is an embedded table, a callback is prepared that
// recurses into the linked embedded object. The callback captures locals of
// the current loop iteration by reference, so it is only handed to helpers
// that invoke it before returning.
void generate_properties_for_obj(Replication& repl, const Obj& obj, const ColInfo& cols)
{
    for (const auto& entry : cols) {
        auto col = entry.first;
        auto embedded_table = entry.second;
        auto embedded_cols = get_col_info(embedded_table);

        util::UniqueFunction<void(Mixed)> update_embedded = nullptr;
        if (embedded_table) {
            update_embedded = [&](Mixed val) {
                if (val.is_null())
                    return;
                REALM_ASSERT(val.is_type(type_Link, type_TypedLink));
                Obj embedded_obj = embedded_table->get_object(val.get<ObjKey>());
                generate_properties_for_obj(repl, embedded_obj, embedded_cols);
            };
        }

        if (col.is_list()) {
            auto list = obj.get_listbase_ptr(col);
            repl.list_clear(*list);
            add_list_to_repl(*list, repl, std::move(update_embedded));
            continue;
        }

        if (col.is_set()) {
            // Sets cannot have embedded objects, so no callback is needed here
            auto set = obj.get_setbase_ptr(col);
            const auto count = set->size();
            for (size_t ndx = 0; ndx < count; ++ndx) {
                repl.set_insert(*set, ndx, set->get_any(ndx));
            }
            continue;
        }

        if (col.is_dictionary()) {
            auto dict = obj.get_dictionary(col);
            add_dictionary_to_repl(dict, repl, std::move(update_embedded));
            continue;
        }

        // Single-valued column. The value may still be a collection marker.
        auto val = obj.get_any(col);
        if (val.is_type(type_List)) {
            repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), Mixed(0, CollectionType::List));
            Lst<Mixed> list(obj, col);
            add_list_to_repl(list, repl, std::move(update_embedded));
        }
        else if (val.is_type(type_Dictionary)) {
            repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), Mixed(0, CollectionType::Dictionary));
            Dictionary dict(obj, col);
            add_dictionary_to_repl(dict, repl, std::move(update_embedded));
        }
        else {
            repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), val);
            if (update_embedded) {
                update_embedded(val);
            }
        }
    }
}
846✔
146

147
} // namespace
148

149
namespace realm {
150

151
std::map<DB::TransactStage, const char*> log_stage = {
152
    {DB::TransactStage::transact_Frozen, "frozen"},
153
    {DB::TransactStage::transact_Writing, "write"},
154
    {DB::TransactStage::transact_Reading, "read"},
155
};
156

157
// Creates a transaction on `_db` in the given `stage`, bound to the snapshot
// described by `rli`. The group is attached as writable only when `stage` is
// the write stage. The start of the transaction is trace-logged if the DB has
// a logger.
Transaction::Transaction(DBRef _db, SlabAlloc* alloc, DB::ReadLockInfo& rli, DB::TransactStage stage)
    : Group(alloc)
    , db(_db)
    , m_read_lock(rli)
    , m_log_id(util::gen_log_id(this))
{
    // Start out from the Ready stage before switching to the requested one
    m_transact_stage = DB::transact_Ready;
    set_metrics(db->m_metrics);
    set_transact_stage(stage);
    m_alloc.note_reader_start(this);

    const bool writable = (stage == DB::transact_Writing);
    attach_shared(m_read_lock.m_top_ref, m_read_lock.m_file_size, writable,
                  VersionID{rli.m_version, rli.m_reader_idx});

    if (db->m_logger) {
        db->m_logger->log(util::Logger::Level::trace, "Start %1 %2: %3 ref %4", log_stage[stage], m_log_id,
                          rli.m_version, m_read_lock.m_top_ref);
    }
}
2,712,771✔
175

176
Transaction::~Transaction()
{
    // Intentionally empty: close() is not called from here. The deleter
    // takes care of calling close() implicitly.
}
2,713,413✔
181

182
void Transaction::close()
183
{
2,723,481✔
184
    if (m_transact_stage == DB::transact_Writing) {
2,723,481✔
185
        rollback();
9,996✔
186
    }
9,996✔
187
    if (m_transact_stage == DB::transact_Reading || m_transact_stage == DB::transact_Frozen) {
2,723,481✔
188
        do_end_read();
1,620,870✔
189
    }
1,620,870✔
190
}
2,723,481✔
191

192
size_t Transaction::get_commit_size() const
193
{
37,773✔
194
    size_t sz = 0;
37,773✔
195
    if (m_transact_stage == DB::transact_Writing) {
37,773✔
196
        sz = m_alloc.get_commit_size();
7,869✔
197
    }
7,869✔
198
    return sz;
37,773✔
199
}
37,773✔
200

201
// Commits the current write transaction and ends it.
// Throws WrongTransactionState if the transaction is not in the write stage.
// Returns the version produced by DB::do_commit().
DB::version_type Transaction::commit()
{
    check_attached();

    if (m_transact_stage != DB::transact_Writing)
        throw WrongTransactionState("Not a write transaction");

    REALM_ASSERT(is_attached());

    // Let accessors at group level or below sync before the commit is made
    flush_accessors_for_commit();

    DB::version_type committed_version = db->do_commit(*this); // Throws

    // wait_for_change only works if m_read_lock is set. Obtain the value by
    // grabbing a read lock on the latest available snapshot and releasing
    // it again straight away.
    DB::ReadLockInfo latest_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
    db->release_read_lock(latest_lock);

    db->end_write_on_correct_thread();

    do_end_read();
    m_read_lock = latest_lock;

    return committed_version;
}
977,019✔
228

229
void Transaction::rollback()
230
{
10,914✔
231
    // rollback may happen as a consequence of exception handling in cases where
5,448✔
232
    // the DB has detached. If so, just back out without trying to change state.
5,448✔
233
    // the DB object has already been closed and no further processing is possible.
5,448✔
234
    if (!is_attached())
10,914✔
235
        return;
6✔
236
    if (m_transact_stage == DB::transact_Ready)
10,908✔
237
        return; // Idempotency
×
238

5,445✔
239
    if (m_transact_stage != DB::transact_Writing)
10,908✔
240
        throw WrongTransactionState("Not a write transaction");
×
241
    db->reset_free_space_tracking();
10,908✔
242
    if (!holds_write_mutex())
10,908✔
243
        db->end_write_on_correct_thread();
10,908✔
244

5,445✔
245
    do_end_read();
10,908✔
246
}
10,908✔
247

248
void Transaction::end_read()
249
{
98,031✔
250
    if (m_transact_stage == DB::transact_Ready)
98,031✔
251
        return;
6✔
252
    if (m_transact_stage == DB::transact_Writing)
98,025✔
253
        throw WrongTransactionState("Illegal end_read when in write mode");
×
254
    do_end_read();
98,025✔
255
}
98,025✔
256

257
// Commits the current write transaction and carries on as a read transaction
// on the newly committed version.
//
// `commit_to_disk` is forwarded to DB::do_commit(). When it is false, the
// transaction keeps a read lock on the oldest version that has not been
// persisted (m_oldest_version_not_persisted) and moves to the HasCommits
// async stage.
//
// Throws WrongTransactionState if not in a write transaction. If anything
// after the commit itself throws a std::exception, the failure is logged, the
// transaction is put in the Ready stage and the exception is rethrown.
VersionID Transaction::commit_and_continue_as_read(bool commit_to_disk)
{
    check_attached();
    if (m_transact_stage != DB::transact_Writing)
        throw WrongTransactionState("Not a write transaction");

    flush_accessors_for_commit();

    DB::version_type version = db->do_commit(*this, commit_to_disk); // Throws

    // Advance the read lock, but don't update accessors.
    // This happens under lock, together with the addition of the newest commit above,
    // so the read lock we grab is certain to refer to our own newly completed commit.

    try {
        // Grab the new lock before letting go of the old one, so that
        // m_transaction_count does not briefly drop to zero
        DB::ReadLockInfo new_read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID()); // Throws

        m_history = nullptr;
        set_transact_stage(DB::transact_Reading);

        if (commit_to_disk || m_oldest_version_not_persisted) {
            // Either we are committing to disk, or an older version is
            // already being held. Either way this now historic version
            // does not need to be kept.
            db->release_read_lock(m_read_lock);
        }
        else {
            // Not committing to disk and no older unpersisted version is
            // held yet, so keep hold of this one
            m_oldest_version_not_persisted = m_read_lock;
        }

        if (commit_to_disk && m_oldest_version_not_persisted) {
            // Committing to disk: the version we were holding on to can go
            db->release_read_lock(*m_oldest_version_not_persisted);
            m_oldest_version_not_persisted.reset();
        }
        m_read_lock = new_read_lock;
        // m_read_lock != m_oldest_version_not_persisted is guaranteed:
        // m_oldest_version_not_persisted is either the former m_read_lock or
        // older, and the former m_read_lock is older than the current one
        REALM_ASSERT(!m_oldest_version_not_persisted ||
                     m_read_lock.m_version != m_oldest_version_not_persisted->m_version);

        {
            util::CheckedLockGuard lock(m_async_mutex);
            REALM_ASSERT(m_async_stage != AsyncState::Syncing);
            if (!commit_to_disk) {
                m_async_stage = AsyncState::HasCommits;
            }
            else if (m_async_stage == AsyncState::Requesting) {
                m_async_stage = AsyncState::HasLock;
            }
            else {
                db->end_write_on_correct_thread();
                m_async_stage = AsyncState::Idle;
            }
        }

        // Remap the file if it has grown, and update refs in the underlying node structure.
        remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, false); // Throws
        return VersionID{version, new_read_lock.m_reader_idx};
    }
    catch (std::exception& e) {
        if (db->m_logger) {
            db->m_logger->log(util::Logger::Level::error, "Tr %1: Commit failed with exception: \"%2\"", m_log_id,
                              e.what());
        }
        // After a failure it is unsafe to keep using the transaction for reading
        set_transact_stage(DB::transact_Ready);
        throw;
    }
}
388,509✔
336

337
// Commits the current write transaction and immediately continues writing on
// top of the new version.
// Throws WrongTransactionState if not in a write transaction.
// Returns the committed version paired with the reader index of the new lock.
VersionID Transaction::commit_and_continue_writing()
{
    check_attached();
    if (m_transact_stage != DB::transact_Writing)
        throw WrongTransactionState("Not a write transaction");

    // Let accessors at group level or below sync before the commit is made
    flush_accessors_for_commit();

    DB::version_type version = db->do_commit(*this); // Throws

    // wait_for_change only works if m_read_lock is set, so move it to a
    // read lock on the latest available snapshot and drop the previous one.
    DB::ReadLockInfo latest_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
    db->release_read_lock(m_read_lock);
    m_read_lock = latest_lock;

    Replication* repl = db->get_replication();
    if (repl) {
        bool history_updated = false;
        repl->initiate_transact(*this, latest_lock.m_version, history_updated); // Throws
    }

    const bool writable = true;
    remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, writable); // Throws
    return VersionID{version, latest_lock.m_reader_idx};
}
318✔
363

364
// Returns a frozen transaction on the version this read transaction refers to.
// Throws WrongTransactionState unless the transaction is in the read stage.
TransactionRef Transaction::freeze()
{
    if (m_transact_stage != DB::transact_Reading)
        throw WrongTransactionState("Can only freeze a read transaction");

    VersionID current_version(m_read_lock.m_version, m_read_lock.m_reader_idx);
    return db->start_frozen(current_version);
}
6,036✔
371

372
// Starts a new transaction on the same version as this one: frozen if this
// one is frozen, a read transaction otherwise.
// Throws WrongTransactionState if this transaction holds no read lock (Ready
// stage) or is a write transaction with a non-zero commit size.
TransactionRef Transaction::duplicate()
{
    VersionID current_version(m_read_lock.m_version, m_read_lock.m_reader_idx);
    switch (m_transact_stage) {
        case DB::transact_Reading:
            return db->start_read(current_version);
        case DB::transact_Frozen:
            return db->start_frozen(current_version);
        case DB::transact_Writing:
            if (get_commit_size() != 0)
                throw WrongTransactionState(
                    "Can only duplicate a write transaction before any changes have been made.");
            return db->start_read(current_version);
        case DB::transact_Ready:
            throw WrongTransactionState("Cannot duplicate a transaction which does not have a read lock.");
    }
    REALM_UNREACHABLE();
}
×
390

391
// Replicates the content of this transaction into `dest` by running
// replicate() with a CopyReplication constructed on `dest`.
void Transaction::copy_to(TransactionRef dest) const
{
    _impl::CopyReplication copy_repl(dest);
    Transaction* dest_ptr = dest.get();
    replicate(dest_ptr, copy_repl);
}
42✔
396

397
// Returns the history accessor matching the current transaction stage,
// creating/caching it on first use. Returns the cached pointer (possibly
// null) when the DB has no replication or the transaction is in the Ready
// stage.
_impl::History* Transaction::get_history() const
{
    if (m_history)
        return m_history;

    auto repl = db->get_replication();
    if (!repl)
        return m_history;

    switch (m_transact_stage) {
        case DB::transact_Ready:
            break;
        case DB::transact_Writing:
            m_history = repl->_get_history_write();
            break;
        case DB::transact_Reading:
        case DB::transact_Frozen:
            // The read history is created lazily and then reused
            if (!m_history_read)
                m_history_read = repl->_create_history_read();
            m_history = m_history_read.get();
            m_history->set_group(const_cast<Transaction*>(this), false);
            break;
    }
    return m_history;
}
2,886,171✔
419

420
// Returns this transaction's accessor for the object `original` refers to.
// An empty Obj is returned when `original` is empty or invalid, or when its
// key is not valid in this transaction's version of the table.
Obj Transaction::import_copy_of(const Obj& original)
{
    if (!bool(original) || !original.is_valid())
        return {};

    TableKey table_key = original.get_table_key();
    ObjKey obj_key = original.get_key();
    auto table = get_table(table_key);
    if (!table->is_valid(obj_key))
        return {};

    return table->get_object(obj_key);
}
588✔
431

432
// Returns this transaction's accessor for the table with the same key as
// `original`.
TableRef Transaction::import_copy_of(ConstTableRef original)
{
    return get_table(original->get_key());
}
226,848✔
437

438
// Returns this transaction's link list for the same object and column as
// `original`, or a default-constructed LnkLst if the owning object could not
// be imported.
LnkLst Transaction::import_copy_of(const LnkLst& original)
{
    Obj owner = import_copy_of(original.get_obj());
    if (!owner)
        return LnkLst();

    ColKey col_key = original.get_col_key();
    return owner.get_linklist(col_key);
}
×
446

447
// Returns this transaction's list accessor for the same object and column as
// `original`, or an empty pointer if the owning object could not be imported.
LstBasePtr Transaction::import_copy_of(const LstBase& original)
{
    Obj owner = import_copy_of(original.get_obj());
    if (!owner)
        return {};

    ColKey col_key = original.get_col_key();
    return owner.get_listbase_ptr(col_key);
}
6✔
455

456
// Returns this transaction's set accessor for the same object and column as
// `original`, or an empty pointer if the owning object could not be imported.
SetBasePtr Transaction::import_copy_of(const SetBase& original)
{
    Obj owner = import_copy_of(original.get_obj());
    if (!owner)
        return {};

    ColKey col_key = original.get_col_key();
    return owner.get_setbase_ptr(col_key);
}
×
464

465
// Returns this transaction's accessor for the collection found at the same
// short path, on the same object, as `original`. An empty pointer is returned
// if the owning object could not be imported.
CollectionBasePtr Transaction::import_copy_of(const CollectionBase& original)
{
    Obj owner = import_copy_of(original.get_obj());
    if (!owner)
        return {};

    auto short_path = original.get_short_path();
    return std::static_pointer_cast<CollectionBase>(owner.get_collection_ptr(short_path));
}
528✔
473

474
// Pointer flavour of the LnkLst import: null in gives null out; otherwise the
// matching link list in this transaction, or a fresh default-constructed
// LnkLst if the owning object could not be imported.
LnkLstPtr Transaction::import_copy_of(const LnkLstPtr& original)
{
    if (!bool(original))
        return nullptr;

    Obj owner = import_copy_of(original->get_obj());
    if (!owner)
        return std::make_unique<LnkLst>();

    ColKey col_key = original->get_col_key();
    return owner.get_linklist_ptr(col_key);
}
×
484

485
// Pointer flavour of the LnkSet import: null in gives null out; otherwise the
// matching link set in this transaction, or a fresh default-constructed
// LnkSet if the owning object could not be imported.
LnkSetPtr Transaction::import_copy_of(const LnkSetPtr& original)
{
    if (!original)
        return nullptr;

    Obj owner = import_copy_of(original->get_obj());
    if (!owner)
        return std::make_unique<LnkSet>();

    ColKey col_key = original->get_col_key();
    return owner.get_linkset_ptr(col_key);
}
×
495

496
// Imports a generic link collection: null in gives null out; otherwise the
// link collection for the same owning object and column in this transaction.
LinkCollectionPtr Transaction::import_copy_of(const LinkCollectionPtr& original)
{
    if (!original)
        return nullptr;

    Obj owner = import_copy_of(original->get_owning_obj());
    if (!owner) {
        // Hand back some empty collection (size() == 0);
        // its concrete type shouldn't matter
        return std::make_unique<LnkLst>();
    }

    ColKey col_key = original->get_owning_col_key();
    return owner.get_linkcollection_ptr(col_key);
}
42✔
508

509
// Clones `query` for use in this transaction, forwarding `policy` to
// Query::clone_for_handover().
std::unique_ptr<Query> Transaction::import_copy_of(Query& query, PayloadPolicy policy)
{
    auto imported = query.clone_for_handover(this, policy);
    return imported;
}
59,472✔
513

514
// Clones `tv` for use in this transaction, forwarding `policy` to
// TableView::clone_for_handover().
std::unique_ptr<TableView> Transaction::import_copy_of(TableView& tv, PayloadPolicy policy)
{
    auto imported = tv.clone_for_handover(this, policy);
    return imported;
}
38,010✔
518

519
void Transaction::upgrade_file_format(int target_file_format_version)
520
{
150✔
521
    REALM_ASSERT(is_attached());
150✔
522
    if (fake_target_file_format && *fake_target_file_format == target_file_format_version) {
150✔
523
        // Testing, mockup scenario, not a real upgrade. Just pretend we're done!
15✔
524
        return;
30✔
525
    }
30✔
526

60✔
527
    // Be sure to revisit the following upgrade logic when a new file format
60✔
528
    // version is introduced. The following assert attempt to help you not
60✔
529
    // forget it.
60✔
530
    REALM_ASSERT_EX(target_file_format_version == 24, target_file_format_version);
120✔
531

60✔
532
    // DB::do_open() must ensure that only supported version are allowed.
60✔
533
    // It does that by asking backup if the current file format version is
60✔
534
    // included in the accepted versions, so be sure to align the list of
60✔
535
    // versions with the logic below
60✔
536

60✔
537
    int current_file_format_version = get_file_format_version();
120✔
538
    REALM_ASSERT(current_file_format_version < target_file_format_version);
120✔
539

60✔
540
    // Ensure we have search index on all primary key columns.
60✔
541
    auto table_keys = get_table_keys();
120✔
542
    if (current_file_format_version < 22) {
120✔
543
        for (auto k : table_keys) {
114✔
544
            auto t = get_table(k);
114✔
545
            if (auto col = t->get_primary_key_column()) {
114✔
546
                t->do_add_search_index(col, IndexType::General);
78✔
547
            }
78✔
548
        }
114✔
549
    }
48✔
550

60✔
551
    if (current_file_format_version == 22) {
120✔
552
        // Check that asymmetric table are empty
33✔
553
        for (auto k : table_keys) {
180✔
554
            auto t = get_table(k);
180✔
555
            if (t->is_asymmetric() && t->size() > 0) {
180✔
556
                t->clear();
6✔
557
            }
6✔
558
        }
180✔
559
    }
66✔
560
    if (current_file_format_version >= 21 && current_file_format_version < 23) {
120✔
561
        // Upgrade Set and Dictionary columns
33✔
562
        for (auto k : table_keys) {
180✔
563
            auto t = get_table(k);
180✔
564
            t->migrate_sets_and_dictionaries();
180✔
565
        }
180✔
566
    }
66✔
567
    if (current_file_format_version < 24) {
120✔
568
        // rewrite the string indexes with the comparison order
60✔
569
        for (auto k : table_keys) {
318✔
570
            auto t = get_table(k);
318✔
571
            t->migrate_string_sets(); // rewrite sets to use the new string order
318✔
572
            if constexpr (std::is_signed<char>()) {
318✔
573
                t->for_each_public_column([&](ColKey col) {
762✔
574
                    if (col.get_type() == col_type_String || col.get_type() == col_type_Mixed) {
762✔
575
                        switch (t->search_index_type(col)) {
438✔
576
                            case IndexType::General: {
108✔
577
                                if (current_file_format_version < 22 && t->get_primary_key_column() == col) {
108✔
578
                                    break; // this index was just added by a previous upgrade step above
42✔
579
                                }
42✔
580
                                t->remove_search_index(col);
66✔
581
                                t->add_search_index(col, IndexType::General);
66✔
582
                                break;
66✔
583
                            }
66✔
584
                            case IndexType::Fulltext:
33✔
NEW
585
                                t->remove_search_index(col);
×
NEW
586
                                t->add_search_index(col, IndexType::Fulltext);
×
NEW
587
                                break;
×
588
                            case IndexType::None:
330✔
589
                                break;
330✔
590
                        }
762✔
591
                    }
762✔
592
                    return IteratorControl::AdvanceToNext;
762✔
593
                });
762✔
594
            }
318✔
595
        }
318✔
596
    }
120✔
597
    // NOTE: Additional future upgrade steps go here.
60✔
598
}
120✔
599

600
void Transaction::promote_to_async()
601
{
7,536✔
602
    util::CheckedLockGuard lck(m_async_mutex);
7,536✔
603
    if (m_async_stage == AsyncState::Idle) {
7,536✔
604
        m_async_stage = AsyncState::HasLock;
366✔
605
    }
366✔
606
}
7,536✔
607

608
// Replays the public part of this transaction into `repl`: first the classes,
// then their columns, then - after committing the schema on `dest` - all
// objects of the non-embedded tables together with their properties.
// `dest` is committed (and kept writing) after the schema and then again
// after each batch of created objects.
//
// Throws RuntimeError(BrokenInvariant) if a non-embedded public table has no
// primary key, or a primary key that is not named "_id".
void Transaction::replicate(Transaction* dest, Replication& repl) const
{
    // Entries are only created for public tables
    std::vector<TableKey> public_keys;
    for (auto key : get_table_keys()) {
        if (table_is_public(key))
            public_keys.push_back(key);
    }

    // Create tables
    for (auto key : public_keys) {
        auto table = get_table(key);
        auto table_name = table->get_name();
        if (table->is_embedded()) {
            repl.add_class(key, table_name, Table::Type::Embedded);
            continue;
        }

        auto pk_col = table->get_primary_key_column();
        if (!pk_col)
            throw RuntimeError(
                ErrorCodes::BrokenInvariant,
                util::format("Class '%1' must have a primary key", Group::table_name_to_class_name(table_name)));
        auto pk_name = table->get_column_name(pk_col);
        if (pk_name != "_id")
            throw RuntimeError(ErrorCodes::BrokenInvariant,
                               util::format("Primary key of class '%1' must be named '_id'. Current is '%2'",
                                            Group::table_name_to_class_name(table_name), pk_name));
        repl.add_class_with_primary_key(key, table_name, DataType(pk_col.get_type()), pk_name,
                                        pk_col.is_nullable(), table->get_table_type());
    }

    // Create columns - all but the primary key column
    for (auto key : public_keys) {
        auto table = get_table(key);
        auto pk_col = table->get_primary_key_column();
        auto column_keys = table->get_column_keys();
        for (auto col : column_keys) {
            if (col == pk_col)
                continue;
            repl.insert_column(table.unchecked_ptr(), col, DataType(col.get_type()), table->get_column_name(col),
                               table->get_opposite_table(col).unchecked_ptr());
        }
    }
    dest->commit_and_continue_writing();

    // The schema should be in place now - go on and create the objects
#ifdef REALM_DEBUG
    constexpr int number_of_objects_to_create_before_committing = 100;
#else
    constexpr int number_of_objects_to_create_before_committing = 1000;
#endif
    auto remaining_in_batch = number_of_objects_to_create_before_committing;
    for (auto key : public_keys) {
        auto table = get_table(key);
        // Objects of embedded tables are not created here
        if (table->is_embedded())
            continue;

        auto pk_col = table->get_primary_key_column();
        auto col_info = get_col_info(table.unchecked_ptr());
        for (auto obj : *table) {
            auto obj_key = obj.get_key();
            Mixed pk_value = obj.get_any(pk_col);
            repl.create_object_with_primary_key(table.unchecked_ptr(), obj_key, pk_value);
            generate_properties_for_obj(repl, obj, col_info);

            // Commit every time a full batch of objects has been created
            if (--remaining_in_batch == 0) {
                dest->commit_and_continue_writing();
                remaining_in_batch = number_of_objects_to_create_before_committing;
            }
        }
    }
}
78✔
677

678
void Transaction::complete_async_commit()
679
{
1,380✔
680
    // sync to disk:
318✔
681
    DB::ReadLockInfo read_lock;
1,380✔
682
    try {
1,380✔
683
        read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
1,380✔
684
        if (db->m_logger) {
1,380✔
685
            db->m_logger->log(util::Logger::Level::trace, "Tr %1: Committing ref %2 to disk", m_log_id,
1,380✔
686
                              read_lock.m_top_ref);
1,380✔
687
        }
1,380✔
688
        GroupCommitter out(*this);
1,380✔
689
        out.commit(read_lock.m_top_ref); // Throws
1,380✔
690
        // we must release the write mutex before the callback, because the callback
318✔
691
        // is allowed to re-request it.
318✔
692
        db->release_read_lock(read_lock);
1,380✔
693
        if (m_oldest_version_not_persisted) {
1,380✔
694
            db->release_read_lock(*m_oldest_version_not_persisted);
1,374✔
695
            m_oldest_version_not_persisted.reset();
1,374✔
696
        }
1,374✔
697
    }
1,380✔
698
    catch (const std::exception& e) {
321✔
699
        m_commit_exception = std::current_exception();
6✔
700
        if (db->m_logger) {
6✔
701
            db->m_logger->log(util::Logger::Level::error, "Tr %1: Committing to disk failed with exception: \"%2\"",
6✔
702
                              m_log_id, e.what());
6✔
703
        }
6✔
704
        m_async_commit_has_failed = true;
6✔
705
        db->release_read_lock(read_lock);
6✔
706
    }
6✔
707
}
1,380✔
708

709
/// Finish any outstanding asynchronous write work.
///
/// - In state HasLock there is nothing to flush: the state becomes Idle and
///   DB::async_end_write() is called.
/// - In state HasCommits the state becomes Syncing and a job is handed to
///   DB::async_sync_to_disk() which runs complete_async_commit().
/// - In every other state the function does nothing.
///
/// \param when_synchronized - invoked at the end of the scheduled sync job,
///        unless somebody is blocked waiting for the sync (m_waiting_for_sync),
///        in which case the waiters are notified instead.
void Transaction::async_complete_writes(util::UniqueFunction<void()> when_synchronized)
{
    util::CheckedLockGuard lck(m_async_mutex);
    switch (m_async_stage) {
        case AsyncState::HasLock:
            // Nothing to commit to disk - only the write lock has to go
            m_async_stage = AsyncState::Idle;
            db->async_end_write();
            break;

        case AsyncState::HasCommits:
            m_async_stage = AsyncState::Syncing;
            // Clear whatever a previous commit attempt may have stored
            m_commit_exception = std::exception_ptr();
            // The actual disk sync takes place inside the job passed to async_sync_to_disk()
            db->async_sync_to_disk([this, on_synced = std::move(when_synchronized)]() noexcept {
                complete_async_commit();
                util::CheckedLockGuard guard(m_async_mutex);
                m_async_stage = AsyncState::Idle;
                if (!m_waiting_for_sync) {
                    on_synced();
                    return;
                }
                // Someone is waiting on the condition variable: wake them
                // up rather than invoking the callback.
                m_waiting_for_sync = false;
                m_async_cv.notify_all();
            });
            break;

        default:
            break;
    }
}
9,876✔
735

736
void Transaction::prepare_for_close()
737
{
2,877,144✔
738
    util::CheckedLockGuard lck(m_async_mutex);
2,877,144✔
739
    switch (m_async_stage) {
2,877,144✔
740
        case AsyncState::Idle:
2,885,106✔
741
            break;
2,885,106✔
742

743
        case AsyncState::Requesting:
24✔
744
            // We don't have the ability to cancel a wait on the write lock, so
12✔
745
            // unfortunately we have to wait for it to be acquired.
12✔
746
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
747
            REALM_ASSERT(!m_oldest_version_not_persisted);
24✔
748
            m_waiting_for_write_lock = true;
24✔
749
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
48✔
750
                return !m_waiting_for_write_lock;
48✔
751
            });
48✔
752
            db->end_write_on_correct_thread();
24✔
753
            break;
24✔
754

755
        case AsyncState::HasLock:
30✔
756
            // We have the lock and are currently in a write transaction, and
15✔
757
            // also may have some pending previous commits to write
15✔
758
            if (m_transact_stage == DB::transact_Writing) {
30✔
759
                db->reset_free_space_tracking();
18✔
760
                m_transact_stage = DB::transact_Reading;
18✔
761
            }
18✔
762
            if (m_oldest_version_not_persisted) {
30✔
763
                complete_async_commit();
×
764
            }
×
765
            db->end_write_on_correct_thread();
30✔
766
            break;
30✔
767

768
        case AsyncState::HasCommits:
24✔
769
            // We have commits which need to be synced to disk, so do that
12✔
770
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
771
            complete_async_commit();
24✔
772
            db->end_write_on_correct_thread();
24✔
773
            break;
24✔
774

775
        case AsyncState::Syncing:
24✔
776
            // The worker thread is currently writing, so wait for it to complete
12✔
777
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
778
            m_waiting_for_sync = true;
24✔
779
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
48✔
780
                return !m_waiting_for_sync;
48✔
781
            });
48✔
782
            break;
24✔
783
    }
2,885,229✔
784
    m_async_stage = AsyncState::Idle;
2,885,229✔
785
}
2,885,229✔
786

787
void Transaction::acquire_write_lock()
788
{
383,133✔
789
    util::CheckedUniqueLock lck(m_async_mutex);
383,133✔
790
    switch (m_async_stage) {
383,133✔
791
        case AsyncState::Idle:
382,683✔
792
            lck.unlock();
382,683✔
793
            db->do_begin_possibly_async_write();
382,683✔
794
            return;
382,683✔
795

796
        case AsyncState::Requesting:
108✔
797
            m_waiting_for_write_lock = true;
108✔
798
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
216✔
799
                return !m_waiting_for_write_lock;
216✔
800
            });
216✔
801
            return;
108✔
802

803
        case AsyncState::HasLock:
✔
804
        case AsyncState::HasCommits:
✔
805
            return;
×
806

807
        case AsyncState::Syncing:
342✔
808
            m_waiting_for_sync = true;
342✔
809
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
684✔
810
                return !m_waiting_for_sync;
684✔
811
            });
684✔
812
            lck.unlock();
342✔
813
            db->do_begin_possibly_async_write();
342✔
814
            break;
342✔
815
    }
383,133✔
816
}
383,133✔
817

818
void Transaction::do_end_read() noexcept
819
{
2,706,393✔
820
    if (db->m_logger)
2,706,393✔
821
        db->m_logger->log(util::Logger::Level::trace, "End transaction %1", m_log_id);
857,682✔
822

1,624,104✔
823
    prepare_for_close();
2,706,393✔
824
    detach();
2,706,393✔
825

1,624,104✔
826
    // We should always be ensuring that async commits finish before we get here,
1,624,104✔
827
    // but if the fsync() failed or we failed to update the top pointer then
1,624,104✔
828
    // there's not much we can do and we have to just accept that we're losing
1,624,104✔
829
    // those commits.
1,624,104✔
830
    if (m_oldest_version_not_persisted) {
2,706,393✔
831
        REALM_ASSERT(m_async_commit_has_failed);
6✔
832
        // We need to not release our read lock on m_oldest_version_not_persisted
3✔
833
        // as that's the version the top pointer is referencing and overwriting
3✔
834
        // that version will corrupt the Realm file.
3✔
835
        db->leak_read_lock(*m_oldest_version_not_persisted);
6✔
836
    }
6✔
837
    db->release_read_lock(m_read_lock);
2,706,393✔
838

1,624,104✔
839
    m_alloc.note_reader_end(this);
2,706,393✔
840
    set_transact_stage(DB::transact_Ready);
2,706,393✔
841
    // reset the std::shared_ptr to allow the DB object to release resources
1,624,104✔
842
    // as early as possible.
1,624,104✔
843
    db.reset();
2,706,393✔
844
}
2,706,393✔
845

846
// This is the same as do_end_read() above, but with the requirement that
847
// 1) This is called with the db->mutex locked already
848
// 2) No async commits outstanding
849
void Transaction::close_read_with_lock()
850
{
144✔
851
    REALM_ASSERT(m_transact_stage == DB::transact_Reading);
144✔
852
    {
144✔
853
        util::CheckedLockGuard lck(m_async_mutex);
144✔
854
        REALM_ASSERT_EX(m_async_stage == AsyncState::Idle, size_t(m_async_stage));
144✔
855
    }
144✔
856

72✔
857
    detach();
144✔
858
    REALM_ASSERT_EX(!m_oldest_version_not_persisted, m_oldest_version_not_persisted->m_type,
144✔
859
                    m_oldest_version_not_persisted->m_version, m_oldest_version_not_persisted->m_top_ref,
144✔
860
                    m_oldest_version_not_persisted->m_file_size);
144✔
861
    db->do_release_read_lock(m_read_lock);
144✔
862

72✔
863
    m_alloc.note_reader_end(this);
144✔
864
    set_transact_stage(DB::transact_Ready);
144✔
865
    // reset the std::shared_ptr to allow the DB object to release resources
72✔
866
    // as early as possible.
72✔
867
    db.reset();
144✔
868
}
144✔
869

870

871
void Transaction::initialize_replication()
872
{
×
873
    if (m_transact_stage == DB::transact_Writing) {
×
874
        if (Replication* repl = get_replication()) {
×
875
            auto current_version = m_read_lock.m_version;
×
876
            bool history_updated = false;
×
877
            repl->initiate_transact(*this, current_version, history_updated); // Throws
×
878
        }
×
879
    }
×
880
}
×
881

882
/// Record the new transaction stage in m_transact_stage.
///
/// When built with REALM_METRICS and metrics are enabled, the start/end of
/// read and write transactions implied by the stage change is reported to
/// m_metrics before the stage is stored.
void Transaction::set_transact_stage(DB::TransactStage stage) noexcept
{
#if REALM_METRICS
    REALM_ASSERT(m_metrics == db->m_metrics);
    if (m_metrics) { // null if metrics are disabled
        size_t free_bytes;
        size_t used_bytes;
        db->get_stats(free_bytes, used_bytes);
        size_t total_bytes = used_bytes + free_bytes;

        size_t object_count = m_total_rows;
        size_t version_count = static_cast<size_t>(db->get_number_of_versions());
        size_t decrypted_page_count = realm::util::get_num_decrypted_pages();

        // Both "end" notifications take the same set of statistics
        auto report_read_end = [&] {
            m_metrics->end_read_transaction(total_bytes, free_bytes, object_count, version_count,
                                            decrypted_page_count);
        };
        auto report_write_end = [&] {
            m_metrics->end_write_transaction(total_bytes, free_bytes, object_count, version_count,
                                             decrypted_page_count);
        };

        switch (stage) {
            case DB::transact_Reading:
                if (m_transact_stage == DB::transact_Writing) {
                    report_write_end();
                }
                m_metrics->start_read_transaction();
                break;

            case DB::transact_Writing:
                if (m_transact_stage == DB::transact_Reading) {
                    report_read_end();
                }
                m_metrics->start_write_transaction();
                break;

            case DB::transact_Ready:
                // Both kinds of transaction are reported as ended, read first
                report_read_end();
                report_write_end();
                break;

            default:
                break;
        }
    }
#endif

    m_transact_stage = stage;
}
6,218,226✔
921

922
class NodeTree {
923
public:
924
    NodeTree(size_t evac_limit, size_t work_limit)
925
        : m_evac_limit(evac_limit)
926
        , m_work_limit(int64_t(work_limit))
927
        , m_moved(0)
928
    {
5,304✔
929
    }
5,304✔
930
    ~NodeTree()
931
    {
5,304✔
932
        // std::cout << "Moved: " << m_moved << std::endl;
2,634✔
933
    }
5,304✔
934

935
    /// Function used to traverse the node tree and "copy on write" nodes
936
    /// that are found above the evac_limit. The function will return
937
    /// when either the whole tree has been travesed or when the work_limit
938
    /// has been reached.
939
    /// \param current_node - node to process.
940
    /// \param level - the level at which current_node is placed in the tree
941
    /// \param progress - When the traversal is initiated, this vector identifies at which
942
    ///                   node the process should be resumed. It is subesequently updated
943
    ///                   to point to the node we have just processed
944
    bool trv(Array& current_node, unsigned level, std::vector<size_t>& progress)
945
    {
1,260,390✔
946
        if (m_work_limit < 0) {
1,260,390✔
947
            return false;
5,211✔
948
        }
5,211✔
949
        if (current_node.is_read_only()) {
1,255,179✔
950
            size_t byte_size = current_node.get_byte_size();
1,228,077✔
951
            if ((current_node.get_ref() + byte_size) > m_evac_limit) {
1,228,077✔
952
                current_node.copy_on_write();
1,171,350✔
953
                m_moved++;
1,171,350✔
954
                m_work_limit -= byte_size;
1,171,350✔
955
            }
1,171,350✔
956
        }
1,228,077✔
957

627,669✔
958
        if (current_node.has_refs()) {
1,255,179✔
959
            auto sz = current_node.size();
41,460✔
960
            m_work_limit -= sz;
41,460✔
961
            if (progress.size() == level) {
41,460✔
962
                progress.push_back(0);
10,356✔
963
            }
10,356✔
964
            REALM_ASSERT_EX(level < progress.size(), level, progress.size());
41,460✔
965
            size_t ndx = progress[level];
41,460✔
966
            while (ndx < sz) {
1,280,004✔
967
                auto val = current_node.get(ndx);
1,269,708✔
968
                if (val && !(val & 1)) {
1,269,708✔
969
                    Array arr(current_node.get_alloc());
1,254,873✔
970
                    arr.set_parent(&current_node, ndx);
1,254,873✔
971
                    arr.init_from_parent();
1,254,873✔
972
                    if (!trv(arr, level + 1, progress)) {
1,254,873✔
973
                        return false;
31,164✔
974
                    }
31,164✔
975
                }
1,238,544✔
976
                ndx = ++progress[level];
1,238,544✔
977
            }
1,238,544✔
978
            while (progress.size() > level)
31,077✔
979
                progress.pop_back();
10,296✔
980
        }
10,296✔
981
        return true;
1,239,567✔
982
    }
1,255,179✔
983

984
private:
985
    size_t m_evac_limit;
986
    int64_t m_work_limit;
987
    size_t m_moved;
988
};
989

990

991
void Transaction::cow_outliers(std::vector<size_t>& progress, size_t evac_limit, size_t work_limit)
992
{
5,304✔
993
    NodeTree node_tree(evac_limit, work_limit);
5,304✔
994
    if (progress.empty()) {
5,304✔
995
        progress.push_back(s_table_name_ndx);
105✔
996
    }
105✔
997
    if (progress[0] == s_table_name_ndx) {
5,304✔
998
        if (!node_tree.trv(m_table_names, 1, progress))
105✔
999
            return;
×
1000
        progress.back() = s_table_refs_ndx; // Handle tables next
105✔
1001
    }
105✔
1002
    if (progress[0] == s_table_refs_ndx) {
5,304✔
1003
        if (!node_tree.trv(m_tables, 1, progress))
5,304✔
1004
            return;
5,211✔
1005
        progress.back() = s_hist_ref_ndx; // Handle history next
93✔
1006
    }
93✔
1007
    if (progress[0] == s_hist_ref_ndx && m_top.get(s_hist_ref_ndx)) {
2,694✔
1008
        Array hist_arr(m_top.get_alloc());
93✔
1009
        hist_arr.set_parent(&m_top, s_hist_ref_ndx);
93✔
1010
        hist_arr.init_from_parent();
93✔
1011
        if (!node_tree.trv(hist_arr, 1, progress))
93✔
1012
            return;
×
1013
    }
93✔
1014
    progress.clear();
93✔
1015
}
93✔
1016

1017
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc