• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / github_pull_request_281750

30 Oct 2023 03:37PM UTC coverage: 90.528% (-1.0%) from 91.571%
github_pull_request_281750

Pull #6073

Evergreen

jedelbo
Log free space and history sizes when opening file
Pull Request #6073: Merge next-major

95488 of 175952 branches covered (0.0%)

8973 of 12277 new or added lines in 149 files covered. (73.09%)

622 existing lines in 51 files now uncovered.

233503 of 257934 relevant lines covered (90.53%)

6533720.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.7
/src/realm/transaction.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2022 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include <realm/transaction.hpp>
20
#include "impl/copy_replication.hpp"
21
#include <realm/list.hpp>
22
#include <realm/set.hpp>
23
#include <realm/dictionary.hpp>
24
#include <realm/table_view.hpp>
25
#include <realm/group_writer.hpp>
26

27
namespace {
28

29
using namespace realm;
30
using ColInfo = std::vector<std::pair<ColKey, Table*>>;
31

32
// Builds, for every column of `table`, a pair of the column key and a pointer
// to the column's target table. The pointer is non-null only when the column
// has an opposite table and that table is embedded. A null `table` produces an
// empty list.
ColInfo get_col_info(const Table* table)
{
    ColInfo result;
    if (!table)
        return result;

    for (auto col_key : table->get_column_keys()) {
        Table* embedded = nullptr;
        auto target = table->get_opposite_table(col_key);
        if (target && target->is_embedded())
            embedded = target.unchecked_ptr();
        result.emplace_back(col_key, embedded);
    }
    return result;
}
47

48
void add_list_to_repl(CollectionBase& list, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded);
49
// Emits one dictionary_insert instruction to `repl` for each entry of `dict`.
//
// Entries whose value is a nested list or dictionary are descended into, and
// the nested contents are emitted to the same `repl` (nested collections are
// walked without an embedded-object callback). For every other value,
// `update_embedded` is invoked with the value if the callback is set.
//
// Fixes relative to the previous version:
//  * nested collections are now written to the `repl` passed by the caller
//    instead of the replication object attached to the dictionary's table;
//  * the nested-dictionary branch no longer emits a second, redundant
//    dictionary_insert for the same entry.
void add_dictionary_to_repl(Dictionary& dict, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded)
{
    size_t sz = dict.size();
    for (size_t n = 0; n < sz; ++n) {
        const auto& [key, val] = dict.get_pair(n);
        repl.dictionary_insert(dict, n, key, val);
        if (val.is_type(type_List)) {
            auto n_list = dict.get_list({key.get_string()});
            add_list_to_repl(*n_list, repl, nullptr);
        }
        else if (val.is_type(type_Dictionary)) {
            auto n_dict = dict.get_dictionary({key.get_string()});
            add_dictionary_to_repl(*n_dict, repl, nullptr);
        }
        else if (update_embedded) {
            update_embedded(val);
        }
    }
}
69

70
// Emits one list_insert instruction to `repl` for each element of `list`.
//
// Elements that are themselves a nested list or dictionary are descended into,
// and the nested contents are emitted to the same `repl` (nested collections
// are walked without an embedded-object callback). For every other element,
// `update_embedded` is invoked with the value if the callback is set.
//
// Fix relative to the previous version: nested collections are now written to
// the `repl` passed by the caller instead of the replication object attached
// to the list's table.
void add_list_to_repl(CollectionBase& list, Replication& repl, util::UniqueFunction<void(Mixed)> update_embedded)
{
    auto sz = list.size();
    for (size_t n = 0; n < sz; n++) {
        auto val = list.get_any(n);
        repl.list_insert(list, n, val, n);
        if (val.is_type(type_List)) {
            auto n_list = list.get_list({n});
            add_list_to_repl(*n_list, repl, nullptr);
        }
        else if (val.is_type(type_Dictionary)) {
            auto n_dict = list.get_dictionary({n});
            add_dictionary_to_repl(*n_dict, repl, nullptr);
        }
        else if (update_embedded) {
            update_embedded(val);
        }
    }
}
89

90
// Emits to `repl` the instructions describing the value of every column listed
// in `cols` for the object `obj`.
//
// `cols` pairs each column key with the embedded target table of that column
// (or null). When a column targets an embedded table, each non-null link value
// found in the column is resolved in that table and the linked object is
// processed recursively.
void generate_properties_for_obj(Replication& repl, const Obj& obj, const ColInfo& cols)
{
    for (const auto& entry : cols) {
        auto col_key = entry.first;
        auto embedded = entry.second;
        auto embedded_cols = get_col_info(embedded);

        // Stays empty unless the column targets an embedded table.
        util::UniqueFunction<void(Mixed)> on_value = nullptr;
        if (embedded) {
            on_value = [&](Mixed value) {
                if (value.is_null())
                    return;
                REALM_ASSERT(value.is_type(type_Link, type_TypedLink));
                Obj child = embedded->get_object(value.get<ObjKey>());
                generate_properties_for_obj(repl, child, embedded_cols);
            };
        }

        if (col_key.is_list()) {
            auto list = obj.get_listbase_ptr(col_key);
            repl.list_clear(*list);
            add_list_to_repl(*list, repl, std::move(on_value));
            continue;
        }

        if (col_key.is_set()) {
            // Sets cannot have embedded objects, so the callback is not needed here.
            auto set = obj.get_setbase_ptr(col_key);
            auto count = set->size();
            for (size_t i = 0; i < count; i++) {
                repl.set_insert(*set, i, set->get_any(i));
            }
            continue;
        }

        if (col_key.is_dictionary()) {
            auto dict = obj.get_dictionary(col_key);
            add_dictionary_to_repl(dict, repl, std::move(on_value));
            continue;
        }

        // Single-value column; the value may itself hold a nested collection.
        auto value = obj.get_any(col_key);
        if (value.is_type(type_List)) {
            repl.set(obj.get_table().unchecked_ptr(), col_key, obj.get_key(), Mixed(0, CollectionType::List));
            Lst<Mixed> nested_list(obj, col_key);
            add_list_to_repl(nested_list, repl, std::move(on_value));
        }
        else if (value.is_type(type_Dictionary)) {
            repl.set(obj.get_table().unchecked_ptr(), col_key, obj.get_key(), Mixed(0, CollectionType::Dictionary));
            Dictionary nested_dict(obj, col_key);
            add_dictionary_to_repl(nested_dict, repl, std::move(on_value));
        }
        else {
            repl.set(obj.get_table().unchecked_ptr(), col_key, obj.get_key(), value);
            if (on_value)
                on_value(value);
        }
    }
}
146

147
} // namespace
148

149
namespace realm {
150

151
std::map<DB::TransactStage, const char*> log_stage = {
152
    {DB::TransactStage::transact_Frozen, "frozen"},
153
    {DB::TransactStage::transact_Writing, "write"},
154
    {DB::TransactStage::transact_Reading, "read"},
155
};
156

157
Transaction::Transaction(DBRef _db, SlabAlloc* alloc, DB::ReadLockInfo& rli, DB::TransactStage stage)
158
    : Group(alloc)
159
    , db(_db)
160
    , m_read_lock(rli)
161
    , m_log_id(util::gen_log_id(this))
162
{
3,084,264✔
163
    bool writable = stage == DB::transact_Writing;
3,084,264✔
164
    m_transact_stage = DB::transact_Ready;
3,084,264✔
165
    set_transact_stage(stage);
3,084,264✔
166
    m_alloc.note_reader_start(this);
3,084,264✔
167
    attach_shared(m_read_lock.m_top_ref, m_read_lock.m_file_size, writable,
3,084,264✔
168
                  VersionID{rli.m_version, rli.m_reader_idx});
3,084,264✔
169
    if (db->m_logger) {
3,084,264✔
170
        db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "Start %1 %2: %3 ref %4",
867,240✔
171
                          log_stage[stage], m_log_id, rli.m_version, m_read_lock.m_top_ref);
867,240✔
172
    }
867,240✔
173
}
3,084,264✔
174

175
Transaction::~Transaction()
{
    // Intentionally empty. close() is not called from here; it is invoked
    // implicitly by the deleter.
}
180

181
void Transaction::close()
182
{
3,094,215✔
183
    if (m_transact_stage == DB::transact_Writing) {
3,094,215✔
184
        rollback();
9,996✔
185
    }
9,996✔
186
    if (m_transact_stage == DB::transact_Reading || m_transact_stage == DB::transact_Frozen) {
3,094,215✔
187
        do_end_read();
1,993,023✔
188
    }
1,993,023✔
189
}
3,094,215✔
190

191
size_t Transaction::get_commit_size() const
192
{
41,958✔
193
    size_t sz = 0;
41,958✔
194
    if (m_transact_stage == DB::transact_Writing) {
41,958✔
195
        sz = m_alloc.get_commit_size();
12,057✔
196
    }
12,057✔
197
    return sz;
41,958✔
198
}
41,958✔
199

200
// Commits the current write transaction and ends it.
//
// Returns the version produced by the commit.
// Throws WrongTransactionState if the transaction is not in the writing stage.
DB::version_type Transaction::commit()
{
    check_attached();

    if (m_transact_stage != DB::transact_Writing) {
        throw WrongTransactionState("Not a write transaction");
    }

    REALM_ASSERT(is_attached());

    // Let accessors at group level or below sync before the commit is made.
    flush_accessors_for_commit();

    DB::version_type committed_version = db->do_commit(*this); // Throws

    // m_read_lock has to be set for wait_for_change to work. To obtain a value
    // for it, a read lock on the latest available snapshot is grabbed and
    // immediately released again.
    DB::ReadLockInfo latest_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
    db->release_read_lock(latest_lock);

    db->end_write_on_correct_thread();

    do_end_read();
    m_read_lock = latest_lock;

    return committed_version;
}
227

228
void Transaction::rollback()
229
{
10,914✔
230
    // rollback may happen as a consequence of exception handling in cases where
5,463✔
231
    // the DB has detached. If so, just back out without trying to change state.
5,463✔
232
    // the DB object has already been closed and no further processing is possible.
5,463✔
233
    if (!is_attached())
10,914✔
234
        return;
6✔
235
    if (m_transact_stage == DB::transact_Ready)
10,908✔
236
        return; // Idempotency
×
237

5,460✔
238
    if (m_transact_stage != DB::transact_Writing)
10,908✔
239
        throw WrongTransactionState("Not a write transaction");
×
240
    db->reset_free_space_tracking();
10,908✔
241
    if (!holds_write_mutex())
10,908✔
242
        db->end_write_on_correct_thread();
10,908✔
243

5,460✔
244
    do_end_read();
10,908✔
245
}
10,908✔
246

247
void Transaction::end_read()
248
{
98,034✔
249
    if (m_transact_stage == DB::transact_Ready)
98,034✔
250
        return;
6✔
251
    if (m_transact_stage == DB::transact_Writing)
98,028✔
252
        throw WrongTransactionState("Illegal end_read when in write mode");
×
253
    do_end_read();
98,028✔
254
}
98,028✔
255

256
VersionID Transaction::commit_and_continue_as_read(bool commit_to_disk)
257
{
388,854✔
258
    check_attached();
388,854✔
259
    if (m_transact_stage != DB::transact_Writing)
388,854✔
260
        throw WrongTransactionState("Not a write transaction");
×
261

193,599✔
262
    flush_accessors_for_commit();
388,854✔
263

193,599✔
264
    DB::version_type version = db->do_commit(*this, commit_to_disk); // Throws
388,854✔
265

193,599✔
266
    // advance read lock but dont update accessors:
193,599✔
267
    // As this is done under lock, along with the addition above of the newest commit,
193,599✔
268
    // we know for certain that the read lock we will grab WILL refer to our own newly
193,599✔
269
    // completed commit.
193,599✔
270

193,599✔
271
    try {
388,854✔
272
        // Grabbing the new lock before releasing the old one prevents m_transaction_count
193,599✔
273
        // from going shortly to zero
193,599✔
274
        DB::ReadLockInfo new_read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID()); // Throws
388,854✔
275

193,599✔
276
        m_history = nullptr;
388,854✔
277
        set_transact_stage(DB::transact_Reading);
388,854✔
278

193,599✔
279
        if (commit_to_disk || m_oldest_version_not_persisted) {
388,854✔
280
            // Here we are either committing to disk or we are already
193,275✔
281
            // holding on to an older version. In either case there is
193,275✔
282
            // no need to hold onto this now historic version.
193,275✔
283
            db->release_read_lock(m_read_lock);
387,459✔
284
        }
387,459✔
285
        else {
1,395✔
286
            // We are not commiting to disk and there is no older
324✔
287
            // version not persisted, so hold onto this one
324✔
288
            m_oldest_version_not_persisted = m_read_lock;
1,395✔
289
        }
1,395✔
290

193,599✔
291
        if (commit_to_disk && m_oldest_version_not_persisted) {
388,854✔
292
            // We are committing to disk so we can release the
6✔
293
            // version we are holding on to
6✔
294
            db->release_read_lock(*m_oldest_version_not_persisted);
12✔
295
            m_oldest_version_not_persisted.reset();
12✔
296
        }
12✔
297
        m_read_lock = new_read_lock;
388,854✔
298
        // We can be sure that m_read_lock != m_oldest_version_not_persisted
193,599✔
299
        // because m_oldest_version_not_persisted is either equal to former m_read_lock
193,599✔
300
        // or older and former m_read_lock is older than current m_read_lock
193,599✔
301
        REALM_ASSERT(!m_oldest_version_not_persisted ||
388,854✔
302
                     m_read_lock.m_version != m_oldest_version_not_persisted->m_version);
388,854✔
303

193,599✔
304
        {
388,854✔
305
            util::CheckedLockGuard lock(m_async_mutex);
388,854✔
306
            REALM_ASSERT(m_async_stage != AsyncState::Syncing);
388,854✔
307
            if (commit_to_disk) {
388,854✔
308
                if (m_async_stage == AsyncState::Requesting) {
381,321✔
309
                    m_async_stage = AsyncState::HasLock;
×
310
                }
×
311
                else {
381,321✔
312
                    db->end_write_on_correct_thread();
381,321✔
313
                    m_async_stage = AsyncState::Idle;
381,321✔
314
                }
381,321✔
315
            }
381,321✔
316
            else {
7,533✔
317
                m_async_stage = AsyncState::HasCommits;
7,533✔
318
            }
7,533✔
319
        }
388,854✔
320

193,599✔
321
        // Remap file if it has grown, and update refs in underlying node structure.
193,599✔
322
        remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, false); // Throws
388,854✔
323
        return VersionID{version, new_read_lock.m_reader_idx};
388,854✔
324
    }
388,854✔
325
    catch (std::exception& e) {
×
326
        if (db->m_logger) {
×
NEW
327
            db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::error,
×
NEW
328
                              "Tr %1: Commit failed with exception: \"%2\"", m_log_id, e.what());
×
UNCOV
329
        }
×
330
        // In case of failure, further use of the transaction for reading is unsafe
331
        set_transact_stage(DB::transact_Ready);
×
332
        throw;
×
333
    }
×
334
}
388,854✔
335

336
// Commits the current write transaction and stays in the writing stage, now
// based on the latest snapshot.
//
// Returns the committed version together with the reader index of the new
// read lock.
// Throws WrongTransactionState if not in the writing stage.
VersionID Transaction::commit_and_continue_writing()
{
    check_attached();
    if (m_transact_stage != DB::transact_Writing) {
        throw WrongTransactionState("Not a write transaction");
    }

    // Let accessors at group level or below sync before the commit is made.
    flush_accessors_for_commit();

    DB::version_type committed_version = db->do_commit(*this); // Throws

    // m_read_lock has to be set for wait_for_change to work: grab a read lock
    // on the latest available snapshot and swap it in for the old one.
    DB::ReadLockInfo latest_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
    db->release_read_lock(m_read_lock);
    m_read_lock = latest_lock;

    Replication* repl = db->get_replication();
    if (repl) {
        bool history_updated = false;
        repl->initiate_transact(*this, latest_lock.m_version, history_updated); // Throws
    }

    const bool writable = true;
    remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, writable); // Throws
    return VersionID{committed_version, latest_lock.m_reader_idx};
}
362

363
// Starts a frozen transaction on the DB at the version this read transaction
// is currently positioned on.
// Throws WrongTransactionState unless the transaction is in the reading stage.
TransactionRef Transaction::freeze()
{
    if (m_transact_stage != DB::transact_Reading) {
        throw WrongTransactionState("Can only freeze a read transaction");
    }
    VersionID current_version(m_read_lock.m_version, m_read_lock.m_reader_idx);
    return db->start_frozen(current_version);
}
370

371
// Starts a new transaction on the DB at the version held by this one.
//
// A reading transaction yields a read transaction, a frozen one a frozen
// transaction. A writing transaction yields a read transaction, but only
// while its commit size is still zero.
// Throws WrongTransactionState in the ready stage, or in the writing stage
// once the commit size is non-zero.
TransactionRef Transaction::duplicate()
{
    const VersionID held_version(m_read_lock.m_version, m_read_lock.m_reader_idx);
    switch (m_transact_stage) {
        case DB::transact_Ready: {
            throw WrongTransactionState("Cannot duplicate a transaction which does not have a read lock.");
        }
        case DB::transact_Reading: {
            return db->start_read(held_version);
        }
        case DB::transact_Frozen: {
            return db->start_frozen(held_version);
        }
        case DB::transact_Writing: {
            if (get_commit_size() != 0) {
                throw WrongTransactionState(
                    "Can only duplicate a write transaction before any changes have been made.");
            }
            return db->start_read(held_version);
        }
    }
    REALM_UNREACHABLE();
}
389

390
// Replicates the content of this transaction into `dest` by running
// replicate() with a CopyReplication constructed on `dest`.
void Transaction::copy_to(TransactionRef dest) const
{
    _impl::CopyReplication copier(dest);
    Transaction* target = dest.get();
    replicate(target, copier);
}
395

396
// Returns the history object for this transaction, resolving and caching it in
// m_history on first use.
//
// While reading or frozen, a read history is created on demand from the DB's
// replication and bound to this transaction; while writing, the replication's
// write history is used. The result stays null when the DB has no replication
// or the transaction is in the ready stage.
_impl::History* Transaction::get_history() const
{
    if (m_history) {
        return m_history;
    }

    auto repl = db->get_replication();
    if (!repl) {
        return m_history;
    }

    switch (m_transact_stage) {
        case DB::transact_Reading:
        case DB::transact_Frozen: {
            if (!m_history_read) {
                m_history_read = repl->_create_history_read();
            }
            m_history = m_history_read.get();
            m_history->set_group(const_cast<Transaction*>(this), false);
            break;
        }
        case DB::transact_Writing: {
            m_history = repl->_get_history_write();
            break;
        }
        case DB::transact_Ready: {
            break;
        }
    }
    return m_history;
}
418

419
// Returns the object of this transaction that has the same table key and
// object key as `original`. A default-constructed Obj is returned when
// `original` is empty or invalid, or when the key is not valid in this
// transaction's table.
Obj Transaction::import_copy_of(const Obj& original)
{
    if (!bool(original) || !original.is_valid()) {
        return {};
    }

    TableKey table_key = original.get_table_key();
    ObjKey obj_key = original.get_key();
    auto table = get_table(table_key);
    if (!table->is_valid(obj_key)) {
        return {};
    }
    return table->get_object(obj_key);
}
430

431
// Returns this transaction's table that has the same key as `original`.
TableRef Transaction::import_copy_of(ConstTableRef original)
{
    return get_table(original->get_key());
}
436

437
// Returns the link list of this transaction that corresponds to `original`
// (same owning object and column). A default-constructed LnkLst is returned
// when the owning object cannot be imported.
LnkLst Transaction::import_copy_of(const LnkLst& original)
{
    Obj owner = import_copy_of(original.get_obj());
    if (!owner) {
        return LnkLst();
    }
    ColKey col_key = original.get_col_key();
    return owner.get_linklist(col_key);
}
445

446
// Returns the list of this transaction that corresponds to `original` (same
// owning object and column), or an empty pointer when the owning object
// cannot be imported.
LstBasePtr Transaction::import_copy_of(const LstBase& original)
{
    Obj owner = import_copy_of(original.get_obj());
    if (!owner) {
        return {};
    }
    ColKey col_key = original.get_col_key();
    return owner.get_listbase_ptr(col_key);
}
454

455
// Returns the set of this transaction that corresponds to `original` (same
// owning object and column), or an empty pointer when the owning object
// cannot be imported.
SetBasePtr Transaction::import_copy_of(const SetBase& original)
{
    Obj owner = import_copy_of(original.get_obj());
    if (!owner) {
        return {};
    }
    ColKey col_key = original.get_col_key();
    return owner.get_setbase_ptr(col_key);
}
463

464
// Returns the collection of this transaction found at the same short path as
// `original` within the imported owning object, or an empty pointer when the
// owning object cannot be imported.
CollectionBasePtr Transaction::import_copy_of(const CollectionBase& original)
{
    Obj owner = import_copy_of(original.get_obj());
    if (!owner) {
        return {};
    }
    auto short_path = original.get_short_path();
    return std::static_pointer_cast<CollectionBase>(owner.get_collection_ptr(short_path));
}
472

473
// Pointer variant for link lists: a null `original` yields null. Otherwise the
// corresponding link list of this transaction is returned, or a
// default-constructed LnkLst when the owning object cannot be imported.
LnkLstPtr Transaction::import_copy_of(const LnkLstPtr& original)
{
    if (!bool(original)) {
        return nullptr;
    }

    Obj owner = import_copy_of(original->get_obj());
    if (!owner) {
        return std::make_unique<LnkLst>();
    }
    ColKey col_key = original->get_col_key();
    return owner.get_linklist_ptr(col_key);
}
483

484
// Pointer variant for link sets: a null `original` yields null. Otherwise the
// corresponding link set of this transaction is returned, or a
// default-constructed LnkSet when the owning object cannot be imported.
LnkSetPtr Transaction::import_copy_of(const LnkSetPtr& original)
{
    if (!original) {
        return nullptr;
    }

    Obj owner = import_copy_of(original->get_obj());
    if (!owner) {
        return std::make_unique<LnkSet>();
    }
    ColKey col_key = original->get_col_key();
    return owner.get_linkset_ptr(col_key);
}
494

495
// Pointer variant for generic link collections: a null `original` yields null.
// Otherwise the corresponding link collection of this transaction is returned.
// When the owning object cannot be imported, a default-constructed LnkLst is
// returned as a stand-in.
LinkCollectionPtr Transaction::import_copy_of(const LinkCollectionPtr& original)
{
    if (!original) {
        return nullptr;
    }

    Obj owner = import_copy_of(original->get_owning_obj());
    if (!owner) {
        // Return some empty collection where size() == 0;
        // the concrete type shouldn't matter.
        return std::make_unique<LnkLst>();
    }
    ColKey col_key = original->get_owning_col_key();
    return owner.get_linkcollection_ptr(col_key);
}
507

508
// Imports `query` into this transaction by delegating to
// Query::clone_for_handover() with this transaction and the given `policy`.
std::unique_ptr<Query> Transaction::import_copy_of(Query& query, PayloadPolicy policy)
{
    Transaction* const target = this;
    return query.clone_for_handover(target, policy);
}
512

513
// Imports the table view `tv` into this transaction by delegating to
// TableView::clone_for_handover() with this transaction and the given `policy`.
std::unique_ptr<TableView> Transaction::import_copy_of(TableView& tv, PayloadPolicy policy)
{
    Transaction* const target = this;
    return tv.clone_for_handover(target, policy);
}
517

518
void Transaction::upgrade_file_format(int target_file_format_version)
519
{
156✔
520
    REALM_ASSERT(is_attached());
156✔
521
    if (fake_target_file_format && *fake_target_file_format == target_file_format_version) {
156✔
522
        // Testing, mockup scenario, not a real upgrade. Just pretend we're done!
15✔
523
        return;
30✔
524
    }
30✔
525

63✔
526
    // Be sure to revisit the following upgrade logic when a new file format
63✔
527
    // version is introduced. The following assert attempt to help you not
63✔
528
    // forget it.
63✔
529
    REALM_ASSERT_EX(target_file_format_version == 24, target_file_format_version);
126✔
530

63✔
531
    // DB::do_open() must ensure that only supported version are allowed.
63✔
532
    // It does that by asking backup if the current file format version is
63✔
533
    // included in the accepted versions, so be sure to align the list of
63✔
534
    // versions with the logic below
63✔
535

63✔
536
    int current_file_format_version = get_file_format_version();
126✔
537
    REALM_ASSERT(current_file_format_version < target_file_format_version);
126✔
538

63✔
539
    if (auto logger = get_logger()) {
126✔
540
        logger->info("Upgrading from file format version %1 to %2", current_file_format_version,
36✔
541
                     target_file_format_version);
36✔
542
    }
36✔
543
    // Ensure we have search index on all primary key columns.
63✔
544
    auto table_keys = get_table_keys();
126✔
545
    if (current_file_format_version < 22) {
126✔
546
        for (auto k : table_keys) {
114✔
547
            auto t = get_table(k);
114✔
548
            if (auto col = t->get_primary_key_column()) {
114✔
549
                t->do_add_search_index(col, IndexType::General);
78✔
550
            }
78✔
551
        }
114✔
552
    }
48✔
553

63✔
554
    if (current_file_format_version == 22) {
126✔
555
        // Check that asymmetric table are empty
33✔
556
        for (auto k : table_keys) {
180✔
557
            auto t = get_table(k);
180✔
558
            if (t->is_asymmetric() && t->size() > 0) {
180✔
559
                t->clear();
6✔
560
            }
6✔
561
        }
180✔
562
    }
66✔
563
    if (current_file_format_version >= 21 && current_file_format_version < 23) {
126✔
564
        // Upgrade Set and Dictionary columns
33✔
565
        for (auto k : table_keys) {
180✔
566
            auto t = get_table(k);
180✔
567
            t->migrate_sets_and_dictionaries();
180✔
568
        }
180✔
569
    }
66✔
570
    if (current_file_format_version < 24) {
126✔
571
        for (auto k : table_keys) {
354✔
572
            auto t = get_table(k);
354✔
573
            t->migrate_set_orderings(); // rewrite sets to use the new string/binary order
354✔
574
            // Although StringIndex sort order has been changed in this format, we choose to
177✔
575
            // avoid upgrading them because it affects a small niche case. Instead, there is a
177✔
576
            // workaround in the String Index search code for not relying on items being ordered.
177✔
577
        }
354✔
578
    }
126✔
579
    // NOTE: Additional future upgrade steps go here.
63✔
580
}
126✔
581

582
void Transaction::promote_to_async()
583
{
7,536✔
584
    util::CheckedLockGuard lck(m_async_mutex);
7,536✔
585
    if (m_async_stage == AsyncState::Idle) {
7,536✔
586
        m_async_stage = AsyncState::HasLock;
366✔
587
    }
366✔
588
}
7,536✔
589

590
// Feeds `repl` with the instructions needed to recreate the public tables of
// this transaction: first the classes, then their columns, then the objects
// with all their properties. `dest` is committed (and kept writable) once the
// schema has been emitted and again after every batch of created objects.
//
// Throws RuntimeError(ErrorCodes::BrokenInvariant) if a non-embedded public
// table has no primary key, or if its primary key column is not named "_id".
void Transaction::replicate(Transaction* dest, Replication& repl) const
{
    // Only public tables get entries.
    std::vector<TableKey> public_keys;
    for (auto key : get_table_keys()) {
        if (table_is_public(key)) {
            public_keys.push_back(key);
        }
    }

    // Step 1: create the tables.
    for (auto key : public_keys) {
        auto table = get_table(key);
        auto table_name = table->get_name();
        if (table->is_embedded()) {
            repl.add_class(key, table_name, Table::Type::Embedded);
            continue;
        }

        auto pk_col = table->get_primary_key_column();
        if (!pk_col) {
            throw RuntimeError(
                ErrorCodes::BrokenInvariant,
                util::format("Class '%1' must have a primary key", Group::table_name_to_class_name(table_name)));
        }
        auto pk_name = table->get_column_name(pk_col);
        if (pk_name != "_id") {
            throw RuntimeError(ErrorCodes::BrokenInvariant,
                               util::format("Primary key of class '%1' must be named '_id'. Current is '%2'",
                                            Group::table_name_to_class_name(table_name), pk_name));
        }
        repl.add_class_with_primary_key(key, table_name, DataType(pk_col.get_type()), pk_name,
                                        pk_col.is_nullable(), table->get_table_type());
    }

    // Step 2: create the columns. The primary key column is skipped, as it was
    // supplied together with the class above.
    for (auto key : public_keys) {
        auto table = get_table(key);
        auto pk_col = table->get_primary_key_column();
        auto col_keys = table->get_column_keys();
        for (auto col_key : col_keys) {
            if (col_key == pk_col) {
                continue;
            }
            repl.insert_column(table.unchecked_ptr(), col_key, DataType(col_key.get_type()),
                               table->get_column_name(col_key), table->get_opposite_table(col_key).unchecked_ptr());
        }
    }
    dest->commit_and_continue_writing();

    // Step 3: the schema should be in place now - create the objects.
#ifdef REALM_DEBUG
    constexpr int number_of_objects_to_create_before_committing = 100;
#else
    constexpr int number_of_objects_to_create_before_committing = 1000;
#endif
    // Counts down the objects left in the current batch; shared across tables.
    auto remaining_in_batch = number_of_objects_to_create_before_committing;
    for (auto key : public_keys) {
        auto table = get_table(key);
        // Embedded tables are skipped here; see generate_properties_for_obj().
        if (table->is_embedded()) {
            continue;
        }
        auto pk_col = table->get_primary_key_column();
        auto col_info = get_col_info(table.unchecked_ptr());
        for (auto object : *table) {
            auto obj_key = object.get_key();
            Mixed pk_value = object.get_any(pk_col);
            repl.create_object_with_primary_key(table.unchecked_ptr(), obj_key, pk_value);
            generate_properties_for_obj(repl, object, col_info);
            --remaining_in_batch;
            if (remaining_in_batch == 0) {
                dest->commit_and_continue_writing();
                remaining_in_batch = number_of_objects_to_create_before_committing;
            }
        }
    }
}
659

660
void Transaction::complete_async_commit()
661
{
1,380✔
662
    // sync to disk:
318✔
663
    DB::ReadLockInfo read_lock;
1,380✔
664
    try {
1,380✔
665
        read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
1,380✔
666
        if (db->m_logger) {
1,380✔
667
            db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace,
1,380✔
668
                              "Tr %1: Committing ref %2 to disk", m_log_id, read_lock.m_top_ref);
1,380✔
669
        }
1,380✔
670
        GroupCommitter out(*this);
1,380✔
671
        out.commit(read_lock.m_top_ref); // Throws
1,380✔
672
        // we must release the write mutex before the callback, because the callback
318✔
673
        // is allowed to re-request it.
318✔
674
        db->release_read_lock(read_lock);
1,380✔
675
        if (m_oldest_version_not_persisted) {
1,380✔
676
            db->release_read_lock(*m_oldest_version_not_persisted);
1,374✔
677
            m_oldest_version_not_persisted.reset();
1,374✔
678
        }
1,374✔
679
    }
1,380✔
680
    catch (const std::exception& e) {
321✔
681
        m_commit_exception = std::current_exception();
6✔
682
        if (db->m_logger) {
6✔
683
            db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::error,
6✔
684
                              "Tr %1: Committing to disk failed with exception: \"%2\"", m_log_id, e.what());
6✔
685
        }
6✔
686
        m_async_commit_has_failed = true;
6✔
687
        db->release_read_lock(read_lock);
6✔
688
    }
6✔
689
}
1,380✔
690

691
// Wrap up the current sequence of asynchronous writes.
//
// What happens depends on m_async_stage (inspected under m_async_mutex):
//  - HasLock:    nothing needs to go to disk; the stage becomes Idle and the
//                write lock is handed back through db->async_end_write().
//                `when_synchronized` is not invoked on this path.
//  - HasCommits: the stage becomes Syncing, m_commit_exception is cleared and
//                a job is queued with db->async_sync_to_disk(). That job runs
//                complete_async_commit(), returns the stage to Idle and then
//                either wakes up a thread that flagged m_waiting_for_sync or
//                invokes `when_synchronized`.
//  - otherwise:  no action is taken.
void Transaction::async_complete_writes(util::UniqueFunction<void()> when_synchronized)
{
    util::CheckedLockGuard guard(m_async_mutex);
    switch (m_async_stage) {
        case AsyncState::HasLock: {
            // Nothing to commit to disk - just release write lock
            m_async_stage = AsyncState::Idle;
            db->async_end_write();
            break;
        }

        case AsyncState::HasCommits: {
            m_async_stage = AsyncState::Syncing;
            m_commit_exception = std::exception_ptr();
            // get a callback on the helper thread, in which to sync to disk
            db->async_sync_to_disk([this, on_synced = std::move(when_synchronized)]() noexcept {
                complete_async_commit();
                util::CheckedLockGuard job_guard(m_async_mutex);
                m_async_stage = AsyncState::Idle;
                if (!m_waiting_for_sync) {
                    on_synced();
                    return;
                }
                // Somebody is blocked waiting for the sync to finish: wake
                // them up instead of running the completion handler.
                m_waiting_for_sync = false;
                m_async_cv.notify_all();
            });
            break;
        }

        default:
            break;
    }
}
9,897✔
717

718
void Transaction::prepare_for_close()
719
{
3,240,381✔
720
    util::CheckedLockGuard lck(m_async_mutex);
3,240,381✔
721
    switch (m_async_stage) {
3,240,381✔
722
        case AsyncState::Idle:
3,251,763✔
723
            break;
3,251,763✔
724

725
        case AsyncState::Requesting:
24✔
726
            // We don't have the ability to cancel a wait on the write lock, so
12✔
727
            // unfortunately we have to wait for it to be acquired.
12✔
728
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
729
            REALM_ASSERT(!m_oldest_version_not_persisted);
24✔
730
            m_waiting_for_write_lock = true;
24✔
731
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
48✔
732
                return !m_waiting_for_write_lock;
48✔
733
            });
48✔
734
            db->end_write_on_correct_thread();
24✔
735
            break;
24✔
736

737
        case AsyncState::HasLock:
30✔
738
            // We have the lock and are currently in a write transaction, and
15✔
739
            // also may have some pending previous commits to write
15✔
740
            if (m_transact_stage == DB::transact_Writing) {
30✔
741
                db->reset_free_space_tracking();
18✔
742
                m_transact_stage = DB::transact_Reading;
18✔
743
            }
18✔
744
            if (m_oldest_version_not_persisted) {
30✔
745
                complete_async_commit();
×
746
            }
×
747
            db->end_write_on_correct_thread();
30✔
748
            break;
30✔
749

750
        case AsyncState::HasCommits:
24✔
751
            // We have commits which need to be synced to disk, so do that
12✔
752
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
753
            complete_async_commit();
24✔
754
            db->end_write_on_correct_thread();
24✔
755
            break;
24✔
756

757
        case AsyncState::Syncing:
24✔
758
            // The worker thread is currently writing, so wait for it to complete
12✔
759
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
760
            m_waiting_for_sync = true;
24✔
761
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
48✔
762
                return !m_waiting_for_sync;
48✔
763
            });
48✔
764
            break;
24✔
765
    }
3,251,937✔
766
    m_async_stage = AsyncState::Idle;
3,251,937✔
767
}
3,251,937✔
768

769
void Transaction::acquire_write_lock()
770
{
383,292✔
771
    util::CheckedUniqueLock lck(m_async_mutex);
383,292✔
772
    switch (m_async_stage) {
383,292✔
773
        case AsyncState::Idle:
382,848✔
774
            lck.unlock();
382,848✔
775
            db->do_begin_possibly_async_write();
382,848✔
776
            return;
382,848✔
777

778
        case AsyncState::Requesting:
105✔
779
            m_waiting_for_write_lock = true;
105✔
780
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
210✔
781
                return !m_waiting_for_write_lock;
210✔
782
            });
210✔
783
            return;
105✔
784

785
        case AsyncState::HasLock:
✔
786
        case AsyncState::HasCommits:
✔
787
            return;
×
788

789
        case AsyncState::Syncing:
342✔
790
            m_waiting_for_sync = true;
342✔
791
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
684✔
792
                return !m_waiting_for_sync;
684✔
793
            });
684✔
794
            lck.unlock();
342✔
795
            db->do_begin_possibly_async_write();
342✔
796
            break;
342✔
797
    }
383,292✔
798
}
383,292✔
799

800
void Transaction::do_end_read() noexcept
801
{
3,076,377✔
802
    if (db->m_logger)
3,076,377✔
803
        db->m_logger->log(util::LogCategory::transaction, util::Logger::Level::trace, "End transaction %1", m_log_id);
867,147✔
804

1,970,796✔
805
    prepare_for_close();
3,076,377✔
806
    detach();
3,076,377✔
807

1,970,796✔
808
    // We should always be ensuring that async commits finish before we get here,
1,970,796✔
809
    // but if the fsync() failed or we failed to update the top pointer then
1,970,796✔
810
    // there's not much we can do and we have to just accept that we're losing
1,970,796✔
811
    // those commits.
1,970,796✔
812
    if (m_oldest_version_not_persisted) {
3,076,377✔
813
        REALM_ASSERT(m_async_commit_has_failed);
6✔
814
        // We need to not release our read lock on m_oldest_version_not_persisted
3✔
815
        // as that's the version the top pointer is referencing and overwriting
3✔
816
        // that version will corrupt the Realm file.
3✔
817
        db->leak_read_lock(*m_oldest_version_not_persisted);
6✔
818
    }
6✔
819
    db->release_read_lock(m_read_lock);
3,076,377✔
820

1,970,796✔
821
    m_alloc.note_reader_end(this);
3,076,377✔
822
    set_transact_stage(DB::transact_Ready);
3,076,377✔
823
    // reset the std::shared_ptr to allow the DB object to release resources
1,970,796✔
824
    // as early as possible.
1,970,796✔
825
    db.reset();
3,076,377✔
826
}
3,076,377✔
827

828
// This is the same as do_end_read() above, but with the requirement that
829
// 1) This is called with the db->mutex locked already
830
// 2) No async commits outstanding
831
void Transaction::close_read_with_lock()
832
{
144✔
833
    REALM_ASSERT(m_transact_stage == DB::transact_Reading);
144✔
834
    {
144✔
835
        util::CheckedLockGuard lck(m_async_mutex);
144✔
836
        REALM_ASSERT_EX(m_async_stage == AsyncState::Idle, size_t(m_async_stage));
144✔
837
    }
144✔
838

72✔
839
    detach();
144✔
840
    REALM_ASSERT_EX(!m_oldest_version_not_persisted, m_oldest_version_not_persisted->m_type,
144✔
841
                    m_oldest_version_not_persisted->m_version, m_oldest_version_not_persisted->m_top_ref,
144✔
842
                    m_oldest_version_not_persisted->m_file_size);
144✔
843
    db->do_release_read_lock(m_read_lock);
144✔
844

72✔
845
    m_alloc.note_reader_end(this);
144✔
846
    set_transact_stage(DB::transact_Ready);
144✔
847
    // reset the std::shared_ptr to allow the DB object to release resources
72✔
848
    // as early as possible.
72✔
849
    db.reset();
144✔
850
}
144✔
851

852

853
void Transaction::initialize_replication()
UNCOV
854
{
×
UNCOV
855
    if (m_transact_stage == DB::transact_Writing) {
×
UNCOV
856
        if (Replication* repl = get_replication()) {
×
UNCOV
857
            auto current_version = m_read_lock.m_version;
×
UNCOV
858
            bool history_updated = false;
×
UNCOV
859
            repl->initiate_transact(*this, current_version, history_updated); // Throws
×
UNCOV
860
        }
×
UNCOV
861
    }
×
UNCOV
862
}
×
863

864
// Record `stage` as the current transaction stage.
void Transaction::set_transact_stage(DB::TransactStage stage) noexcept
{
    this->m_transact_stage = stage;
}
6,958,287✔
868

869
class NodeTree {
870
public:
871
    NodeTree(size_t evac_limit, size_t work_limit)
872
        : m_evac_limit(evac_limit)
873
        , m_work_limit(int64_t(work_limit))
874
        , m_moved(0)
875
    {
5,325✔
876
    }
5,325✔
877
    ~NodeTree()
878
    {
5,325✔
879
        // std::cout << "Moved: " << m_moved << std::endl;
2,643✔
880
    }
5,325✔
881

882
    /// Function used to traverse the node tree and "copy on write" nodes
883
    /// that are found above the evac_limit. The function will return
884
    /// when either the whole tree has been travesed or when the work_limit
885
    /// has been reached.
886
    /// \param current_node - node to process.
887
    /// \param level - the level at which current_node is placed in the tree
888
    /// \param progress - When the traversal is initiated, this vector identifies at which
889
    ///                   node the process should be resumed. It is subesequently updated
890
    ///                   to point to the node we have just processed
891
    bool trv(Array& current_node, unsigned level, std::vector<size_t>& progress)
892
    {
1,260,972✔
893
        if (m_work_limit < 0) {
1,260,972✔
894
            return false;
5,220✔
895
        }
5,220✔
896
        if (current_node.is_read_only()) {
1,255,752✔
897
            size_t byte_size = current_node.get_byte_size();
1,228,653✔
898
            if ((current_node.get_ref() + byte_size) > m_evac_limit) {
1,228,653✔
899
                current_node.copy_on_write();
1,171,668✔
900
                m_moved++;
1,171,668✔
901
                m_work_limit -= byte_size;
1,171,668✔
902
            }
1,171,668✔
903
        }
1,228,653✔
904

627,732✔
905
        if (current_node.has_refs()) {
1,255,752✔
906
            auto sz = current_node.size();
41,616✔
907
            m_work_limit -= sz;
41,616✔
908
            if (progress.size() == level) {
41,616✔
909
                progress.push_back(0);
10,428✔
910
            }
10,428✔
911
            REALM_ASSERT_EX(level < progress.size(), level, progress.size());
41,616✔
912
            size_t ndx = progress[level];
41,616✔
913
            while (ndx < sz) {
1,280,478✔
914
                auto val = current_node.get(ndx);
1,270,110✔
915
                if (val && !(val & 1)) {
1,270,110✔
916
                    Array arr(current_node.get_alloc());
1,255,455✔
917
                    arr.set_parent(&current_node, ndx);
1,255,455✔
918
                    arr.init_from_parent();
1,255,455✔
919
                    if (!trv(arr, level + 1, progress)) {
1,255,455✔
920
                        return false;
31,248✔
921
                    }
31,248✔
922
                }
1,238,862✔
923
                ndx = ++progress[level];
1,238,862✔
924
            }
1,238,862✔
925
            while (progress.size() > level)
31,293✔
926
                progress.pop_back();
10,368✔
927
        }
10,368✔
928
        return true;
1,240,128✔
929
    }
1,255,752✔
930

931
private:
932
    size_t m_evac_limit;
933
    int64_t m_work_limit;
934
    size_t m_moved;
935
};
936

937

938
void Transaction::cow_outliers(std::vector<size_t>& progress, size_t evac_limit, size_t work_limit)
939
{
5,325✔
940
    NodeTree node_tree(evac_limit, work_limit);
5,325✔
941
    if (progress.empty()) {
5,325✔
942
        progress.push_back(s_table_name_ndx);
117✔
943
    }
117✔
944
    if (progress[0] == s_table_name_ndx) {
5,325✔
945
        if (!node_tree.trv(m_table_names, 1, progress))
117✔
946
            return;
×
947
        progress.back() = s_table_refs_ndx; // Handle tables next
117✔
948
    }
117✔
949
    if (progress[0] == s_table_refs_ndx) {
5,325✔
950
        if (!node_tree.trv(m_tables, 1, progress))
5,325✔
951
            return;
5,220✔
952
        progress.back() = s_hist_ref_ndx; // Handle history next
105✔
953
    }
105✔
954
    if (progress[0] == s_hist_ref_ndx && m_top.get(s_hist_ref_ndx)) {
2,715✔
955
        Array hist_arr(m_top.get_alloc());
105✔
956
        hist_arr.set_parent(&m_top, s_hist_ref_ndx);
105✔
957
        hist_arr.init_from_parent();
105✔
958
        if (!node_tree.trv(hist_arr, 1, progress))
105✔
959
            return;
×
960
    }
105✔
961
    progress.clear();
105✔
962
}
105✔
963

964
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc