• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

realm / realm-core / 1653

08 Sep 2023 11:57PM UTC coverage: 91.247% (+0.04%) from 91.205%
1653

push

Evergreen

GitHub
Fix tools build (#6475)

95872 of 175834 branches covered (0.0%)

233451 of 255845 relevant lines covered (91.25%)

7518301.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.44
/src/realm/transaction.cpp
1
/*************************************************************************
2
 *
3
 * Copyright 2022 Realm Inc.
4
 *
5
 * Licensed under the Apache License, Version 2.0 (the "License");
6
 * you may not use this file except in compliance with the License.
7
 * You may obtain a copy of the License at
8
 *
9
 * http://www.apache.org/licenses/LICENSE-2.0
10
 *
11
 * Unless required by applicable law or agreed to in writing, software
12
 * distributed under the License is distributed on an "AS IS" BASIS,
13
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14
 * See the License for the specific language governing permissions and
15
 * limitations under the License.
16
 *
17
 **************************************************************************/
18

19
#include <realm/transaction.hpp>
20
#include "impl/copy_replication.hpp"
21
#include <realm/list.hpp>
22
#include <realm/set.hpp>
23
#include <realm/dictionary.hpp>
24
#include <realm/table_view.hpp>
25
#include <realm/group_writer.hpp>
26

27
namespace {
28

29
using namespace realm;
30
using ColInfo = std::vector<std::pair<ColKey, Table*>>;
31

32
// Collects one (column key, embedded target table) pair per column of `table`.
// The table pointer in a pair is non-null only when the column has an opposite
// table for which is_embedded() is true. A null `table` yields an empty list.
ColInfo get_col_info(const Table* table)
{
    ColInfo result;
    if (!table)
        return result;

    for (auto col_key : table->get_column_keys()) {
        Table* embedded_target = nullptr;
        auto opposite = table->get_opposite_table(col_key);
        if (opposite && opposite->is_embedded())
            embedded_target = opposite.unchecked_ptr();
        result.emplace_back(col_key, embedded_target);
    }
    return result;
}
2,010✔
47

48
// Emits replication instructions to `repl` for every column of `obj` listed in
// `cols`. Columns whose ColInfo entry carries an embedded table are followed
// recursively: each non-null link value is resolved in that table and the
// linked object's properties are emitted as well.
void generate_properties_for_obj(Replication& repl, const Obj& obj, const ColInfo& cols)
{
    for (const auto& entry : cols) {
        auto col = entry.first;
        auto embedded_table = entry.second;
        // Column layout of the embedded target (empty when there is none).
        auto embedded_cols = get_col_info(embedded_table);

        // Recurse into the embedded object referenced by `link`; null links are skipped.
        auto replicate_embedded = [&](Mixed link) {
            if (link.is_null())
                return;
            REALM_ASSERT(link.is_type(type_Link, type_TypedLink));
            Obj embedded_obj = embedded_table->get_object(link.get<ObjKey>());
            generate_properties_for_obj(repl, embedded_obj, embedded_cols);
        };

        if (col.is_list()) {
            auto list = obj.get_listbase_ptr(col);
            auto count = list->size();
            // Emit a clear followed by one insert per element, in index order.
            repl.list_clear(*list);
            for (size_t ndx = 0; ndx < count; ++ndx) {
                auto element = list->get_any(ndx);
                repl.list_insert(*list, ndx, element, ndx);
                if (embedded_table)
                    replicate_embedded(element);
            }
            continue;
        }

        if (col.is_set()) {
            // Sets cannot have embedded objects, so there is no recursion here.
            auto set = obj.get_setbase_ptr(col);
            auto count = set->size();
            for (size_t ndx = 0; ndx < count; ++ndx)
                repl.set_insert(*set, ndx, set->get_any(ndx));
            continue;
        }

        if (col.is_dictionary()) {
            auto dict = obj.get_dictionary(col);
            size_t position = 0;
            for (auto [key, value] : dict) {
                repl.dictionary_insert(dict, position, key, value);
                ++position;
                if (embedded_table)
                    replicate_embedded(value);
            }
            continue;
        }

        // Plain (non-collection) column.
        auto value = obj.get_any(col);
        repl.set(obj.get_table().unchecked_ptr(), col, obj.get_key(), value);
        if (embedded_table)
            replicate_embedded(value);
    }
}
846✔
102

103
} // namespace
104

105
namespace realm {
106

107
std::map<DB::TransactStage, const char*> log_messages = {
108
    {DB::TransactStage::transact_Frozen, "Start frozen: %1"},
109
    {DB::TransactStage::transact_Writing, "Start write: %1"},
110
    {DB::TransactStage::transact_Reading, "Start read: %1"},
111
};
112

113
// Constructs a transaction on `_db` in the given `stage`, bound to the
// snapshot described by `rli`.
//
// When the DB has a logger, a trace message naming the stage and the snapshot
// version is emitted. The group is then attached to the snapshot's top ref;
// it is attached writable only when `stage` is DB::transact_Writing.
Transaction::Transaction(DBRef _db, SlabAlloc* alloc, DB::ReadLockInfo& rli, DB::TransactStage stage)
    : Group(alloc)
    , db(_db)
    , m_read_lock(rli)
{
    if (db->m_logger) {
        // Use find() rather than operator[]: operator[] is a mutating call on
        // the shared global map (it inserts a null format string for a stage
        // with no entry, and it is not safe to call concurrently), whereas
        // find() only reads. Stages without a message are simply not logged.
        auto message = log_messages.find(stage);
        if (message != log_messages.end()) {
            db->m_logger->log(util::Logger::Level::trace, message->second, rli.m_version);
        }
    }
    bool writable = stage == DB::transact_Writing;
    m_transact_stage = DB::transact_Ready;
    set_metrics(db->m_metrics);
    set_transact_stage(stage);
    m_alloc.note_reader_start(this);
    attach_shared(m_read_lock.m_top_ref, m_read_lock.m_file_size, writable,
                  VersionID{rli.m_version, rli.m_reader_idx});
}
2,746,059✔
129

130
Transaction::~Transaction()
{
    // Intentionally empty: close() is not called from here. It is invoked
    // implicitly by the deleter instead.
}
2,746,980✔
135

136
void Transaction::close()
137
{
2,755,851✔
138
    if (m_transact_stage == DB::transact_Writing) {
2,755,851✔
139
        rollback();
9,873✔
140
    }
9,873✔
141
    if (m_transact_stage == DB::transact_Reading || m_transact_stage == DB::transact_Frozen) {
2,755,851✔
142
        do_end_read();
1,662,018✔
143
    }
1,662,018✔
144
}
2,755,851✔
145

146
size_t Transaction::get_commit_size() const
147
{
35,523✔
148
    size_t sz = 0;
35,523✔
149
    if (m_transact_stage == DB::transact_Writing) {
35,523✔
150
        sz = m_alloc.get_commit_size();
5,928✔
151
    }
5,928✔
152
    return sz;
35,523✔
153
}
35,523✔
154

155
// Commits the current write transaction and ends it.
//
// Returns the version produced by the commit.
// Throws WrongTransactionState when the transaction is not in the writing stage.
DB::version_type Transaction::commit()
{
    check_attached();

    if (m_transact_stage != DB::transact_Writing) {
        throw WrongTransactionState("Not a write transaction");
    }

    REALM_ASSERT(is_attached());

    // Give accessors at group level or below a chance to sync before the commit.
    flush_accessors_for_commit();

    const DB::version_type committed_version = db->do_commit(*this); // Throws

    // wait_for_change needs m_read_lock to be set. Obtain the value by grabbing
    // a read lock on the latest available snapshot and releasing it right away.
    DB::ReadLockInfo latest_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
    db->release_read_lock(latest_lock);

    db->end_write_on_correct_thread();

    do_end_read();
    // Assigned only after do_end_read() has run.
    m_read_lock = latest_lock;

    return committed_version;
}
969,498✔
182

183
void Transaction::rollback()
184
{
10,791✔
185
    // rollback may happen as a consequence of exception handling in cases where
5,397✔
186
    // the DB has detached. If so, just back out without trying to change state.
5,397✔
187
    // the DB object has already been closed and no further processing is possible.
5,397✔
188
    if (!is_attached())
10,791✔
189
        return;
6✔
190
    if (m_transact_stage == DB::transact_Ready)
10,785✔
191
        return; // Idempotency
×
192

5,394✔
193
    if (m_transact_stage != DB::transact_Writing)
10,785✔
194
        throw WrongTransactionState("Not a write transaction");
×
195
    db->reset_free_space_tracking();
10,785✔
196
    if (!holds_write_mutex())
10,785✔
197
        db->end_write_on_correct_thread();
10,785✔
198

5,394✔
199
    do_end_read();
10,785✔
200
}
10,785✔
201

202
void Transaction::end_read()
203
{
98,040✔
204
    if (m_transact_stage == DB::transact_Ready)
98,040✔
205
        return;
6✔
206
    if (m_transact_stage == DB::transact_Writing)
98,034✔
207
        throw WrongTransactionState("Illegal end_read when in write mode");
×
208
    do_end_read();
98,034✔
209
}
98,034✔
210

211
// Commits the current write transaction and keeps the transaction open as a
// read transaction on the newly grabbed snapshot.
//
// `commit_to_disk` is forwarded to DB::do_commit(). When it is false, the
// previous read lock may be retained in m_oldest_version_not_persisted and the
// async stage becomes HasCommits.
//
// Returns the committed version paired with the reader index of the new lock.
// Throws WrongTransactionState when not in the writing stage. If anything
// after the commit throws, the stage is set to transact_Ready and the
// exception is rethrown.
VersionID Transaction::commit_and_continue_as_read(bool commit_to_disk)
{
    check_attached();
    if (m_transact_stage != DB::transact_Writing) {
        throw WrongTransactionState("Not a write transaction");
    }

    flush_accessors_for_commit();

    const DB::version_type committed_version = db->do_commit(*this, commit_to_disk); // Throws

    // Advance the read lock, but do not update accessors.
    // This happens under lock, together with the addition of the newest commit
    // above, so the read lock grabbed below is certain to refer to our own
    // newly completed commit.

    try {
        // Grab the new lock before releasing the old one so that
        // m_transaction_count never briefly drops to zero.
        DB::ReadLockInfo fresh_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID()); // Throws

        m_history = nullptr;
        set_transact_stage(DB::transact_Reading);

        if (commit_to_disk) {
            // Committing to disk: the now historic version is not needed ...
            db->release_read_lock(m_read_lock);
            // ... and neither is any older version we were holding on to.
            if (m_oldest_version_not_persisted) {
                db->release_read_lock(*m_oldest_version_not_persisted);
                m_oldest_version_not_persisted.reset();
            }
        }
        else if (m_oldest_version_not_persisted) {
            // Not committing to disk, but an older version is already held,
            // so this one does not need to be kept.
            db->release_read_lock(m_read_lock);
        }
        else {
            // Not committing to disk and no older unpersisted version is
            // held, so hold on to this one.
            m_oldest_version_not_persisted = m_read_lock;
        }

        m_read_lock = fresh_lock;
        // m_read_lock cannot equal m_oldest_version_not_persisted: the latter is
        // either the former m_read_lock or something older, and the former
        // m_read_lock is older than the current one.
        REALM_ASSERT(!m_oldest_version_not_persisted ||
                     m_read_lock.m_version != m_oldest_version_not_persisted->m_version);

        {
            util::CheckedLockGuard lock(m_async_mutex);
            REALM_ASSERT(m_async_stage != AsyncState::Syncing);
            if (!commit_to_disk) {
                m_async_stage = AsyncState::HasCommits;
            }
            else if (m_async_stage == AsyncState::Requesting) {
                m_async_stage = AsyncState::HasLock;
            }
            else {
                db->end_write_on_correct_thread();
                m_async_stage = AsyncState::Idle;
            }
        }

        // Remap the file if it has grown, and update refs in the underlying node structure.
        remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, false); // Throws
        return VersionID{committed_version, fresh_lock.m_reader_idx};
    }
    catch (...) {
        // After a failure it is unsafe to keep using the transaction for reading.
        set_transact_stage(DB::transact_Ready);
        throw;
    }
}
386,445✔
286

287
// Commits the current write transaction and continues writing on top of the
// latest snapshot.
//
// Returns the committed version paired with the reader index of the new lock.
// Throws WrongTransactionState when not in the writing stage.
VersionID Transaction::commit_and_continue_writing()
{
    check_attached();
    if (m_transact_stage != DB::transact_Writing) {
        throw WrongTransactionState("Not a write transaction");
    }

    // Give accessors at group level or below a chance to sync before the commit.
    flush_accessors_for_commit();

    const DB::version_type committed_version = db->do_commit(*this); // Throws

    // wait_for_change needs m_read_lock to be set: grab a read lock on the
    // latest available snapshot and swap it in for the previous one.
    DB::ReadLockInfo latest_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
    db->release_read_lock(m_read_lock);
    m_read_lock = latest_lock;

    Replication* repl = db->get_replication();
    if (repl) {
        bool history_updated = false;
        repl->initiate_transact(*this, latest_lock.m_version, history_updated); // Throws
    }

    const bool writable = true;
    remap_and_update_refs(m_read_lock.m_top_ref, m_read_lock.m_file_size, writable); // Throws
    return VersionID{committed_version, latest_lock.m_reader_idx};
}
768✔
313

314
// Starts a frozen transaction at the version this read transaction holds.
// Throws WrongTransactionState unless the transaction is in the reading stage.
TransactionRef Transaction::freeze()
{
    if (m_transact_stage != DB::transact_Reading) {
        throw WrongTransactionState("Can only freeze a read transaction");
    }
    VersionID current_version(m_read_lock.m_version, m_read_lock.m_reader_idx);
    return db->start_frozen(current_version);
}
6,036✔
321

322
// Starts a new transaction at the version held by this one.
//
// Reading -> new read transaction; Frozen -> new frozen transaction;
// Writing -> new read transaction, allowed only while get_commit_size() is 0.
// Throws WrongTransactionState for the ready stage and for a write
// transaction with a non-zero commit size.
TransactionRef Transaction::duplicate()
{
    VersionID current_version(m_read_lock.m_version, m_read_lock.m_reader_idx);
    switch (m_transact_stage) {
        case DB::transact_Ready: {
            throw WrongTransactionState("Cannot duplicate a transaction which does not have a read lock.");
        }
        case DB::transact_Reading: {
            return db->start_read(current_version);
        }
        case DB::transact_Frozen: {
            return db->start_frozen(current_version);
        }
        case DB::transact_Writing: {
            const bool has_changes = (get_commit_size() != 0);
            if (has_changes) {
                throw WrongTransactionState(
                    "Can only duplicate a write transaction before any changes have been made.");
            }
            return db->start_read(current_version);
        }
    }
    REALM_UNREACHABLE();
}
×
340

341
// Replicates this transaction into `dest` through a CopyReplication
// constructed on `dest`.
void Transaction::copy_to(TransactionRef dest) const
{
    _impl::CopyReplication copier(dest);
    Transaction* target = dest.get();
    replicate(target, copier);
}
42✔
346

347
// Returns the history object for the current stage, looking it up on first
// use and caching it in m_history. The result is whatever m_history holds,
// which this function leaves unchanged when the DB has no replication or the
// transaction is in the ready stage.
_impl::History* Transaction::get_history() const
{
    if (m_history) {
        return m_history;
    }

    auto repl = db->get_replication();
    if (!repl) {
        return m_history;
    }

    switch (m_transact_stage) {
        case DB::transact_Reading:
        case DB::transact_Frozen: {
            // The read history object is created once and reused afterwards.
            if (!m_history_read) {
                m_history_read = repl->_create_history_read();
            }
            m_history = m_history_read.get();
            m_history->set_group(const_cast<Transaction*>(this), false);
            break;
        }
        case DB::transact_Writing: {
            m_history = repl->_get_history_write();
            break;
        }
        case DB::transact_Ready: {
            break;
        }
    }
    return m_history;
}
3,159,093✔
369

370
// Looks up, in this transaction, the object with the same table key and
// object key as `original`. Returns a default-constructed Obj when `original`
// is empty or invalid, or when the key is not valid in this transaction's table.
Obj Transaction::import_copy_of(const Obj& original)
{
    if (!bool(original) || !original.is_valid()) {
        return {};
    }

    TableKey table_key = original.get_table_key();
    ObjKey obj_key = original.get_key();
    auto table = get_table(table_key);
    if (!table->is_valid(obj_key)) {
        return {};
    }
    return table->get_object(obj_key);
}
588✔
381

382
// Returns this transaction's table that has the same key as `original`.
TableRef Transaction::import_copy_of(ConstTableRef original)
{
    return get_table(original->get_key());
}
226,935✔
387

388
// Returns the link list for the same object and column as `original`, taken
// from this transaction, or a default-constructed LnkLst when the owning
// object cannot be imported.
LnkLst Transaction::import_copy_of(const LnkLst& original)
{
    Obj owner = import_copy_of(original.get_obj());
    if (!owner) {
        return LnkLst();
    }
    ColKey col_key = original.get_col_key();
    return owner.get_linklist(col_key);
}
×
396

397
// Returns a list accessor for the same object and column as `original`, taken
// from this transaction, or an empty pointer when the owning object cannot be
// imported.
LstBasePtr Transaction::import_copy_of(const LstBase& original)
{
    Obj owner = import_copy_of(original.get_obj());
    if (!owner) {
        return {};
    }
    ColKey col_key = original.get_col_key();
    return owner.get_listbase_ptr(col_key);
}
6✔
405

406
// Returns a set accessor for the same object and column as `original`, taken
// from this transaction, or an empty pointer when the owning object cannot be
// imported.
SetBasePtr Transaction::import_copy_of(const SetBase& original)
{
    Obj owner = import_copy_of(original.get_obj());
    if (!owner) {
        return {};
    }
    ColKey col_key = original.get_col_key();
    return owner.get_setbase_ptr(col_key);
}
×
414

415
// Returns a collection accessor for the same object and column as `original`,
// taken from this transaction, or an empty pointer when the owning object
// cannot be imported.
CollectionBasePtr Transaction::import_copy_of(const CollectionBase& original)
{
    Obj owner = import_copy_of(original.get_obj());
    if (!owner) {
        return {};
    }
    ColKey col_key = original.get_col_key();
    return owner.get_collection_ptr(col_key);
}
528✔
423

424
// Pointer flavour of the link-list import. A null `original` gives nullptr;
// when the owning object cannot be imported, a default-constructed LnkLst is
// returned instead.
LnkLstPtr Transaction::import_copy_of(const LnkLstPtr& original)
{
    if (!bool(original)) {
        return nullptr;
    }
    Obj owner = import_copy_of(original->get_obj());
    if (!owner) {
        return std::make_unique<LnkLst>();
    }
    ColKey col_key = original->get_col_key();
    return owner.get_linklist_ptr(col_key);
}
×
434

435
// Pointer flavour of the link-set import. A null `original` gives nullptr;
// when the owning object cannot be imported, a default-constructed LnkSet is
// returned instead.
LnkSetPtr Transaction::import_copy_of(const LnkSetPtr& original)
{
    if (!original) {
        return nullptr;
    }
    Obj owner = import_copy_of(original->get_obj());
    if (!owner) {
        return std::make_unique<LnkSet>();
    }
    ColKey col_key = original->get_col_key();
    return owner.get_linkset_ptr(col_key);
}
×
445

446
// Imports a link collection by its owning object and owning column. A null
// `original` gives nullptr; when the owning object cannot be imported, a
// default-constructed LnkLst is returned as a stand-in.
LinkCollectionPtr Transaction::import_copy_of(const LinkCollectionPtr& original)
{
    if (!original) {
        return nullptr;
    }
    Obj owner = import_copy_of(original->get_owning_obj());
    if (!owner) {
        // Hand back some empty collection where size() == 0;
        // the concrete type shouldn't matter.
        return std::make_unique<LnkLst>();
    }
    ColKey col_key = original->get_owning_col_key();
    return owner.get_linkcollection_ptr(col_key);
}
42✔
458

459
// Clones `query` for handover into this transaction using the given payload policy.
std::unique_ptr<Query> Transaction::import_copy_of(Query& query, PayloadPolicy policy)
{
    std::unique_ptr<Query> imported = query.clone_for_handover(this, policy);
    return imported;
}
59,472✔
463

464
// Clones `tv` for handover into this transaction using the given payload policy.
std::unique_ptr<TableView> Transaction::import_copy_of(TableView& tv, PayloadPolicy policy)
{
    std::unique_ptr<TableView> imported = tv.clone_for_handover(this, policy);
    return imported;
}
38,040✔
468

469
void Transaction::upgrade_file_format(int target_file_format_version)
470
{
234✔
471
    REALM_ASSERT(is_attached());
234✔
472
    if (fake_target_file_format && *fake_target_file_format == target_file_format_version) {
234✔
473
        // Testing, mockup scenario, not a real upgrade. Just pretend we're done!
15✔
474
        return;
30✔
475
    }
30✔
476

102✔
477
    // Be sure to revisit the following upgrade logic when a new file format
102✔
478
    // version is introduced. The following assert attempt to help you not
102✔
479
    // forget it.
102✔
480
    REALM_ASSERT_EX(target_file_format_version == 23, target_file_format_version);
204✔
481

102✔
482
    // DB::do_open() must ensure that only supported version are allowed.
102✔
483
    // It does that by asking backup if the current file format version is
102✔
484
    // included in the accepted versions, so be sure to align the list of
102✔
485
    // versions with the logic below
102✔
486

102✔
487
    int current_file_format_version = get_file_format_version();
204✔
488
    REALM_ASSERT(current_file_format_version < target_file_format_version);
204✔
489

102✔
490
    // Upgrade from version prior to 7 (new history schema version in top array)
102✔
491
    if (current_file_format_version <= 6 && target_file_format_version >= 7) {
204✔
492
        // If top array size is 9, then add the missing 10th element containing
15✔
493
        // the history schema version.
15✔
494
        std::size_t top_size = m_top.size();
30✔
495
        REALM_ASSERT(top_size <= 9);
30✔
496
        if (top_size == 9) {
30✔
497
            int initial_history_schema_version = 0;
18✔
498
            m_top.add(initial_history_schema_version); // Throws
18✔
499
        }
18✔
500
        set_file_format_version(7);
30✔
501
        commit_and_continue_writing();
30✔
502
    }
30✔
503

102✔
504
    // Upgrade from version prior to 10 (Cluster based db)
102✔
505
    if (current_file_format_version <= 9 && target_file_format_version >= 10) {
204✔
506
        DisableReplication disable_replication(*this);
102✔
507

51✔
508
        std::vector<TableRef> table_accessors;
102✔
509
        TableRef pk_table;
102✔
510
        TableRef progress_info;
102✔
511
        ColKey col_objects;
102✔
512
        ColKey col_links;
102✔
513
        std::map<TableRef, ColKey> pk_cols;
102✔
514

51✔
515
        // Use table lookup by name. The table keys are not generated yet
51✔
516
        for (size_t t = 0; t < m_table_names.size(); t++) {
438✔
517
            StringData name = m_table_names.get(t);
336✔
518
            // In file format version 9 files, all names represent existing tables.
168✔
519
            auto table = get_table(name);
336✔
520
            if (name == "pk") {
336✔
521
                pk_table = table;
54✔
522
            }
54✔
523
            else if (name == "!UPDATE_PROGRESS") {
282✔
524
                progress_info = table;
×
525
            }
×
526
            else {
282✔
527
                table_accessors.push_back(table);
282✔
528
            }
282✔
529
        }
336✔
530

51✔
531
        if (!progress_info) {
102✔
532
            // This is the first time. Prepare for moving objects in one go.
51✔
533
            progress_info = this->add_table_with_primary_key("!UPDATE_PROGRESS", type_String, "table_name");
102✔
534
            col_objects = progress_info->add_column(type_Bool, "objects_migrated");
102✔
535
            col_links = progress_info->add_column(type_Bool, "links_migrated");
102✔
536

51✔
537

51✔
538
            for (auto k : table_accessors) {
282✔
539
                k->migrate_column_info();
282✔
540
            }
282✔
541

51✔
542
            if (pk_table) {
102✔
543
                pk_table->migrate_column_info();
54✔
544
                pk_table->migrate_indexes(ColKey());
54✔
545
                pk_table->create_columns();
54✔
546
                pk_table->migrate_objects();
54✔
547
                pk_cols = get_primary_key_columns_from_pk_table(pk_table);
54✔
548
            }
54✔
549

51✔
550
            for (auto k : table_accessors) {
282✔
551
                k->migrate_indexes(pk_cols[k]);
282✔
552
            }
282✔
553
            for (auto k : table_accessors) {
282✔
554
                k->migrate_subspec();
282✔
555
            }
282✔
556
            for (auto k : table_accessors) {
282✔
557
                k->create_columns();
282✔
558
            }
282✔
559
            commit_and_continue_writing();
102✔
560
        }
102✔
561
        else {
×
562
            if (pk_table) {
×
563
                pk_cols = get_primary_key_columns_from_pk_table(pk_table);
×
564
            }
×
565
            col_objects = progress_info->get_column_key("objects_migrated");
×
566
            col_links = progress_info->get_column_key("links_migrated");
×
567
        }
×
568

51✔
569
        bool updates = false;
102✔
570
        for (auto k : table_accessors) {
282✔
571
            if (k->verify_column_keys()) {
282✔
572
                updates = true;
6✔
573
            }
6✔
574
        }
282✔
575
        if (updates) {
102✔
576
            commit_and_continue_writing();
6✔
577
        }
6✔
578

51✔
579
        // Migrate objects
51✔
580
        for (auto k : table_accessors) {
282✔
581
            auto progress_status = progress_info->create_object_with_primary_key(k->get_name());
282✔
582
            if (!progress_status.get<bool>(col_objects)) {
282✔
583
                bool no_links = k->migrate_objects();
282✔
584
                progress_status.set(col_objects, true);
282✔
585
                progress_status.set(col_links, no_links);
282✔
586
                commit_and_continue_writing();
282✔
587
            }
282✔
588
        }
282✔
589
        for (auto k : table_accessors) {
282✔
590
            auto progress_status = progress_info->create_object_with_primary_key(k->get_name());
282✔
591
            if (!progress_status.get<bool>(col_links)) {
282✔
592
                k->migrate_links();
60✔
593
                progress_status.set(col_links, true);
60✔
594
                commit_and_continue_writing();
60✔
595
            }
60✔
596
        }
282✔
597

51✔
598
        // Final cleanup
51✔
599
        for (auto k : table_accessors) {
282✔
600
            k->finalize_migration(pk_cols[k]);
282✔
601
        }
282✔
602

51✔
603
        if (pk_table) {
102✔
604
            remove_table("pk");
54✔
605
        }
54✔
606
        remove_table(progress_info->get_key());
102✔
607
    }
102✔
608

102✔
609
    // Ensure we have search index on all primary key columns.
102✔
610
    auto table_keys = get_table_keys();
204✔
611
    if (current_file_format_version < 22) {
204✔
612
        for (auto k : table_keys) {
384✔
613
            auto t = get_table(k);
384✔
614
            if (auto col = t->get_primary_key_column()) {
384✔
615
                t->do_add_search_index(col, IndexType::General);
228✔
616
            }
228✔
617
        }
384✔
618
    }
138✔
619

102✔
620
    if (current_file_format_version == 22) {
204✔
621
        // Check that asymmetric table are empty
33✔
622
        for (auto k : table_keys) {
180✔
623
            auto t = get_table(k);
180✔
624
            if (t->is_asymmetric() && t->size() > 0) {
180✔
625
                t->clear();
6✔
626
            }
6✔
627
        }
180✔
628
    }
66✔
629
    if (current_file_format_version >= 21 && current_file_format_version < 23) {
204✔
630
        // Upgrade Set and Dictionary columns
33✔
631
        for (auto k : table_keys) {
180✔
632
            auto t = get_table(k);
180✔
633
            t->migrate_sets_and_dictionaries();
180✔
634
        }
180✔
635
    }
66✔
636
    // NOTE: Additional future upgrade steps go here.
102✔
637
}
204✔
638

639
void Transaction::promote_to_async()
640
{
7,488✔
641
    util::CheckedLockGuard lck(m_async_mutex);
7,488✔
642
    if (m_async_stage == AsyncState::Idle) {
7,488✔
643
        m_async_stage = AsyncState::HasLock;
366✔
644
    }
366✔
645
}
7,488✔
646

647
// Replays the public content of this transaction into `repl`: first the
// classes, then their columns, then every object of each non-embedded table
// together with its properties. `dest` is committed once after the schema
// and again after every batch of created objects.
//
// Throws RuntimeError(ErrorCodes::BrokenInvariant) when a non-embedded public
// table has no primary key, or has one that is not named "_id".
void Transaction::replicate(Transaction* dest, Replication& repl) const
{
    // Entries are only created for public tables.
    std::vector<TableKey> public_tables;
    for (auto key : get_table_keys()) {
        if (table_is_public(key)) {
            public_tables.push_back(key);
        }
    }

    // Create tables
    for (auto key : public_tables) {
        auto table = get_table(key);
        auto table_name = table->get_name();
        if (table->is_embedded()) {
            repl.add_class(key, table_name, Table::Type::Embedded);
            continue;
        }

        auto pk_col = table->get_primary_key_column();
        if (!pk_col) {
            throw RuntimeError(
                ErrorCodes::BrokenInvariant,
                util::format("Class '%1' must have a primary key", Group::table_name_to_class_name(table_name)));
        }
        auto pk_name = table->get_column_name(pk_col);
        if (pk_name != "_id") {
            throw RuntimeError(ErrorCodes::BrokenInvariant,
                               util::format("Primary key of class '%1' must be named '_id'. Current is '%2'",
                                            Group::table_name_to_class_name(table_name), pk_name));
        }
        repl.add_class_with_primary_key(key, table_name, DataType(pk_col.get_type()), pk_name,
                                        pk_col.is_nullable(), table->get_table_type());
    }

    // Create columns; the primary key column is skipped here.
    for (auto key : public_tables) {
        auto table = get_table(key);
        auto pk_col = table->get_primary_key_column();
        auto column_keys = table->get_column_keys();
        for (auto col : column_keys) {
            if (col == pk_col) {
                continue;
            }
            repl.insert_column(table.unchecked_ptr(), col, DataType(col.get_type()), table->get_column_name(col),
                               table->get_opposite_table(col).unchecked_ptr());
        }
    }
    dest->commit_and_continue_writing();

    // The schema should now be in place - create the objects.
#ifdef REALM_DEBUG
    constexpr int number_of_objects_to_create_before_committing = 100;
#else
    constexpr int number_of_objects_to_create_before_committing = 1000;
#endif
    // Objects created since the last intermediate commit.
    int created_since_commit = 0;
    for (auto key : public_tables) {
        auto table = get_table(key);
        if (table->is_embedded()) {
            continue;
        }
        auto pk_col = table->get_primary_key_column();
        auto col_info = get_col_info(table.unchecked_ptr());
        for (auto object : *table) {
            auto obj_key = object.get_key();
            Mixed pk = object.get_any(pk_col);
            repl.create_object_with_primary_key(table.unchecked_ptr(), obj_key, pk);
            generate_properties_for_obj(repl, object, col_info);
            ++created_since_commit;
            if (created_since_commit == number_of_objects_to_create_before_committing) {
                dest->commit_and_continue_writing();
                created_since_commit = 0;
            }
        }
    }
}
78✔
718

719
void Transaction::complete_async_commit()
720
{
1,332✔
721
    // sync to disk:
294✔
722
    DB::ReadLockInfo read_lock;
1,332✔
723
    try {
1,332✔
724
        read_lock = db->grab_read_lock(DB::ReadLockInfo::Live, VersionID());
1,332✔
725
        GroupWriter out(*this);
1,332✔
726
        out.commit(read_lock.m_top_ref); // Throws
1,332✔
727
        // we must release the write mutex before the callback, because the callback
294✔
728
        // is allowed to re-request it.
294✔
729
        db->release_read_lock(read_lock);
1,332✔
730
        if (m_oldest_version_not_persisted) {
1,332✔
731
            db->release_read_lock(*m_oldest_version_not_persisted);
1,326✔
732
            m_oldest_version_not_persisted.reset();
1,326✔
733
        }
1,326✔
734
    }
1,332✔
735
    catch (...) {
297✔
736
        m_commit_exception = std::current_exception();
6✔
737
        m_async_commit_has_failed = true;
6✔
738
        db->release_read_lock(read_lock);
6✔
739
    }
6✔
740
}
1,332✔
741

742
// Finishes the current round of async writes.
//  - HasLock:    nothing has been committed, so the write lock is simply given
//                back and the stage returns to Idle.
//  - HasCommits: the stage becomes Syncing and the disk sync is scheduled
//                through db->async_sync_to_disk().
//  - any other stage: no action.
// \param when_synchronized - invoked once the scheduled sync has finished,
//                            unless somebody is blocked waiting for the sync,
//                            in which case the waiters are notified instead.
void Transaction::async_complete_writes(util::UniqueFunction<void()> when_synchronized)
{
    util::CheckedLockGuard guard(m_async_mutex);
    switch (m_async_stage) {
        case AsyncState::HasLock:
            // Nothing to write to disk - only the write lock has to go.
            m_async_stage = AsyncState::Idle;
            db->async_end_write();
            break;

        case AsyncState::HasCommits:
            m_async_stage = AsyncState::Syncing;
            // Forget any exception left over from an earlier sync.
            m_commit_exception = std::exception_ptr();
            // The actual sync to disk happens inside this callback.
            db->async_sync_to_disk([this, cb = std::move(when_synchronized)]() noexcept {
                complete_async_commit();
                util::CheckedLockGuard sync_guard(m_async_mutex);
                m_async_stage = AsyncState::Idle;
                if (!m_waiting_for_sync) {
                    cb();
                    return;
                }
                // A thread is blocked on the sync: wake it up and skip the
                // completion callback.
                m_waiting_for_sync = false;
                m_async_cv.notify_all();
            });
            break;

        default:
            break;
    }
}
9,705✔
768

769
void Transaction::prepare_for_close()
770
{
2,909,055✔
771
    util::CheckedLockGuard lck(m_async_mutex);
2,909,055✔
772
    switch (m_async_stage) {
2,909,055✔
773
        case AsyncState::Idle:
2,918,904✔
774
            break;
2,918,904✔
775

776
        case AsyncState::Requesting:
24✔
777
            // We don't have the ability to cancel a wait on the write lock, so
12✔
778
            // unfortunately we have to wait for it to be acquired.
12✔
779
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
780
            REALM_ASSERT(!m_oldest_version_not_persisted);
24✔
781
            m_waiting_for_write_lock = true;
24✔
782
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
48✔
783
                return !m_waiting_for_write_lock;
48✔
784
            });
48✔
785
            db->end_write_on_correct_thread();
24✔
786
            break;
24✔
787

788
        case AsyncState::HasLock:
30✔
789
            // We have the lock and are currently in a write transaction, and
15✔
790
            // also may have some pending previous commits to write
15✔
791
            if (m_transact_stage == DB::transact_Writing) {
30✔
792
                db->reset_free_space_tracking();
18✔
793
                m_transact_stage = DB::transact_Reading;
18✔
794
            }
18✔
795
            if (m_oldest_version_not_persisted) {
30✔
796
                complete_async_commit();
×
797
            }
×
798
            db->end_write_on_correct_thread();
30✔
799
            break;
30✔
800

801
        case AsyncState::HasCommits:
24✔
802
            // We have commits which need to be synced to disk, so do that
12✔
803
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
804
            complete_async_commit();
24✔
805
            db->end_write_on_correct_thread();
24✔
806
            break;
24✔
807

808
        case AsyncState::Syncing:
24✔
809
            // The worker thread is currently writing, so wait for it to complete
12✔
810
            REALM_ASSERT(m_transact_stage == DB::transact_Reading);
24✔
811
            m_waiting_for_sync = true;
24✔
812
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
48✔
813
                return !m_waiting_for_sync;
48✔
814
            });
48✔
815
            break;
24✔
816
    }
2,919,159✔
817
    m_async_stage = AsyncState::Idle;
2,919,159✔
818
}
2,919,159✔
819

820
void Transaction::acquire_write_lock()
821
{
381,135✔
822
    util::CheckedUniqueLock lck(m_async_mutex);
381,135✔
823
    switch (m_async_stage) {
381,135✔
824
        case AsyncState::Idle:
380,685✔
825
            lck.unlock();
380,685✔
826
            db->do_begin_possibly_async_write();
380,685✔
827
            return;
380,685✔
828

829
        case AsyncState::Requesting:
105✔
830
            m_waiting_for_write_lock = true;
105✔
831
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
210✔
832
                return !m_waiting_for_write_lock;
210✔
833
            });
210✔
834
            return;
105✔
835

836
        case AsyncState::HasLock:
✔
837
        case AsyncState::HasCommits:
✔
838
            return;
×
839

840
        case AsyncState::Syncing:
342✔
841
            m_waiting_for_sync = true;
342✔
842
            m_async_cv.wait(lck.native_handle(), [this]() REQUIRES(m_async_mutex) {
684✔
843
                return !m_waiting_for_sync;
684✔
844
            });
684✔
845
            lck.unlock();
342✔
846
            db->do_begin_possibly_async_write();
342✔
847
            break;
342✔
848
    }
381,135✔
849
}
381,135✔
850

851
void Transaction::do_end_read() noexcept
852
{
2,739,735✔
853
    if (db->m_logger)
2,739,735✔
854
        db->m_logger->log(util::Logger::Level::trace, "End transaction");
671,220✔
855

1,675,701✔
856
    prepare_for_close();
2,739,735✔
857
    detach();
2,739,735✔
858

1,675,701✔
859
    // We should always be ensuring that async commits finish before we get here,
1,675,701✔
860
    // but if the fsync() failed or we failed to update the top pointer then
1,675,701✔
861
    // there's not much we can do and we have to just accept that we're losing
1,675,701✔
862
    // those commits.
1,675,701✔
863
    if (m_oldest_version_not_persisted) {
2,739,735✔
864
        REALM_ASSERT(m_async_commit_has_failed);
6✔
865
        // We need to not release our read lock on m_oldest_version_not_persisted
3✔
866
        // as that's the version the top pointer is referencing and overwriting
3✔
867
        // that version will corrupt the Realm file.
3✔
868
        db->leak_read_lock(*m_oldest_version_not_persisted);
6✔
869
    }
6✔
870
    db->release_read_lock(m_read_lock);
2,739,735✔
871

1,675,701✔
872
    m_alloc.note_reader_end(this);
2,739,735✔
873
    set_transact_stage(DB::transact_Ready);
2,739,735✔
874
    // reset the std::shared_ptr to allow the DB object to release resources
1,675,701✔
875
    // as early as possible.
1,675,701✔
876
    db.reset();
2,739,735✔
877
}
2,739,735✔
878

879
// This is the same as do_end_read() above, but with the requirement that
880
// 1) This is called with the db->mutex locked already
881
// 2) No async commits outstanding
882
void Transaction::close_read_with_lock()
883
{
144✔
884
    REALM_ASSERT(m_transact_stage == DB::transact_Reading);
144✔
885
    {
144✔
886
        util::CheckedLockGuard lck(m_async_mutex);
144✔
887
        REALM_ASSERT_EX(m_async_stage == AsyncState::Idle, size_t(m_async_stage));
144✔
888
    }
144✔
889

72✔
890
    detach();
144✔
891
    REALM_ASSERT_EX(!m_oldest_version_not_persisted, m_oldest_version_not_persisted->m_type,
144✔
892
                    m_oldest_version_not_persisted->m_version, m_oldest_version_not_persisted->m_top_ref,
144✔
893
                    m_oldest_version_not_persisted->m_file_size);
144✔
894
    db->do_release_read_lock(m_read_lock);
144✔
895

72✔
896
    m_alloc.note_reader_end(this);
144✔
897
    set_transact_stage(DB::transact_Ready);
144✔
898
    // reset the std::shared_ptr to allow the DB object to release resources
72✔
899
    // as early as possible.
72✔
900
    db.reset();
144✔
901
}
144✔
902

903

904
void Transaction::initialize_replication()
905
{
102✔
906
    if (m_transact_stage == DB::transact_Writing) {
102✔
907
        if (Replication* repl = get_replication()) {
102✔
908
            auto current_version = m_read_lock.m_version;
96✔
909
            bool history_updated = false;
96✔
910
            repl->initiate_transact(*this, current_version, history_updated); // Throws
96✔
911
        }
96✔
912
    }
102✔
913
}
102✔
914

915
// Records the new transaction stage. When metrics are compiled in and enabled,
// the stage change is also reported to the metrics object together with a
// snapshot of size and version statistics.
void Transaction::set_transact_stage(DB::TransactStage stage) noexcept
{
#if REALM_METRICS
    REALM_ASSERT(m_metrics == db->m_metrics);
    if (m_metrics) { // null if metrics are disabled
        // Gather the statistics that accompany every "end" notification.
        size_t free_space;
        size_t used_space;
        db->get_stats(free_space, used_space);
        size_t total_size = used_space + free_space;

        size_t num_objects = m_total_rows;
        size_t num_available_versions = static_cast<size_t>(db->get_number_of_versions());
        size_t num_decrypted_pages = realm::util::get_num_decrypted_pages();

        switch (stage) {
            case DB::transact_Reading:
                // Writing -> Reading closes the write transaction first.
                if (m_transact_stage == DB::transact_Writing) {
                    m_metrics->end_write_transaction(total_size, free_space, num_objects, num_available_versions,
                                                     num_decrypted_pages);
                }
                m_metrics->start_read_transaction();
                break;

            case DB::transact_Writing:
                // Reading -> Writing closes the read transaction first.
                if (m_transact_stage == DB::transact_Reading) {
                    m_metrics->end_read_transaction(total_size, free_space, num_objects, num_available_versions,
                                                    num_decrypted_pages);
                }
                m_metrics->start_write_transaction();
                break;

            case DB::transact_Ready:
                // Both "end" notifications are sent, whatever the previous
                // stage was.
                m_metrics->end_read_transaction(total_size, free_space, num_objects, num_available_versions,
                                                num_decrypted_pages);
                m_metrics->end_write_transaction(total_size, free_space, num_objects, num_available_versions,
                                                 num_decrypted_pages);
                break;

            default:
                // Any other stage is not reported to metrics.
                break;
        }
    }
#endif

    m_transact_stage = stage;
}
6,279,129✔
954

955
class NodeTree {
956
public:
957
    NodeTree(size_t evac_limit, size_t work_limit)
958
        : m_evac_limit(evac_limit)
959
        , m_work_limit(int64_t(work_limit))
960
        , m_moved(0)
961
    {
5,337✔
962
    }
5,337✔
963
    ~NodeTree()
964
    {
5,337✔
965
        // std::cout << "Moved: " << m_moved << std::endl;
2,691✔
966
    }
5,337✔
967

968
    /// Function used to traverse the node tree and "copy on write" nodes
969
    /// that are found above the evac_limit. The function will return
970
    /// when either the whole tree has been travesed or when the work_limit
971
    /// has been reached.
972
    /// \param current_node - node to process.
973
    /// \param level - the level at which current_node is placed in the tree
974
    /// \param progress - When the traversal is initiated, this vector identifies at which
975
    ///                   node the process should be resumed. It is subesequently updated
976
    ///                   to point to the node we have just processed
977
    bool trv(Array& current_node, unsigned level, std::vector<size_t>& progress)
978
    {
1,260,993✔
979
        if (m_work_limit < 0) {
1,260,993✔
980
            return false;
5,214✔
981
        }
5,214✔
982
        if (current_node.is_read_only()) {
1,255,779✔
983
            size_t byte_size = current_node.get_byte_size();
1,228,353✔
984
            if ((current_node.get_ref() + byte_size) > m_evac_limit) {
1,228,353✔
985
                current_node.copy_on_write();
1,171,302✔
986
                m_moved++;
1,171,302✔
987
                m_work_limit -= byte_size;
1,171,302✔
988
            }
1,171,302✔
989
        }
1,228,353✔
990

628,320✔
991
        if (current_node.has_refs()) {
1,255,779✔
992
            auto sz = current_node.size();
41,625✔
993
            m_work_limit -= sz;
41,625✔
994
            if (progress.size() == level) {
41,625✔
995
                progress.push_back(0);
10,488✔
996
            }
10,488✔
997
            REALM_ASSERT_EX(level < progress.size(), level, progress.size());
41,625✔
998
            size_t ndx = progress[level];
41,625✔
999
            while (ndx < sz) {
1,280,712✔
1000
                auto val = current_node.get(ndx);
1,270,287✔
1001
                if (val && !(val & 1)) {
1,270,287✔
1002
                    Array arr(current_node.get_alloc());
1,255,536✔
1003
                    arr.set_parent(&current_node, ndx);
1,255,536✔
1004
                    arr.init_from_parent();
1,255,536✔
1005
                    if (!trv(arr, level + 1, progress)) {
1,255,536✔
1006
                        return false;
31,200✔
1007
                    }
31,200✔
1008
                }
1,239,087✔
1009
                ndx = ++progress[level];
1,239,087✔
1010
            }
1,239,087✔
1011
            while (progress.size() > level)
31,134✔
1012
                progress.pop_back();
10,428✔
1013
        }
10,425✔
1014
        return true;
1,240,191✔
1015
    }
1,255,779✔
1016

1017
private:
1018
    size_t m_evac_limit;
1019
    int64_t m_work_limit;
1020
    size_t m_moved;
1021
};
1022

1023

1024
void Transaction::cow_outliers(std::vector<size_t>& progress, size_t evac_limit, size_t work_limit)
1025
{
5,337✔
1026
    NodeTree node_tree(evac_limit, work_limit);
5,337✔
1027
    if (progress.empty()) {
5,337✔
1028
        progress.push_back(s_table_name_ndx);
135✔
1029
    }
135✔
1030
    if (progress[0] == s_table_name_ndx) {
5,337✔
1031
        if (!node_tree.trv(m_table_names, 1, progress))
135✔
1032
            return;
×
1033
        progress.back() = s_table_refs_ndx; // Handle tables next
135✔
1034
    }
135✔
1035
    if (progress[0] == s_table_refs_ndx) {
5,337✔
1036
        if (!node_tree.trv(m_tables, 1, progress))
5,337✔
1037
            return;
5,214✔
1038
        progress.back() = s_hist_ref_ndx; // Handle history next
123✔
1039
    }
123✔
1040
    if (progress[0] == s_hist_ref_ndx && m_top.get(s_hist_ref_ndx)) {
2,733✔
1041
        Array hist_arr(m_top.get_alloc());
123✔
1042
        hist_arr.set_parent(&m_top, s_hist_ref_ndx);
123✔
1043
        hist_arr.init_from_parent();
123✔
1044
        if (!node_tree.trv(hist_arr, 1, progress))
123✔
1045
            return;
×
1046
    }
123✔
1047
    progress.clear();
123✔
1048
}
123✔
1049

1050
} // namespace realm
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc