• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / nicola.cabiddu_1042

27 Sep 2023 06:04PM UTC coverage: 91.085% (-1.8%) from 92.915%
nicola.cabiddu_1042

Pull #6766

Evergreen

nicola-cab
Fix logic for dictionaries
Pull Request #6766: Client Reset for collections in mixed / nested collections

97276 of 178892 branches covered (0.0%)

1994 of 2029 new or added lines in 7 files covered. (98.28%)

4556 existing lines in 112 files now uncovered.

237059 of 260260 relevant lines covered (91.09%)

6321099.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.55
/src/realm/transaction.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2022 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include <realm/transaction.hpp>
20
#include "impl/copy_replication.hpp"
21
#include <realm/list.hpp>
22
#include <realm/set.hpp>
23
#include <realm/dictionary.hpp>
24
#include <realm/table_view.hpp>
25
#include <realm/group_writer.hpp>
26
namespace {
27

28
using namespace realm;
29
using ColInfo = std::vector<std::pair<ColKey, Table*>>;
30

31
ColInfo get_col_info(const Table* table)
32
{
2,034✔
33
    std::vector<std::pair<ColKey, Table*>> cols;
2,034✔
34
    if (table) {
2,034✔
35
        for (auto col : table->get_column_keys()) {
666✔
36
            Table* embedded_table = nullptr;
666✔
37
            if (auto target_table = table->get_opposite_table(col)) {
666✔
38
                if (target_table->is_embedded())
216✔
39
                    embedded_table = target_table.unchecked_ptr();
168✔
40
            }
216✔
41
            cols.emplace_back(col, embedded_table);
666✔
42
        }
666✔
43
    }
282✔
44
    return cols;
2,034✔
45
}
2,034✔
46

47
void add_list_to_repl(CollectionBase& list, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded);
48
void add_dictionary_to_repl(Dictionary& dict, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded)
49
{
114✔
50
    size_t sz = dict.size();
114✔
51
    for (size_t n = 0; n < sz; ++n) {
222✔
52
        const auto& [key, val] = dict.get_pair(n);
108✔
53
        repl.dictionary_insert(dict, n, key, val);
108✔
54
        if (val.is_type(type_List)) {
108✔
55
            auto n_list = dict.get_list({key.get_string()});
6✔
56
            add_list_to_repl(*n_list, *dict.get_table()->get_repl(), nullptr);
6✔
57
        }
6✔
58
        else if (val.is_type(type_Dictionary)) {
102✔
59
            repl.dictionary_insert(dict, n, key, val);
6✔
60
            auto n_dict = dict.get_dictionary({key.get_string()});
6✔
61
            add_dictionary_to_repl(*n_dict, *dict.get_table()->get_repl(), nullptr);
6✔
62
        }
6✔
63
        else if (update_embedded) {
96✔
64
            update_embedded(val);
42✔
65
        }
42✔
66
    }
108✔
67
}
114✔
68

69
void add_list_to_repl(CollectionBase& list, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded)
70
{
114✔
71
    auto sz = list.size();
114✔
72
    for (size_t n = 0; n < sz; n++) {
252✔
73
        auto val = list.get_any(n);
138✔
74
        repl.list_insert(list, n, val, n);
138✔
75
        if (val.is_type(type_List)) {
138✔
76
            auto n_list = list.get_list({n});
6✔
77
            add_list_to_repl(*n_list, *list.get_table()->get_repl(), nullptr);
6✔
78
        }
6✔
79
        else if (val.is_type(type_Dictionary)) {
132✔
80
            auto n_dict = list.get_dictionary({n});
6✔
81
            add_dictionary_to_repl(*n_dict, *list.get_table()->get_repl(), nullptr);
6✔
82
        }
6✔
83
        else if (update_embedded) {
126✔
84
            update_embedded(val);
18✔
85
        }
18✔
86
    }
138✔
87
}
114✔
88

89
void generate_properties_for_obj(Replication& repl, const Obj& obj, const ColInfo& cols)
90
{
846✔
91
    for (auto elem : cols) {
1,908✔
92
        auto col = elem.first;
1,908✔
93
        auto embedded_table = elem.second;
1,908✔
94
        auto cols_2 = get_col_info(embedded_table);
1,908✔
95
        util::UniqueFunction<void(Mixed)> update_embedded = nullptr;
1,908✔
96
        if (embedded_table) {
1,908✔
97
            update_embedded = [&](Mixed val) {
135✔
98
                if (val.is_null()) {
114✔
99
                    return;
18✔
100
                }
18✔
101
                REALM_ASSERT(val.is_type(type_Link, type_TypedLink));
96✔
102
                Obj embedded_obj = embedded_table->get_object(val.get<ObjKey>());
96✔
103
                generate_properties_for_obj(repl, embedded_obj, cols_2);
96✔
104
                return;
96✔
105
            };
96✔
106
        }
156✔
107

954✔
108
        if (col.is_list()) {
1,908✔
109
            auto list = obj.get_listbase_ptr(col);
96✔
110
            repl.list_clear(*list);
96✔
111
            add_list_to_repl(*list, repl, std::move(update_embedded));
96✔
112
        }
96✔
113
        else if (col.is_set()) {
1,812✔
114
            auto set = obj.get_setbase_ptr(col);
18✔
115
            auto sz = set->size();
18✔
116
            for (size_t n = 0; n < sz; n++) {
54✔
117
                repl.set_insert(*set, n, set->get_any(n));
36✔
118
                // Sets cannot have embedded objects
18✔
119
            }
36✔
120
        }
18✔
121
        else if (col.is_dictionary()) {
1,794✔
122
            auto dict = obj.get_dictionary(col);
96✔
123
            add_dictionary_to_repl(dict, repl, std::move(update_embedded));
96✔
124
        }
96✔
125
        else {
1,698✔
126
            auto val = obj.get_any(col);
1,698✔
127
            if (val.is_type(type_List)) {
1,698✔
128
                repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), Mixed(0, CollectionType::List));
6✔
129
                Lst<Mixed> list(obj, col);
6✔
130
                add_list_to_repl(list, repl, std::move(update_embedded));
6✔
131
            }
6✔
132
            else if (val.is_type(type_Dictionary)) {
1,692✔
133
                repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), Mixed(0, CollectionType::Dictionary));
6✔
134
                Dictionary dict(obj, col);
6✔
135
                add_dictionary_to_repl(dict, repl, std::move(update_embedded));
6✔
136
            }
6✔
137
            else {
1,686✔
138
                repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), val);
1,686✔
139
                if (update_embedded)
1,686✔
140
                    update_embedded(val);
54✔
141
            }
1,686✔
142
        }
1,698✔
143
    }
1,908✔
144
}
846✔
145

146
} // namespace
147

148
namespace realm {
149

150
std::map<DB::TransactStage, const char*> log_stage = {
151
    {DB::TransactStage::transact_Frozen, "frozen"},
152
    {DB::TransactStage::transact_Writing, "write"},
153
    {DB::TransactStage::transact_Reading, "read"},
154
};
155

156
Transaction::Transaction(DBRef _db, SlabAlloc* alloc, DB::ReadLockInfo& rli, DB::TransactStage stage)
157
    : Group(alloc)
158
    , db(_db)
159
    , m_read_lock(rli)
160
    , m_log_id(util::gen_log_id(this))
161
{
2,699,004✔
162
    bool writable = stage == DB::transact_Writing;
2,699,004✔
163
    m_transact_stage = DB::transact_Ready;
2,699,004✔
164
    set_transact_stage(stage);
2,699,004✔
165
    m_alloc.note_reader_start(this);
2,699,004✔
166
    attach_shared(m_read_lock.m_top_ref, m_read_lock.m_file_size, writable,
2,699,004✔
167
                  VersionID{rli.m_version, rli.m_reader_idx});
2,699,004✔
168
    if (db->m_logger) {
2,699,004✔
169
        db->m_logger->log(util::Logger::Level::trace, "Start %1 %2: %3 ref %4", log_stage[stage], m_log_id,
852,120✔
170
                          rli.m_version, m_read_lock.m_top_ref);
852,120✔
171
    }
852,120✔
172
}
2,699,004✔
173

174
Transaction::~Transaction()
175
{
2,699,823✔
176
    // Note that this does not call close() - calling close() is done
1,627,023✔
177
    // implicitly by the deleter.
1,627,023✔
178
}
2,699,823✔
179

180
void Transaction::close()
181
{
2,712,027✔
182
    if (m_transact_stage == DB::transact_Writing) {
2,712,027✔
183
        rollback();
9,930✔
184
    }
9,930✔
185
    if (m_transact_stage == DB::transact_Reading || m_transact_stage == DB::transact_Frozen) {
2,712,027✔
186
        do_end_read();
1,612,320✔
187
    }
1,612,320✔
188
}
2,712,027✔
189

190
size_t Transaction::get_commit_size() const
191
{
42,759✔
192
    size_t sz = 0;
42,759✔
193
    if (m_transact_stage == DB::transact_Writing) {
42,759✔
194
        sz = m_alloc.get_commit_size();
12,747✔
195
    }
12,747✔
196
    return sz;
42,759✔
197
}
42,759✔
198

199
DB::version_type Transaction::commit()
200
{
973,008✔
201
    check_attached();
973,008✔
202

492,885✔
203
    if (m_transact_stage != DB::transact_Writing)
973,008✔
UNCOV
204
        throw WrongTransactionState("Not a write transaction");
×
205

492,885✔
206
    REALM_ASSERT(is_attached());
973,008✔
207

492,885✔
208
    // before committing, allow any accessors at group level or below to sync
492,885✔
209
    flush_accessors_for_commit();
973,008✔
210

492,885✔
211
    DB::version_type new_version = db->do_commit(*this); // Throws
973,008✔
212

492,885✔
213
    // We need to set m_read_lock in order for wait_for_change to work.
492,885✔
214
    // To set it, we grab a readlock on the latest available snapshot
492,885✔
215
    // and release it again.
492,885✔
216
    DB::ReadLockInfo lock_after_commit = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
973,008✔
217
    db->release_read_lock(lock_after_commit);
973,008✔
218

492,885✔
219
    db->end_write_on_correct_thread();
973,008✔
220

492,885✔
221
    do_end_read();
973,008✔
222
    m_read_lock = lock_after_commit;
973,008✔
223

492,885✔
224
    return new_version;
973,008✔
225
}
973,008✔
226

227
void Transaction::rollback()
228
{
10,848✔
229
    // rollback may happen as a consequence of exception handling in cases where
5,433✔
230
    // the DB has detached. If so, just back out without trying to change state.
5,433✔
231
    // the DB object has already been closed and no further processing is possible.
5,433✔
232
    if (!is_attached())
10,848✔
233
        return;
6✔
234
    if (m_transact_stage == DB::transact_Ready)
10,842✔
UNCOV
235
        return; // Idempotency
×
236

5,430✔
237
    if (m_transact_stage != DB::transact_Writing)
10,842✔
UNCOV
238
        throw WrongTransactionState("Not a write transaction");
×
239
    db->reset_free_space_tracking();
10,842✔
240
    if (!holds_write_mutex())
10,842✔
241
        db->end_write_on_correct_thread();
10,842✔
242

5,430✔
243
    do_end_read();
10,842✔
244
}
10,842✔
245

246
void Transaction::end_read()
247
{
98,040✔
248
    if (m_transact_stage == DB::transact_Ready)
98,040✔
249
        return;
6✔
250
    if (m_transact_stage == DB::transact_Writing)
98,034✔
UNCOV
251
        throw WrongTransactionState("Illegal end_read when in write mode");
×
252
    do_end_read();
98,034✔
253
}
98,034✔
254

255
VersionID Transaction::commit_and_continue_as_read(bool commit_to_disk)
256
{
389,328✔
257
    check_attached();
389,328✔
258
    if (m_transact_stage != DB::transact_Writing)
389,328✔
UNCOV
259
        throw WrongTransactionState("Not a write transaction");
×
260

193,185✔
261
    flush_accessors_for_commit();
389,328✔
262

193,185✔
263
    DB::version_type version = db->do_commit(*this, commit_to_disk); // Throws
389,328✔
264

193,185✔
265
    // advance read lock but dont update accessors:
193,185✔
266
    // As this is done under lock, along with the addition above of the newest commit,
193,185✔
267
    // we know for certain that the read lock we will grab WILL refer to our own newly
193,185✔
268
    // completed commit.
193,185✔
269

193,185✔
270
    try {
389,328✔
271
        // Grabbing the new lock before releasing the old one prevents m_transaction_count
193,185✔
272
        // from going shortly to zero
193,185✔
273
        DB::ReadLockInfo new_read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID()); // Throws
389,328✔
274

193,185✔
275
        m_history = nullptr;
389,328✔
276
        set_transact_stage(DB::transact_Reading);
389,328✔
277

193,185✔
278
        if (commit_to_disk || m_oldest_version_not_persisted) {
389,328✔
279
            // Here we are either committing to disk or we are already
192,861✔
280
            // holding on to an older version. In either case there is
192,861✔
281
            // no need to hold onto this now historic version.
192,861✔
282
            db->release_read_lock(m_read_lock);
387,936✔
283
        }
387,936✔
284
        else {
1,392✔
285
            // We are not commiting to disk and there is no older
324✔
286
            // version not persisted, so hold onto this one
324✔
287
            m_oldest_version_not_persisted = m_read_lock;
1,392✔
288
        }
1,392✔
289

193,185✔
290
        if (commit_to_disk && m_oldest_version_not_persisted) {
389,328✔
291
            // We are committing to disk so we can release the
6✔
292
            // version we are holding on to
6✔
293
            db->release_read_lock(*m_oldest_version_not_persisted);
12✔
294
            m_oldest_version_not_persisted.reset();
12✔
295
        }
12✔
296
        m_read_lock = new_read_lock;
389,328✔
297
        // We can be sure that m_read_lock != m_oldest_version_not_persisted
193,185✔
298
        // because m_oldest_version_not_persisted is either equal to former m_read_lock
193,185✔
299
        // or older and former m_read_lock is older than current m_read_lock
193,185✔
300
        REALM_ASSERT(!m_oldest_version_not_persisted ||
389,328✔
301
                     m_read_lock.m_version != m_oldest_version_not_persisted->m_version);
389,328✔
302

193,185✔
303
        {
389,328✔
304
            util::CheckedLockGuard lock(m_async_mutex);
389,328✔
305
            REALM_ASSERT(m_async_stage != AsyncState::Syncing);
389,328✔
306
            if (commit_to_disk) {
389,328✔
307
                if (m_async_stage == AsyncState::Requesting) {
381,798✔
UNCOV
308
                    m_async_stage = AsyncState::HasLock;
×
UNCOV
309
                }
×
310
                else {
381,798✔
311
                    db->end_write_on_correct_thread();
381,798✔
312
                    m_async_stage = AsyncState::Idle;
381,798✔
313
                }
381,798✔
314
            }
381,798✔
315
            else {
7,530✔
316
                m_async_stage = AsyncState::HasCommits;
7,530✔
317
            }
7,530✔
318
        }
389,328✔
319

193,185✔
320
        // Remap file if it has grown, and update refs in underlying node structure.
193,185✔
321
        remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, false); // Throws
389,328✔
322
        return VersionID{version, new_read_lock.m_reader_idx};
389,328✔
323
    }
389,328✔
UNCOV
324
    catch (std::exception& e) {
×
UNCOV
325
        if (db->m_logger) {
×
UNCOV
326
            db->m_logger->log(util::Logger::Level::error, "Tr %1: Commit failed with exception: \"%2\"", m_log_id,
×
UNCOV
327
                              e.what());
×
UNCOV
328
        }
×
329
        // In case of failure, further use of the transaction for reading is unsafe
UNCOV
330
        set_transact_stage(DB::transact_Ready);
×
331
        throw;
×
332
    }
×
333
}
389,328✔
334

335
VersionID Transaction::commit_and_continue_writing()
336
{
318✔
337
    check_attached();
318✔
338
    if (m_transact_stage != DB::transact_Writing)
318✔
339
        throw WrongTransactionState("Not a write transaction");
×
340

159✔
341
    // before committing, allow any accessors at group level or below to sync
159✔
342
    flush_accessors_for_commit();
318✔
343

159✔
344
    DB::version_type version = db->do_commit(*this); // Throws
318✔
345

159✔
346
    // We need to set m_read_lock in order for wait_for_change to work.
159✔
347
    // To set it, we grab a readlock on the latest available snapshot
159✔
348
    // and release it again.
159✔
349
    DB::ReadLockInfo lock_after_commit = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
318✔
350
    db->release_read_lock(m_read_lock);
318✔
351
    m_read_lock = lock_after_commit;
318✔
352
    if (Replication* repl = db->get_replication()) {
318✔
353
        bool history_updated = false;
318✔
354
        repl->initiate_transact(*this, lock_after_commit.m_version, history_updated); // Throws
318✔
355
    }
318✔
356

159✔
357
    bool writable = true;
318✔
358
    remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, writable); // Throws
318✔
359
    return VersionID{version, lock_after_commit.m_reader_idx};
318✔
360
}
318✔
361

362
TransactionRef Transaction::freeze()
363
{
6,042✔
364
    if (m_transact_stage != DB::transact_Reading)
6,042✔
365
        throw WrongTransactionState("Can only freeze a read transaction");
6✔
366
    auto version = VersionID(m_read_lock.m_version, m_read_lock.m_reader_idx);
6,036✔
367
    return db->start_frozen(version);
6,036✔
368
}
6,036✔
369

370
TransactionRef Transaction::duplicate()
371
{
50,349✔
372
    auto version = VersionID(m_read_lock.m_version, m_read_lock.m_reader_idx);
50,349✔
373
    switch (m_transact_stage) {
50,349✔
UNCOV
374
        case DB::transact_Ready:
✔
UNCOV
375
            throw WrongTransactionState("Cannot duplicate a transaction which does not have a read lock.");
×
376
        case DB::transact_Reading:
50,337✔
377
            return db->start_read(version);
50,337✔
UNCOV
378
        case DB::transact_Frozen:
✔
UNCOV
379
            return db->start_frozen(version);
×
380
        case DB::transact_Writing:
12✔
381
            if (get_commit_size() != 0)
12✔
UNCOV
382
                throw WrongTransactionState(
×
UNCOV
383
                    "Can only duplicate a write transaction before any changes have been made.");
×
384
            return db->start_read(version);
12✔
UNCOV
385
    }
×
UNCOV
386
    REALM_UNREACHABLE();
×
UNCOV
387
}
×
388

389
void Transaction::copy_to(TransactionRef dest) const
390
{
42✔
391
    _impl::CopyReplication repl(dest);
42✔
392
    replicate(dest.get(), repl);
42✔
393
}
42✔
394

395
_impl::History* Transaction::get_history() const
396
{
3,457,662✔
397
    if (!m_history) {
3,457,662✔
398
        if (auto repl = db->get_replication()) {
1,135,719✔
399
            switch (m_transact_stage) {
1,118,259✔
400
                case DB::transact_Reading:
146,322✔
401
                case DB::transact_Frozen:
146,322✔
402
                    if (!m_history_read)
146,322✔
403
                        m_history_read = repl->_create_history_read();
143,670✔
404
                    m_history = m_history_read.get();
146,322✔
405
                    m_history->set_group(const_cast<Transaction*>(this), false);
146,322✔
406
                    break;
146,322✔
407
                case DB::transact_Writing:
971,931✔
408
                    m_history = repl->_get_history_write();
971,931✔
409
                    break;
971,931✔
410
                case DB::transact_Ready:
72,312✔
UNCOV
411
                    break;
×
412
            }
3,457,656✔
413
        }
3,457,656✔
414
    }
1,135,719✔
415
    return m_history;
3,457,656✔
416
}
3,457,656✔
417

418
Obj Transaction::import_copy_of(const Obj& original)
419
{
18,966✔
420
    if (bool(original) && original.is_valid()) {
18,966✔
421
        TableKey tk = original.get_table_key();
18,936✔
422
        ObjKey rk = original.get_key();
18,936✔
423
        auto table = get_table(tk);
18,936✔
424
        if (table->is_valid(rk))
18,936✔
425
            return table->get_object(rk);
18,378✔
426
    }
588✔
427
    return {};
588✔
428
}
588✔
429

430
TableRef Transaction::import_copy_of(ConstTableRef original)
431
{
229,350✔
432
    TableKey tk = original->get_key();
229,350✔
433
    return get_table(tk);
229,350✔
434
}
229,350✔
435

436
LnkLst Transaction::import_copy_of(const LnkLst& original)
437
{
×
438
    if (Obj obj = import_copy_of(original.get_obj())) {
×
UNCOV
439
        ColKey ck = original.get_col_key();
×
UNCOV
440
        return obj.get_linklist(ck);
×
441
    }
×
442
    return LnkLst();
×
443
}
×
444

445
LstBasePtr Transaction::import_copy_of(const LstBase& original)
446
{
12✔
447
    if (Obj obj = import_copy_of(original.get_obj())) {
12✔
448
        ColKey ck = original.get_col_key();
6✔
449
        return obj.get_listbase_ptr(ck);
6✔
450
    }
6✔
451
    return {};
6✔
452
}
6✔
453

454
SetBasePtr Transaction::import_copy_of(const SetBase& original)
UNCOV
455
{
×
UNCOV
456
    if (Obj obj = import_copy_of(original.get_obj())) {
×
UNCOV
457
        ColKey ck = original.get_col_key();
×
UNCOV
458
        return obj.get_setbase_ptr(ck);
×
UNCOV
459
    }
×
UNCOV
460
    return {};
×
UNCOV
461
}
×
462

463
CollectionBasePtr Transaction::import_copy_of(const CollectionBase& original)
464
{
11,568✔
465
    if (Obj obj = import_copy_of(original.get_obj())) {
11,568✔
466
        auto path = original.get_short_path();
11,040✔
467
        return std::static_pointer_cast<CollectionBase>(obj.get_collection_ptr(path));
11,040✔
468
    }
11,040✔
469
    return {};
528✔
470
}
528✔
471

472
LnkLstPtr Transaction::import_copy_of(const LnkLstPtr& original)
473
{
6✔
474
    if (!bool(original))
6✔
UNCOV
475
        return nullptr;
×
476
    if (Obj obj = import_copy_of(original->get_obj())) {
6✔
477
        ColKey ck = original->get_col_key();
6✔
478
        return obj.get_linklist_ptr(ck);
6✔
479
    }
6✔
UNCOV
480
    return std::make_unique<LnkLst>();
×
UNCOV
481
}
×
482

483
LnkSetPtr Transaction::import_copy_of(const LnkSetPtr& original)
UNCOV
484
{
×
UNCOV
485
    if (!original)
×
UNCOV
486
        return nullptr;
×
UNCOV
487
    if (Obj obj = import_copy_of(original->get_obj())) {
×
UNCOV
488
        ColKey ck = original->get_col_key();
×
UNCOV
489
        return obj.get_linkset_ptr(ck);
×
UNCOV
490
    }
×
UNCOV
491
    return std::make_unique<LnkSet>();
×
UNCOV
492
}
×
493

494
LinkCollectionPtr Transaction::import_copy_of(const LinkCollectionPtr& original)
495
{
57,504✔
496
    if (!original)
57,504✔
497
        return nullptr;
56,808✔
498
    if (Obj obj = import_copy_of(original->get_owning_obj())) {
696✔
499
        ColKey ck = original->get_owning_col_key();
654✔
500
        return obj.get_linkcollection_ptr(ck);
654✔
501
    }
654✔
502
    // return some empty collection where size() == 0
21✔
503
    // the type shouldn't matter
21✔
504
    return std::make_unique<LnkLst>();
42✔
505
}
42✔
506

507
std::unique_ptr<Query> Transaction::import_copy_of(Query& query, PayloadPolicy policy)
508
{
59,472✔
509
    return query.clone_for_handover(this, policy);
59,472✔
510
}
59,472✔
511

512
std::unique_ptr<TableView> Transaction::import_copy_of(TableView& tv, PayloadPolicy policy)
513
{
38,844✔
514
    return tv.clone_for_handover(this, policy);
38,844✔
515
}
38,844✔
516

517
void Transaction::upgrade_file_format(int target_file_format_version)
518
{
156✔
519
    REALM_ASSERT(is_attached());
156✔
520
    if (fake_target_file_format && *fake_target_file_format == target_file_format_version) {
156✔
521
        // Testing, mockup scenario, not a real upgrade. Just pretend we're done!
15✔
522
        return;
30✔
523
    }
30✔
524

63✔
525
    // Be sure to revisit the following upgrade logic when a new file format
63✔
526
    // version is introduced. The following assert attempt to help you not
63✔
527
    // forget it.
63✔
528
    REALM_ASSERT_EX(target_file_format_version == 24, target_file_format_version);
126✔
529

63✔
530
    // DB::do_open() must ensure that only supported version are allowed.
63✔
531
    // It does that by asking backup if the current file format version is
63✔
532
    // included in the accepted versions, so be sure to align the list of
63✔
533
    // versions with the logic below
63✔
534

63✔
535
    int current_file_format_version = get_file_format_version();
126✔
536
    REALM_ASSERT(current_file_format_version < target_file_format_version);
126✔
537

63✔
538
    // Ensure we have search index on all primary key columns.
63✔
539
    auto table_keys = get_table_keys();
126✔
540
    if (current_file_format_version < 22) {
126✔
541
        for (auto k : table_keys) {
114✔
542
            auto t = get_table(k);
114✔
543
            if (auto col = t->get_primary_key_column()) {
114✔
544
                t->do_add_search_index(col, IndexType::General);
78✔
545
            }
78✔
546
        }
114✔
547
    }
48✔
548

63✔
549
    if (current_file_format_version == 22) {
126✔
550
        // Check that asymmetric table are empty
33✔
551
        for (auto k : table_keys) {
180✔
552
            auto t = get_table(k);
180✔
553
            if (t->is_asymmetric() && t->size() > 0) {
180✔
554
                t->clear();
6✔
555
            }
6✔
556
        }
180✔
557
    }
66✔
558
    if (current_file_format_version >= 21 && current_file_format_version < 23) {
126✔
559
        // Upgrade Set and Dictionary columns
33✔
560
        for (auto k : table_keys) {
180✔
561
            auto t = get_table(k);
180✔
562
            t->migrate_sets_and_dictionaries();
180✔
563
        }
180✔
564
    }
66✔
565
    // NOTE: Additional future upgrade steps go here.
63✔
566
}
126✔
567

568
void Transaction::promote_to_async()
569
{
7,536✔
570
    util::CheckedLockGuard lck(m_async_mutex);
7,536✔
571
    if (m_async_stage == AsyncState::Idle) {
7,536✔
572
        m_async_stage = AsyncState::HasLock;
366✔
573
    }
366✔
574
}
7,536✔
575

576
void Transaction::replicate(Transaction* dest, Replication& repl) const
577
{
78✔
578
    // We should only create entries for public tables
39✔
579
    std::vector<TableKey> public_table_keys;
78✔
580
    for (auto tk : get_table_keys()) {
222✔
581
        if (table_is_public(tk))
222✔
582
            public_table_keys.push_back(tk);
180✔
583
    }
222✔
584

39✔
585
    // Create tables
39✔
586
    for (auto tk : public_table_keys) {
180✔
587
        auto table = get_table(tk);
180✔
588
        auto table_name = table->get_name();
180✔
589
        if (!table->is_embedded()) {
180✔
590
            auto pk_col = table->get_primary_key_column();
138✔
591
            if (!pk_col)
138✔
UNCOV
592
                throw RuntimeError(
×
UNCOV
593
                    ErrorCodes::BrokenInvariant,
×
UNCOV
594
                    util::format("Class '%1' must have a primary key", Group::table_name_to_class_name(table_name)));
×
595
            auto pk_name = table->get_column_name(pk_col);
138✔
596
            if (pk_name != "_id")
138✔
UNCOV
597
                throw RuntimeError(ErrorCodes::BrokenInvariant,
×
UNCOV
598
                                   util::format("Primary key of class '%1' must be named '_id'. Current is '%2'",
×
UNCOV
599
                                                Group::table_name_to_class_name(table_name), pk_name));
×
600
            repl.add_class_with_primary_key(tk, table_name, DataType(pk_col.get_type()), pk_name,
138✔
601
                                            pk_col.is_nullable(), table->get_table_type());
138✔
602
        }
138✔
603
        else {
42✔
604
            repl.add_class(tk, table_name, Table::Type::Embedded);
42✔
605
        }
42✔
606
    }
180✔
607
    // Create columns
39✔
608
    for (auto tk : public_table_keys) {
180✔
609
        auto table = get_table(tk);
180✔
610
        auto pk_col = table->get_primary_key_column();
180✔
611
        auto cols = table->get_column_keys();
180✔
612
        for (auto col : cols) {
462✔
613
            if (col == pk_col)
462✔
614
                continue;
138✔
615
            repl.insert_column(table.unchecked_ptr(), col, DataType(col.get_type()), table->get_column_name(col),
324✔
616
                               table->get_opposite_table(col).unchecked_ptr());
324✔
617
        }
324✔
618
    }
180✔
619
    dest->commit_and_continue_writing();
78✔
620
    // Now the schema should be in place - create the objects
39✔
621
#ifdef REALM_DEBUG
78✔
622
    constexpr int number_of_objects_to_create_before_committing = 100;
78✔
623
#else
624
    constexpr int number_of_objects_to_create_before_committing = 1000;
625
#endif
626
    auto n = number_of_objects_to_create_before_committing;
78✔
627
    for (auto tk : public_table_keys) {
168✔
628
        auto table = get_table(tk);
168✔
629
        if (table->is_embedded())
168✔
630
            continue;
42✔
631
        auto pk_col = table->get_primary_key_column();
126✔
632
        auto cols = get_col_info(table.unchecked_ptr());
126✔
633
        for (auto o : *table) {
750✔
634
            auto obj_key = o.get_key();
750✔
635
            Mixed pk = o.get_any(pk_col);
750✔
636
            repl.create_object_with_primary_key(table.unchecked_ptr(), obj_key, pk);
750✔
637
            generate_properties_for_obj(repl, o, cols);
750✔
638
            if (--n == 0) {
750✔
639
                dest->commit_and_continue_writing();
6✔
640
                n = number_of_objects_to_create_before_committing;
6✔
641
            }
6✔
642
        }
750✔
643
    }
126✔
644
}
78✔
645

646
void Transaction::complete_async_commit()
647
{
1,380✔
648
    // sync to disk:
318✔
649
    DB::ReadLockInfo read_lock;
1,380✔
650
    try {
1,380✔
651
        read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
1,380✔
652
        if (db->m_logger) {
1,380✔
653
            db->m_logger->log(util::Logger::Level::trace, "Tr %1: Committing ref %2 to disk", m_log_id,
1,380✔
654
                              read_lock.m_top_ref);
1,380✔
655
        }
1,380✔
656
        GroupCommitter out(*this);
1,380✔
657
        out.commit(read_lock.m_top_ref); // Throws
1,380✔
658
        // we must release the write mutex before the callback, because the callback
318✔
659
        // is allowed to re-request it.
318✔
660
        db->release_read_lock(read_lock);
1,380✔
661
        if (m_oldest_version_not_persisted) {
1,380✔
662
            db->release_read_lock(*m_oldest_version_not_persisted);
1,374✔
663
            m_oldest_version_not_persisted.reset();
1,374✔
664
        }
1,374✔
665
    }
1,380✔
666
    catch (const std::exception& e) {
321✔
667
        m_commit_exception = std::current_exception();
6✔
668
        if (db->m_logger) {
6✔
669
            db->m_logger->log(util::Logger::Level::error, "Tr %1: Committing to disk failed with exception: \"%2\"",
6✔
670
                              m_log_id, e.what());
6✔
671
        }
6✔
672
        m_async_commit_has_failed = true;
6✔
673
        db->release_read_lock(read_lock);
6✔
674
    }
6✔
675
}
1,380✔
676

677
void Transaction::async_complete_writes(util::UniqueFunction<void()> when_synchronized)
678
{
9,873✔
679
    util::CheckedLockGuard lck(m_async_mutex);
9,873✔
680
    if (m_async_stage == AsyncState::HasLock) {
9,873✔
681
        // Nothing to commit to disk - just release write lock
27✔
682
        m_async_stage = AsyncState::Idle;
54✔
683
        db->async_end_write();
54✔
684
    }
54✔
685
    else if (m_async_stage == AsyncState::HasCommits) {
9,819✔
686
        m_async_stage = AsyncState::Syncing;
1,356✔
687
        m_commit_exception = std::exception_ptr();
1,356✔
688
        // get a callback on the helper thread, in which to sync to disk
306✔
689
        db->async_sync_to_disk([this, cb = std::move(when_synchronized)]() noexcept {
1,356✔
690
            complete_async_commit();
1,356✔
691
            util::CheckedLockGuard lck(m_async_mutex);
1,356✔
692
            m_async_stage = AsyncState::Idle;
1,356✔
693
            if (m_waiting_for_sync) {
1,356✔
694
                m_waiting_for_sync = false;
363✔
695
                m_async_cv.notify_all();
363✔
696
            }
363✔
697
            else {
993✔
698
                cb();
993✔
699
            }
993✔
700
        });
1,356✔
701
    }
1,356✔
702
}
9,873✔
703

704
void Transaction::prepare_for_close()
705
{
2,861,337✔
706
    util::CheckedLockGuard lck(m_async_mutex);
2,861,337✔
707
    switch (m_async_stage) {
2,861,337✔
708
        case AsyncState::Idle:
2,868,336✔
709
            break;
2,868,336✔
710

711
        case AsyncState::Requesting:
24✔
712
            // We don't have the ability to cancel a wait on the write lock, so
12✔
713
            // unfortunately we have to wait for it to be acquired.
12✔
714
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
715
            REALM_ASSERT(!m_oldest_version_not_persisted);
24✔
716
            m_waiting_for_write_lock = true;
24✔
717
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
48✔
718
                return !m_waiting_for_write_lock;
48✔
719
            });
48✔
720
            db->end_write_on_correct_thread();
24✔
721
            break;
24✔
722

723
        case AsyncState::HasLock:
30✔
724
            // We have the lock and are currently in a write transaction, and
15✔
725
            // also may have some pending previous commits to write
15✔
726
            if (m_transact_stage == DB::transact_Writing) {
30✔
727
                db->reset_free_space_tracking();
18✔
728
                m_transact_stage = DB::transact_Reading;
18✔
729
            }
18✔
730
            if (m_oldest_version_not_persisted) {
30✔
UNCOV
731
                complete_async_commit();
×
UNCOV
732
            }
×
733
            db->end_write_on_correct_thread();
30✔
734
            break;
30✔
735

736
        case AsyncState::HasCommits:
24✔
737
            // We have commits which need to be synced to disk, so do that
12✔
738
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
739
            complete_async_commit();
24✔
740
            db->end_write_on_correct_thread();
24✔
741
            break;
24✔
742

743
        case AsyncState::Syncing:
24✔
744
            // The worker thread is currently writing, so wait for it to complete
12✔
745
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
746
            m_waiting_for_sync = true;
24✔
747
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
48✔
748
                return !m_waiting_for_sync;
48✔
749
            });
48✔
750
            break;
24✔
751
    }
2,868,426✔
752
    m_async_stage = AsyncState::Idle;
2,868,426✔
753
}
2,868,426✔
754

755
void Transaction::acquire_write_lock()
756
{
383,529✔
757
    util::CheckedUniqueLock lck(m_async_mutex);
383,529✔
758
    switch (m_async_stage) {
383,529✔
759
        case AsyncState::Idle:
383,079✔
760
            lck.unlock();
383,079✔
761
            db->do_begin_possibly_async_write();
383,079✔
762
            return;
383,079✔
763

764
        case AsyncState::Requesting:
105✔
765
            m_waiting_for_write_lock = true;
105✔
766
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
210✔
767
                return !m_waiting_for_write_lock;
210✔
768
            });
210✔
769
            return;
105✔
770

UNCOV
771
        case AsyncState::HasLock:
✔
UNCOV
772
        case AsyncState::HasCommits:
✔
UNCOV
773
            return;
×
774

775
        case AsyncState::Syncing:
339✔
776
            m_waiting_for_sync = true;
339✔
777
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
678✔
778
                return !m_waiting_for_sync;
678✔
779
            });
678✔
780
            lck.unlock();
339✔
781
            db->do_begin_possibly_async_write();
339✔
782
            break;
339✔
783
    }
383,529✔
784
}
383,529✔
785

786
void Transaction::do_end_read() noexcept
787
{
2,693,184✔
788
    if (db->m_logger)
2,693,184✔
789
        db->m_logger->log(util::Logger::Level::trace, "End transaction %1", m_log_id);
852,057✔
790

1,620,549✔
791
    prepare_for_close();
2,693,184✔
792
    detach();
2,693,184✔
793

1,620,549✔
794
    // We should always be ensuring that async commits finish before we get here,
1,620,549✔
795
    // but if the fsync() failed or we failed to update the top pointer then
1,620,549✔
796
    // there's not much we can do and we have to just accept that we're losing
1,620,549✔
797
    // those commits.
1,620,549✔
798
    if (m_oldest_version_not_persisted) {
2,693,184✔
799
        REALM_ASSERT(m_async_commit_has_failed);
6✔
800
        // We need to not release our read lock on m_oldest_version_not_persisted
3✔
801
        // as that's the version the top pointer is referencing and overwriting
3✔
802
        // that version will corrupt the Realm file.
3✔
803
        db->leak_read_lock(*m_oldest_version_not_persisted);
6✔
804
    }
6✔
805
    db->release_read_lock(m_read_lock);
2,693,184✔
806

1,620,549✔
807
    m_alloc.note_reader_end(this);
2,693,184✔
808
    set_transact_stage(DB::transact_Ready);
2,693,184✔
809
    // reset the std::shared_ptr to allow the DB object to release resources
1,620,549✔
810
    // as early as possible.
1,620,549✔
811
    db.reset();
2,693,184✔
812
}
2,693,184✔
813

814
// This is the same as do_end_read() above, but with the requirement that
815
// 1) This is called with the db->mutex locked already
816
// 2) No async commits outstanding
817
void Transaction::close_read_with_lock()
818
{
144✔
819
    REALM_ASSERT(m_transact_stage == DB::transact_Reading);
144✔
820
    {
144✔
821
        util::CheckedLockGuard lck(m_async_mutex);
144✔
822
        REALM_ASSERT_EX(m_async_stage == AsyncState::Idle, size_t(m_async_stage));
144✔
823
    }
144✔
824

72✔
825
    detach();
144✔
826
    REALM_ASSERT_EX(!m_oldest_version_not_persisted, m_oldest_version_not_persisted->m_type,
144✔
827
                    m_oldest_version_not_persisted->m_version, m_oldest_version_not_persisted->m_top_ref,
144✔
828
                    m_oldest_version_not_persisted->m_file_size);
144✔
829
    db->do_release_read_lock(m_read_lock);
144✔
830

72✔
831
    m_alloc.note_reader_end(this);
144✔
832
    set_transact_stage(DB::transact_Ready);
144✔
833
    // reset the std::shared_ptr to allow the DB object to release resources
72✔
834
    // as early as possible.
72✔
835
    db.reset();
144✔
836
}
144✔
837

838

839
void Transaction::initialize_replication()
UNCOV
840
{
×
841
    if (m_transact_stage == DB::transact_Writing) {
×
UNCOV
842
        if (Replication* repl = get_replication()) {
×
UNCOV
843
            auto current_version = m_read_lock.m_version;
×
UNCOV
844
            bool history_updated = false;
×
UNCOV
845
            repl->initiate_transact(*this, current_version, history_updated); // Throws
×
UNCOV
846
        }
×
UNCOV
847
    }
×
848
}
×
849

850
void Transaction::set_transact_stage(DB::TransactStage stage) noexcept
851
{
6,193,590✔
852
    m_transact_stage = stage;
6,193,590✔
853
}
6,193,590✔
854

855
class NodeTree {
856
public:
857
    NodeTree(size_t evac_limit, size_t work_limit)
858
        : m_evac_limit(evac_limit)
859
        , m_work_limit(int64_t(work_limit))
860
        , m_moved(0)
861
    {
5,301✔
862
    }
5,301✔
863
    ~NodeTree()
864
    {
5,301✔
865
        // std::cout << "Moved: " << m_moved << std::endl;
2,643✔
866
    }
5,301✔
867

868
    /// Function used to traverse the node tree and "copy on write" nodes
869
    /// that are found above the evac_limit. The function will return
870
    /// when either the whole tree has been travesed or when the work_limit
871
    /// has been reached.
872
    /// \param current_node - node to process.
873
    /// \param level - the level at which current_node is placed in the tree
874
    /// \param progress - When the traversal is initiated, this vector identifies at which
875
    ///                   node the process should be resumed. It is subesequently updated
876
    ///                   to point to the node we have just processed
877
    bool trv(Array& current_node, unsigned level, std::vector<size_t>& progress)
878
    {
1,260,747✔
879
        if (m_work_limit < 0) {
1,260,747✔
880
            return false;
5,217✔
881
        }
5,217✔
882
        if (current_node.is_read_only()) {
1,255,530✔
883
            size_t byte_size = current_node.get_byte_size();
1,228,599✔
884
            if ((current_node.get_ref() + byte_size) > m_evac_limit) {
1,228,599✔
885
                current_node.copy_on_write();
1,171,953✔
886
                m_moved++;
1,171,953✔
887
                m_work_limit -= byte_size;
1,171,953✔
888
            }
1,171,953✔
889
        }
1,228,599✔
890

627,933✔
891
        if (current_node.has_refs()) {
1,255,530✔
892
            auto sz = current_node.size();
41,460✔
893
            m_work_limit -= sz;
41,460✔
894
            if (progress.size() == level) {
41,460✔
895
                progress.push_back(0);
10,302✔
896
            }
10,302✔
897
            REALM_ASSERT_EX(level < progress.size(), level, progress.size());
41,460✔
898
            size_t ndx = progress[level];
41,460✔
899
            while (ndx < sz) {
1,279,992✔
900
                auto val = current_node.get(ndx);
1,269,750✔
901
                if (val && !(val & 1)) {
1,269,750✔
902
                    Array arr(current_node.get_alloc());
1,255,272✔
903
                    arr.set_parent(&current_node, ndx);
1,255,272✔
904
                    arr.init_from_parent();
1,255,272✔
905
                    if (!trv(arr, level + 1, progress)) {
1,255,272✔
906
                        return false;
31,218✔
907
                    }
31,218✔
908
                }
1,238,532✔
909
                ndx = ++progress[level];
1,238,532✔
910
            }
1,238,532✔
911
            while (progress.size() > level)
30,993✔
912
                progress.pop_back();
10,242✔
913
        }
10,242✔
914
        return true;
1,239,906✔
915
    }
1,255,530✔
916

917
private:
918
    size_t m_evac_limit;
919
    int64_t m_work_limit;
920
    size_t m_moved;
921
};
922

923

924
void Transaction::cow_outliers(std::vector<size_t>& progress, size_t evac_limit, size_t work_limit)
925
{
5,301✔
926
    NodeTree node_tree(evac_limit, work_limit);
5,301✔
927
    if (progress.empty()) {
5,301✔
928
        progress.push_back(s_table_name_ndx);
96✔
929
    }
96✔
930
    if (progress[0] == s_table_name_ndx) {
5,301✔
931
        if (!node_tree.trv(m_table_names, 1, progress))
96✔
UNCOV
932
            return;
×
933
        progress.back() = s_table_refs_ndx; // Handle tables next
96✔
934
    }
96✔
935
    if (progress[0] == s_table_refs_ndx) {
5,301✔
936
        if (!node_tree.trv(m_tables, 1, progress))
5,301✔
937
            return;
5,217✔
938
        progress.back() = s_hist_ref_ndx; // Handle history next
84✔
939
    }
84✔
940
    if (progress[0] == s_hist_ref_ndx && m_top.get(s_hist_ref_ndx)) {
2,691✔
941
        Array hist_arr(m_top.get_alloc());
84✔
942
        hist_arr.set_parent(&m_top, s_hist_ref_ndx);
84✔
943
        hist_arr.init_from_parent();
84✔
944
        if (!node_tree.trv(hist_arr, 1, progress))
84✔
UNCOV
945
            return;
×
946
    }
84✔
947
    progress.clear();
84✔
948
}
84✔
949

950
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc